diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 8897cced89..a91bd63c3b 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -3,16 +3,16 @@ from abc import abstractmethod from typing import TYPE_CHECKING, Optional -import numpy as np from zarr.abc.metadata import Metadata +from zarr.buffer import Buffer, NDBuffer from zarr.common import ArraySpec from zarr.store import StorePath if TYPE_CHECKING: from typing_extensions import Self - from zarr.common import BytesLike, SliceSelection + from zarr.common import SliceSelection from zarr.metadata import ArrayMetadata @@ -37,17 +37,17 @@ class ArrayArrayCodec(Codec): @abstractmethod async def decode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: pass @abstractmethod async def encode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> Optional[np.ndarray]: + ) -> Optional[NDBuffer]: pass @@ -55,17 +55,17 @@ class ArrayBytesCodec(Codec): @abstractmethod async def decode( self, - chunk_array: BytesLike, + chunk_array: Buffer, chunk_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: pass @abstractmethod async def encode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: + ) -> Optional[Buffer]: pass @@ -76,7 +76,7 @@ async def decode_partial( store_path: StorePath, selection: SliceSelection, chunk_spec: ArraySpec, - ) -> Optional[np.ndarray]: + ) -> Optional[NDBuffer]: pass @@ -85,7 +85,7 @@ class ArrayBytesCodecPartialEncodeMixin: async def encode_partial( self, store_path: StorePath, - chunk_array: np.ndarray, + chunk_array: NDBuffer, selection: SliceSelection, chunk_spec: ArraySpec, ) -> None: @@ -96,15 +96,15 @@ class BytesBytesCodec(Codec): @abstractmethod async def decode( self, - chunk_array: BytesLike, + chunk_array: Buffer, chunk_spec: ArraySpec, - ) -> BytesLike: + ) -> Buffer: pass @abstractmethod async def encode( self, - chunk_array: BytesLike, + chunk_array: Buffer, chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: + ) -> Optional[Buffer]: pass diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 3d9550f733..914987cda7 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -3,12 +3,14 @@ from typing import List, Tuple, Optional +from zarr.buffer import Buffer + class Store(ABC): @abstractmethod async def get( self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[bytes]: + ) -> Optional[Buffer]: """Retrieve the value associated with a given key. Parameters @@ -18,14 +20,14 @@ async def get( Returns ------- - bytes + Buffer """ ... @abstractmethod async def get_partial_values( self, key_ranges: List[Tuple[str, Tuple[int, int]]] - ) -> List[Optional[bytes]]: + ) -> List[Optional[Buffer]]: """Retrieve possibly partial values from given key_ranges. Parameters @@ -35,8 +37,7 @@ async def get_partial_values( Returns ------- - list[bytes] - list of values, in the order of the key_ranges, may contain null/none for missing keys + list of values, in the order of the key_ranges, may contain null/none for missing keys """ ... @@ -61,7 +62,7 @@ def supports_writes(self) -> bool: ... @abstractmethod - async def set(self, key: str, value: bytes) -> None: + async def set(self, key: str, value: Buffer) -> None: """Store a (key, value) pair. 
Parameters diff --git a/src/zarr/array.py b/src/zarr/array.py index a594b3dd11..1567c9bbe5 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -20,6 +20,7 @@ # from zarr.array_v2 import ArrayV2 +from zarr.buffer import Buffer, Factory, NDArrayLike, NDBuffer from zarr.codecs import BytesCodec from zarr.codecs.pipeline import CodecPipeline from zarr.common import ( @@ -147,7 +148,7 @@ async def open( assert zarr_json_bytes is not None return cls.from_dict( store_path, - json.loads(zarr_json_bytes), + json.loads(zarr_json_bytes.to_bytes()), ) @classmethod @@ -160,7 +161,7 @@ async def open_auto( if v3_metadata_bytes is not None: return cls.from_dict( store_path, - json.loads(v3_metadata_bytes), + json.loads(v3_metadata_bytes.to_bytes()), ) else: raise ValueError("no v2 support yet") @@ -186,7 +187,9 @@ def dtype(self) -> np.dtype[Any]: def attrs(self) -> dict[str, Any]: return self.metadata.attributes - async def getitem(self, selection: Selection) -> npt.NDArray[Any]: + async def getitem( + self, selection: Selection, *, factory: Factory.Create = NDBuffer.create + ) -> NDArrayLike: assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) indexer = BasicIndexer( selection, @@ -195,10 +198,8 @@ async def getitem(self, selection: Selection) -> npt.NDArray[Any]: ) # setup output array - out = np.zeros( - indexer.shape, - dtype=self.metadata.dtype, - order=self.order, + out = factory( + shape=indexer.shape, dtype=self.metadata.dtype, order=self.order, fill_value=0 ) # reading chunks and decoding them @@ -210,21 +211,17 @@ async def getitem(self, selection: Selection) -> npt.NDArray[Any]: self._read_chunk, config.get("async.concurrency"), ) - - if out.shape: - return out - else: - return out[()] + return out.as_ndarray_like() async def _save_metadata(self) -> None: - await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(self.metadata.to_bytes())) async def _read_chunk( self, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, - out: npt.NDArray[Any], + out: NDBuffer, ) -> None: chunk_spec = self.metadata.get_chunk_spec(chunk_coords, self.order) chunk_key_encoding = self.metadata.chunk_key_encoding @@ -246,7 +243,12 @@ async def _read_chunk( else: out[out_selection] = self.metadata.fill_value - async def setitem(self, selection: Selection, value: npt.NDArray[Any]) -> None: + async def setitem( + self, + selection: Selection, + value: NDArrayLike, + factory: Factory.NDArrayLike = NDBuffer.from_ndarray_like, + ) -> None: assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) chunk_shape = self.metadata.chunk_grid.chunk_shape indexer = BasicIndexer( @@ -259,8 +261,7 @@ async def setitem(self, selection: Selection, value: npt.NDArray[Any]) -> None: # check value shape if np.isscalar(value): - # setting a scalar value - pass + value = np.asanyarray(value) else: if not hasattr(value, "shape"): value = np.asarray(value, self.metadata.dtype) @@ -268,6 +269,11 @@ async def setitem(self, selection: Selection, value: npt.NDArray[Any]) -> None: if value.dtype.name != self.metadata.dtype.name: value = value.astype(self.metadata.dtype, order="A") + # We accept any ndarray like object from the user and convert it + # to a NDBuffer (or subclass). From this point onwards, we only pass + # Buffer and NDBuffer between components. 
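The comment above describes the new conversion boundary, and the `value = factory(value)` call that follows is where it happens. As a rough sketch of what the pluggable `factory` arguments on `getitem`/`setitem` enable, assuming this branch's `zarr.buffer` module is available — the `PinnedNDBuffer` class and its allocator are hypothetical stand-ins, not part of this change:

```python
import numpy as np
from zarr.buffer import NDBuffer

class PinnedNDBuffer(NDBuffer):
    """Hypothetical NDBuffer subclass; swap np.empty for a pinned/device allocator."""

    @classmethod
    def create(cls, *, shape, dtype, order="C", fill_value=None):
        ret = cls(np.empty(shape=shape, dtype=dtype, order=order))  # stand-in allocation
        if fill_value is not None:
            ret.fill(fill_value)
        return ret

# Reads allocate their output through a Factory.Create callable:
#     data = await async_array.getitem(selection, factory=PinnedNDBuffer.create)
# Writes wrap the user's array through a Factory.NDArrayLike callable:
#     await async_array.setitem(selection, value, factory=PinnedNDBuffer.from_ndarray_like)
```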
+ value = factory(value) + # merging with existing data and encoding chunks await concurrent_map( [ @@ -286,7 +292,7 @@ async def setitem(self, selection: Selection, value: npt.NDArray[Any]) -> None: async def _write_chunk( self, - value: npt.NDArray[Any], + value: NDBuffer, chunk_shape: ChunkCoords, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, @@ -300,11 +306,9 @@ async def _write_chunk( if is_total_slice(chunk_selection, chunk_shape): # write entire chunks if np.isscalar(value): - chunk_array = np.empty( - chunk_shape, - dtype=self.metadata.dtype, + chunk_array = NDBuffer.create( + shape=chunk_shape, dtype=self.metadata.dtype, fill_value=value ) - chunk_array.fill(value) else: chunk_array = value[out_selection] await self._write_chunk_to_store(store_path, chunk_array, chunk_spec) @@ -324,11 +328,11 @@ async def _write_chunk( # merge new value if chunk_bytes is None: - chunk_array = np.empty( - chunk_shape, + chunk_array = NDBuffer.create( + shape=chunk_shape, dtype=self.metadata.dtype, + fill_value=self.metadata.fill_value, ) - chunk_array.fill(self.metadata.fill_value) else: chunk_array = ( await self.codecs.decode(chunk_bytes, chunk_spec) @@ -338,9 +342,9 @@ async def _write_chunk( await self._write_chunk_to_store(store_path, chunk_array, chunk_spec) async def _write_chunk_to_store( - self, store_path: StorePath, chunk_array: npt.NDArray[Any], chunk_spec: ArraySpec + self, store_path: StorePath, chunk_array: NDBuffer, chunk_spec: ArraySpec ) -> None: - if np.all(chunk_array == self.metadata.fill_value): + if chunk_array.all_equal(self.metadata.fill_value): # chunks that only contain fill_value will be removed await store_path.delete() else: @@ -379,14 +383,14 @@ async def _delete_key(key: str) -> None: ) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata.to_bytes())) return replace(self, metadata=new_metadata) async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray: new_metadata = replace(self.metadata, attributes=new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) + await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata.to_bytes())) return replace(self, metadata=new_metadata) def __repr__(self) -> str: diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py index 18251e7db7..053d58eb1a 100644 --- a/src/zarr/array_v2.py +++ b/src/zarr/array_v2.py @@ -10,6 +10,7 @@ from numcodecs.compat import ensure_bytes, ensure_ndarray +from zarr.buffer import Buffer, NDBuffer from zarr.common import ( ZARRAY_JSON, ZATTRS_JSON, @@ -29,6 +30,13 @@ from zarr.array import Array +def as_bytearray(data: Optional[Buffer]) -> Optional[bytes]: + """Help function to convert a Buffer into bytes if not None""" + if data is None: + return data + return data.to_bytes() + + @dataclass(frozen=True) class _AsyncArrayProxy: array: ArrayV2 @@ -144,8 +152,8 @@ async def open_async( assert zarray_bytes is not None return cls.from_dict( store_path, - zarray_json=json.loads(zarray_bytes), - zattrs_json=json.loads(zattrs_bytes) if zattrs_bytes is not None else None, + zarray_json=json.loads(zarray_bytes.to_bytes()), + zattrs_json=json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else None, ) @classmethod @@ -179,7 +187,7 @@ async def _save_metadata(self) -> None: await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes()) if self.attributes is not None and len(self.attributes) > 0: await 
(self.store_path / ZATTRS_JSON).set( - json.dumps(self.attributes).encode(), + Buffer.from_bytes(json.dumps(self.attributes).encode()), ) else: await (self.store_path / ZATTRS_JSON).delete() @@ -216,10 +224,8 @@ async def get_async(self, selection: Selection): ) # setup output array - out = np.zeros( - indexer.shape, - dtype=self.metadata.dtype, - order=self.metadata.order, + out = NDBuffer.create( + shape=indexer.shape, dtype=self.metadata.dtype, order=self.metadata.order, fill_value=0 ) # reading chunks and decoding them @@ -245,7 +251,7 @@ async def _read_chunk( ): store_path = self.store_path / self._encode_chunk_key(chunk_coords) - chunk_array = await self._decode_chunk(await store_path.get()) + chunk_array = await self._decode_chunk(as_bytearray(await store_path.get())) if chunk_array is not None: tmp = chunk_array[chunk_selection] out[out_selection] = tmp @@ -333,12 +339,12 @@ async def _write_chunk( if is_total_slice(chunk_selection, chunk_shape): # write entire chunks if np.isscalar(value): - chunk_array = np.empty( - chunk_shape, + chunk_array = NDBuffer.create( + shape=chunk_shape, dtype=self.metadata.dtype, order=self.metadata.order, + fill_value=value, ) - chunk_array.fill(value) else: chunk_array = value[out_selection] await self._write_chunk_to_store(store_path, chunk_array) @@ -346,16 +352,16 @@ async def _write_chunk( else: # writing partial chunks # read chunk first - tmp = await self._decode_chunk(await store_path.get()) + tmp = await self._decode_chunk(as_bytearray(await store_path.get())) # merge new value if tmp is None: - chunk_array = np.empty( - chunk_shape, + chunk_array = NDBuffer.create( + shape=chunk_shape, dtype=self.metadata.dtype, order=self.metadata.order, + fill_value=self.metadata.fill_value, ) - chunk_array.fill(self.metadata.fill_value) else: chunk_array = tmp.copy( order=self.metadata.order, @@ -374,7 +380,7 @@ async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.nda if chunk_bytes is None: await store_path.delete() else: - await store_path.set(chunk_bytes) + await store_path.set(Buffer.from_bytes(chunk_bytes)) async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: chunk_array = chunk_array.ravel(order=self.metadata.order) @@ -493,7 +499,7 @@ async def convert_to_v3_async(self) -> Array: ) new_metadata_bytes = new_metadata.to_bytes() - await (self.store_path / ZARR_JSON).set(new_metadata_bytes) + await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata_bytes)) return Array.from_dict( store_path=self.store_path, @@ -501,7 +507,9 @@ async def convert_to_v3_async(self) -> Array: ) async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2: - await (self.store_path / ZATTRS_JSON).set(json.dumps(new_attributes).encode()) + await (self.store_path / ZATTRS_JSON).set( + Buffer.from_bytes(json.dumps(new_attributes).encode()) + ) return replace(self, attributes=new_attributes) def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2: diff --git a/src/zarr/buffer.py b/src/zarr/buffer.py new file mode 100644 index 0000000000..a633cc09ec --- /dev/null +++ b/src/zarr/buffer.py @@ -0,0 +1,448 @@ +from __future__ import annotations + +import sys +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Iterable, + Literal, + Optional, + Protocol, + Tuple, + TypeAlias, +) + +import numpy as np + +if TYPE_CHECKING: + from typing_extensions import Self + from zarr.codecs.bytes import Endian + from zarr.common import BytesLike + +# TODO: create a protocol for the 
attributes we need, for now we alias Numpy's ndarray +# both for the array-like and ndarray-like +ArrayLike: TypeAlias = np.ndarray +NDArrayLike: TypeAlias = np.ndarray + + +def check_item_key_is_1d_contiguous(key: Any) -> None: + """Raises error if `key` isn't a 1d contiguous slice""" + if not isinstance(key, slice): + raise TypeError( + f"Item key has incorrect type (expected slice, got {key.__class__.__name__})" + ) + if not (key.step is None or key.step == 1): + raise ValueError("slice must be contiguous") + + +class Factory: + class Create(Protocol): + def __call__( + self, + *, + shape: Iterable[int], + dtype: np.DTypeLike, + order: Literal["C", "F"], + fill_value: Optional[Any], + ) -> NDBuffer: + """Factory function to create a new NDBuffer (or subclass) + + Callables implementing the `Factory.Create` protocol must create a new + instance of NDBuffer (or subclass) given the following parameters. + + Parameters + ---------- + shape + The shape of the new buffer + dtype + The datatype of each element in the new buffer + order + Whether to store multi-dimensional data in row-major (C-style) or + column-major (Fortran-style) order in memory. + fill_value + If not None, fill the new buffer with a scalar value. + + Return + ------ + A new NDBuffer or subclass instance + """ + + class NDArrayLike(Protocol): + def __call__(self, ndarray_like: NDArrayLike) -> NDBuffer: + """Factory function to coerce an array into a NDBuffer (or subclass) + + Callables implementing the `Factory.NDArrayLike` protocol must return + an instance of NDBuffer (or subclass) given an ndarray-like object. + + Parameters + ---------- + ndarray_like + ndarray-like object + + Return + ------ + A NDBuffer or subclass instance that represents `ndarray_like` + """ + + +class Buffer: + """A flat contiguous memory block + + We use Buffer throughout Zarr to represent a contiguous block of memory. + + A Buffer is backed by a underlying array-like instance that represents + the memory. The memory type is unspecified; can be regular host memory, + CUDA device memory, or something else. The only requirement is that the + array-like instance can be copied/converted to a regular Numpy array + (host memory). + + Note + ---- + This buffer is untyped, so all indexing and sizes are in bytes. + + Parameters + ---------- + array_like + array-like object that must be 1-dim, contiguous, and byte dtype. + """ + + def __init__(self, array_like: ArrayLike): + if array_like.ndim != 1: + raise ValueError("array_like: only 1-dim allowed") + if array_like.dtype != np.dtype("b"): + raise ValueError("array_like: only byte dtype allowed") + self._data = array_like + + @classmethod + def create_zero_length(cls) -> Self: + """Create an empty buffer with length zero + + Return + ------ + New empty 0-length buffer + """ + return cls(np.array([], dtype="b")) + + @classmethod + def from_array_like(cls, array_like: NDArrayLike) -> Self: + """Create a new buffer of a array-like object + + Parameters + ---------- + array_like + array-like object that must be 1-dim, contiguous, and byte dtype. 
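A quick illustration of the constructor checks above (a sketch, assuming this branch is installed): `Buffer` only wraps flat, byte-typed arrays, so typed or multi-dimensional data has to go through `NDBuffer` instead.

```python
import numpy as np
from zarr.buffer import Buffer

Buffer(np.zeros(8, dtype="b"))           # ok: 1-dim, byte dtype
Buffer.create_zero_length()              # ok: empty placeholder, e.g. for a new shard

try:
    Buffer(np.zeros((2, 4), dtype="b"))  # rejected: not 1-dimensional
except ValueError as err:
    print(err)

try:
    Buffer(np.zeros(8, dtype="f4"))      # rejected: not byte dtype
except ValueError as err:
    print(err)
```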
+ + Return + ------ + New buffer representing `array_like` + """ + return cls(array_like) + + @classmethod + def from_bytes(cls, bytes_like: BytesLike) -> Self: + """Create a new buffer of a bytes-like object (host memory) + + Parameters + ---------- + bytes_like + bytes-like object + + Return + ------ + New buffer representing `bytes_like` + """ + return cls.from_array_like(np.frombuffer(bytes_like, dtype="b")) + + def as_array_like(self) -> NDArrayLike: + """Return the underlying array (host or device memory) of this buffer + + This will never copy data. + + Return + ------ + The underlying 1d array such as a NumPy or CuPy array. + """ + return self._data + + def as_nd_buffer(self, *, dtype: np.DTypeLike) -> NDBuffer: + """Create a new NDBuffer from this one. + + This will never copy data. + + Parameters + ---------- + dtype + The datatype of the returned buffer (reinterpretation of the bytes) + + Return + ------ + New NDbuffer representing `self.as_array_like()` + """ + return NDBuffer.from_ndarray_like(self._data.view(dtype=dtype)) + + def as_numpy_array(self) -> np.ndarray: + """Return the buffer as a NumPy array (host memory). + + Warning + ------- + Might have to copy data, consider using `.as_array_like()` instead. + + Return + ------ + NumPy array of this buffer (might be a data copy) + """ + return np.asanyarray(self._data) + + def to_bytes(self) -> bytes: + """Return the buffer as `bytes` (host memory). + + Warning + ------- + Will always copy data, only use this method for small buffers such as metadata + buffers. If possible, use `.as_numpy_array()` or `.as_array_like()` instead. + + Return + ------ + `bytes` of this buffer (data copy) + """ + return bytes(self.as_numpy_array()) + + def __getitem__(self, key: slice) -> Self: + check_item_key_is_1d_contiguous(key) + return self.__class__(self._data.__getitem__(key)) + + def __setitem__(self, key: slice, value: Any) -> None: + check_item_key_is_1d_contiguous(key) + self._data.__setitem__(key, value) + + def __len__(self) -> int: + return self._data.size + + def __add__(self, other: Buffer) -> Self: + """Concatenate two buffers""" + + other_array = other.as_array_like() + assert other_array.dtype == np.dtype("b") + return self.__class__(np.concatenate((self._data, other_array))) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, (bytes, bytearray)): + # Many of the tests compares `Buffer` with `bytes` so we + # convert the bytes to a Buffer and try again + return self == self.from_bytes(other) + if isinstance(other, Buffer): + return (self._data == other.as_array_like()).all() + raise ValueError( + f"equal operator not supported between {self.__class__} and {other.__class__}" + ) + + +class NDBuffer: + """A n-dimensional memory block + + We use NDBuffer throughout Zarr to represent a n-dimensional memory block. + + A NDBuffer is backed by a underlying ndarray-like instance that represents + the memory. The memory type is unspecified; can be regular host memory, + CUDA device memory, or something else. The only requirement is that the + ndarray-like instance can be copied/converted to a regular Numpy array + (host memory). + + Note + ---- + The two buffer classes Buffer and NDBuffer are very similar. In fact, Buffer + is a special case of NDBuffer where dim=1, stride=1, and dtype="b". However, + in order to use Python's type system to differentiate between the contiguous + Buffer and the n-dim (non-contiguous) NDBuffer, we keep the definition of the + two classes separate. 
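Putting the two classes together, a typical decode-style round trip looks roughly like this (a sketch assuming this branch; `as_buffer` is defined further down):

```python
import numpy as np
from zarr.buffer import Buffer

raw = Buffer.from_bytes(np.arange(6, dtype="<i4").tobytes())

# Reinterpret the untyped bytes as typed, n-dimensional data (no copy) ...
nd = raw.as_nd_buffer(dtype="<i4").reshape((2, 3))
assert nd.shape == (2, 3)

# ... operate on it through the NDBuffer API ...
nd[0:1, 0:1] = 42

# ... and flatten it back to an untyped Buffer for the bytes-to-bytes codecs.
out = nd.as_buffer()
assert out.to_bytes()[:4] == np.array([42], dtype="<i4").tobytes()
```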
+ + Parameters + ---------- + ndarray_like + ndarray-like object that is convertible to a regular Numpy array. + """ + + def __init__(self, array: NDArrayLike): + assert array.ndim > 0 + assert array.dtype != object + self._data = array + + @classmethod + def create( + cls, + *, + shape: Iterable[int], + dtype: np.DTypeLike, + order: Literal["C", "F"] = "C", + fill_value: Optional[Any] = None, + ) -> Self: + """Create a new buffer and its underlying ndarray-like object + + Parameters + ---------- + shape + The shape of the buffer and its underlying ndarray-like object + dtype + The datatype of the buffer and its underlying ndarray-like object + order + Whether to store multi-dimensional data in row-major (C-style) or + column-major (Fortran-style) order in memory. + fill_value + If not None, fill the new buffer with a scalar value. + + Return + ------ + New buffer representing a new ndarray_like object + + Developer Notes + --------------- + A subclass can overwrite this method to create a ndarray-like object + other then the default Numpy array. + """ + ret = cls(np.empty(shape=shape, dtype=dtype, order=order)) + if fill_value is not None: + ret.fill(fill_value) + return ret + + @classmethod + def from_ndarray_like(cls, ndarray_like: NDArrayLike) -> Self: + """Create a new buffer of a ndarray-like object + + Parameters + ---------- + ndarray_like + ndarray-like object + + Return + ------ + New buffer representing `ndarray_like` + """ + return cls(ndarray_like) + + @classmethod + def from_numpy_array(cls, array_like: np.ArrayLike) -> Self: + """Create a new buffer of Numpy array-like object + + Parameters + ---------- + array_like + Object that can be coerced into a Numpy array + + Return + ------ + New buffer representing `array_like` + """ + return cls.from_ndarray_like(np.asanyarray(array_like)) + + def as_ndarray_like(self) -> NDArrayLike: + """Return the underlying array (host or device memory) of this buffer + + This will never copy data. + + Return + ------ + The underlying array such as a NumPy or CuPy array. + """ + return self._data + + def as_buffer(self) -> Buffer: + """Create a new Buffer from this one. + + Warning + ------- + Copies data if the buffer is non-contiguous. + + Return + ------ + The new buffer (might be data copy) + """ + data = self._data + if not self._data.flags.contiguous: + data = np.ascontiguousarray(self._data) + return Buffer(data.reshape(-1).view(dtype="b")) # Flatten the array without copy + + def as_numpy_array(self) -> np.ndarray: + """Return the buffer as a NumPy array (host memory). + + Warning + ------- + Might have to copy data, consider using `.as_ndarray_like()` instead. 
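The wrap/unwrap pair above is intended to be zero-copy for contiguous data; a small sketch (assuming this branch):

```python
import numpy as np
from zarr.buffer import NDBuffer

arr = np.arange(12, dtype="u2").reshape(3, 4)
nd = NDBuffer.from_numpy_array(arr)

# as_ndarray_like() hands back the very object that was wrapped.
assert nd.as_ndarray_like() is arr

# as_buffer() reinterprets the same memory as bytes; it only copies
# when the underlying array is non-contiguous.
assert len(nd.as_buffer()) == arr.nbytes
```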
+ + Return + ------ + NumPy array of this buffer (might be a data copy) + """ + return np.asanyarray(self._data) + + @property + def dtype(self) -> np.dtype[Any]: + return self._data.dtype + + @property + def shape(self) -> Tuple[int, ...]: + return self._data.shape + + @property + def byteorder(self) -> Endian: + from zarr.codecs.bytes import Endian + + if self.dtype.byteorder == "<": + return Endian.little + elif self.dtype.byteorder == ">": + return Endian.big + else: + return Endian(sys.byteorder) + + def reshape(self, newshape: Iterable[int]) -> Self: + return self.__class__(self._data.reshape(newshape)) + + def astype(self, dtype: np.DTypeLike, order: Literal["K", "A", "C", "F"] = "K") -> Self: + return self.__class__(self._data.astype(dtype=dtype, order=order)) + + def __getitem__(self, key: Any) -> Self: + return self.__class__(np.asanyarray(self._data.__getitem__(key))) + + def __setitem__(self, key: Any, value: Any) -> None: + if isinstance(value, NDBuffer): + value = value._data + self._data.__setitem__(key, value) + + def __len__(self) -> int: + return self._data.__len__() + + def all_equal(self, other: Any) -> bool: + return bool((self._data == other).all()) + + def fill(self, value: Any) -> None: + self._data.fill(value) + + def copy(self) -> Self: + return self.__class__(self._data.copy()) + + def transpose(self, *axes: np.SupportsIndex) -> Self: + return self.__class__(self._data.transpose(*axes)) + + +def as_numpy_array_wrapper(func: Callable[[np.ndarray], bytes], buf: Buffer) -> Buffer: + """Converts the input of `func` to a numpy array and the output back to `Buffer`. + + This function is useful when calling a `func` that only support host memory such + as `GZip.decode` and `Blosc.decode`. In this case, use this wrapper to convert + the input `buf` to a Numpy array and convert the result back into a `Buffer`. + + Parameters + ---------- + func + The callable that will be called with the converted `buf` as input. + `func` must return bytes, which will be converted into a `Buffer` + before returned. + buf + The buffer that will be converted to a Numpy array before given as + input to `func`. 
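`all_equal` above is what the write path now calls instead of `np.all(chunk_array == fill_value)` when deciding whether a chunk can be dropped; roughly (a sketch, assuming this branch):

```python
from zarr.buffer import NDBuffer

fill_value = 0
chunk = NDBuffer.create(shape=(4, 4), dtype="i4", fill_value=fill_value)

assert chunk.all_equal(fill_value)       # nothing but fill value -> the chunk is deleted

chunk[0:1, 0:1] = 7
assert not chunk.all_equal(fill_value)   # real data -> the chunk is encoded and written
```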
+ + Return + ------ + The result of `func` converted to a `Buffer` + """ + return Buffer.from_bytes(func(buf.as_numpy_array())) diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 5ee2b7640d..7e94575f9a 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -6,17 +6,17 @@ from typing import TYPE_CHECKING, Union import numcodecs -import numpy as np from numcodecs.blosc import Blosc from zarr.abc.codec import BytesBytesCodec +from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration, to_thread if TYPE_CHECKING: from typing import Dict, Optional from typing_extensions import Self - from zarr.common import JSON, ArraySpec, BytesLike + from zarr.common import JSON, ArraySpec class BloscShuffle(Enum): @@ -160,18 +160,22 @@ def _blosc_codec(self) -> Blosc: async def decode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> BytesLike: - return await to_thread(self._blosc_codec.decode, chunk_bytes) + ) -> Buffer: + return await to_thread(as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes) async def encode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: - chunk_array = np.frombuffer(chunk_bytes, dtype=chunk_spec.dtype) - return await to_thread(self._blosc_codec.encode, chunk_array) + ) -> Optional[Buffer]: + # Since blosc only takes bytes, we convert the input and output of the encoding + # between bytes and Buffer + return await to_thread( + lambda chunk: Buffer.from_bytes(self._blosc_codec.encode(chunk.as_array_like())), + chunk_bytes, + ) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 566b3a8df9..d6a626e160 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -8,11 +8,12 @@ import numpy as np from zarr.abc.codec import ArrayBytesCodec +from zarr.buffer import Buffer, NDBuffer from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration if TYPE_CHECKING: - from zarr.common import JSON, ArraySpec, BytesLike + from zarr.common import JSON, ArraySpec from typing_extensions import Self @@ -59,19 +60,12 @@ def evolve(self, array_spec: ArraySpec) -> Self: ) return self - def _get_byteorder(self, array: np.ndarray) -> Endian: - if array.dtype.byteorder == "<": - return Endian.little - elif array.dtype.byteorder == ">": - return Endian.big - else: - return default_system_endian - async def decode( self, - chunk_bytes: BytesLike, + chunk_bytes: Buffer, chunk_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: + assert isinstance(chunk_bytes, Buffer) if chunk_spec.dtype.itemsize > 0: if self.endian == Endian.little: prefix = "<" @@ -80,7 +74,7 @@ async def decode( dtype = np.dtype(f"{prefix}{chunk_spec.dtype.str[1:]}") else: dtype = np.dtype(f"|{chunk_spec.dtype.str[1:]}") - chunk_array = np.frombuffer(chunk_bytes, dtype) + chunk_array = chunk_bytes.as_nd_buffer(dtype=dtype) # ensure correct chunk shape if chunk_array.shape != chunk_spec.shape: @@ -91,15 +85,15 @@ async def decode( async def encode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, _chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: + ) -> Optional[Buffer]: + assert isinstance(chunk_array, NDBuffer) if chunk_array.dtype.itemsize > 1: - byteorder = self._get_byteorder(chunk_array) - if 
self.endian is not None and self.endian != byteorder: + if self.endian is not None and self.endian != chunk_array.byteorder: new_dtype = chunk_array.dtype.newbyteorder(self.endian.name) chunk_array = chunk_array.astype(new_dtype) - return chunk_array.tobytes() + return chunk_array.as_buffer() def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index dd61b3425e..1daf512e43 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -8,13 +8,14 @@ from crc32c import crc32c from zarr.abc.codec import BytesBytesCodec +from zarr.buffer import Buffer from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration if TYPE_CHECKING: from typing import Dict, Optional from typing_extensions import Self - from zarr.common import JSON, BytesLike, ArraySpec + from zarr.common import JSON, ArraySpec @dataclass(frozen=True) @@ -31,11 +32,12 @@ def to_dict(self) -> Dict[str, JSON]: async def decode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> BytesLike: - crc32_bytes = chunk_bytes[-4:] - inner_bytes = chunk_bytes[:-4] + ) -> Buffer: + data = chunk_bytes.as_numpy_array() + crc32_bytes = data[-4:] + inner_bytes = data[:-4] computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes() stored_checksum = bytes(crc32_bytes) @@ -44,14 +46,18 @@ async def decode( "Stored and computed checksum do not match. " + f"Stored: {stored_checksum!r}. Computed: {computed_checksum!r}." ) - return inner_bytes + return Buffer.from_array_like(inner_bytes) async def encode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: - return chunk_bytes + np.uint32(crc32c(chunk_bytes)).tobytes() + ) -> Optional[Buffer]: + data = chunk_bytes.as_numpy_array() + # Calculate the checksum and "cast" it to a numpy array + checksum = np.array([crc32c(data)], dtype=np.uint32) + # Append the checksum (as bytes) to the data + return Buffer.from_array_like(np.append(data, checksum.view("b"))) def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length + 4 diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index 71dcaa6bb5..a8d7f815aa 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -5,13 +5,14 @@ from numcodecs.gzip import GZip from zarr.abc.codec import BytesBytesCodec +from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread if TYPE_CHECKING: from typing import Optional, Dict from typing_extensions import Self - from zarr.common import JSON, ArraySpec, BytesLike + from zarr.common import JSON, ArraySpec def parse_gzip_level(data: JSON) -> int: @@ -45,17 +46,17 @@ def to_dict(self) -> Dict[str, JSON]: async def decode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> BytesLike: - return await to_thread(GZip(self.level).decode, chunk_bytes) + ) -> Buffer: + return await to_thread(as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes) async def encode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: - return await to_thread(GZip(self.level).encode, chunk_bytes) + ) -> Optional[Buffer]: + return await to_thread(as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes) def compute_encoded_size( self, 
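The `as_numpy_array_wrapper` pattern used for blosc and gzip above works for any numcodecs-style compressor that only understands host memory; for example (a sketch, assuming this branch and numcodecs are installed):

```python
from numcodecs.gzip import GZip
from zarr.buffer import Buffer, as_numpy_array_wrapper

codec = GZip(level=5)
payload = Buffer.from_bytes(b"highly compressible " * 100)

# GZip only sees host memory: the wrapper converts the Buffer to a NumPy
# array on the way in and re-wraps the returned bytes on the way out.
compressed = as_numpy_array_wrapper(codec.encode, payload)
restored = as_numpy_array_wrapper(codec.decode, compressed)

assert len(compressed) < len(payload)
assert restored == payload.to_bytes()    # Buffer.__eq__ also accepts raw bytes
```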
diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index da131868c4..1602eb1ef8 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -1,7 +1,6 @@ from __future__ import annotations from typing import TYPE_CHECKING, Iterable -import numpy as np from dataclasses import dataclass from warnings import warn @@ -14,6 +13,7 @@ Codec, ) from zarr.abc.metadata import Metadata +from zarr.buffer import Buffer, NDBuffer from zarr.codecs.registry import get_codec_class from zarr.common import parse_named_configuration @@ -21,7 +21,7 @@ from typing import Iterator, List, Optional, Tuple, Union from zarr.store import StorePath from zarr.metadata import ArrayMetadata - from zarr.common import JSON, ArraySpec, BytesLike, SliceSelection + from zarr.common import JSON, ArraySpec, SliceSelection @dataclass(frozen=True) @@ -148,9 +148,9 @@ def _codecs_with_resolved_metadata( async def decode( self, - chunk_bytes: BytesLike, + chunk_bytes: Buffer, array_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: ( aa_codecs_with_spec, ab_codec_with_spec, @@ -173,16 +173,16 @@ async def decode_partial( store_path: StorePath, selection: SliceSelection, chunk_spec: ArraySpec, - ) -> Optional[np.ndarray]: + ) -> Optional[NDBuffer]: assert self.supports_partial_decode assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin) return await self.array_bytes_codec.decode_partial(store_path, selection, chunk_spec) async def encode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, array_spec: ArraySpec, - ) -> Optional[BytesLike]: + ) -> Optional[Buffer]: ( aa_codecs_with_spec, ab_codec_with_spec, @@ -207,12 +207,13 @@ async def encode( return None chunk_bytes = chunk_bytes_maybe + assert isinstance(chunk_bytes, Buffer) return chunk_bytes async def encode_partial( self, store_path: StorePath, - chunk_array: np.ndarray, + chunk_array: NDBuffer, selection: SliceSelection, chunk_spec: ArraySpec, ) -> None: diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index e94074e63e..b63d1e499b 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -37,6 +37,7 @@ ArrayMetadata, parse_codecs, ) +from zarr.buffer import Buffer, NDBuffer if TYPE_CHECKING: from typing import Awaitable, Callable, Dict, Iterator, List, Optional, Set, Tuple @@ -46,7 +47,6 @@ from zarr.common import ( JSON, ChunkCoords, - BytesLike, SliceSelection, ) @@ -127,15 +127,15 @@ def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardIndex: class _ShardProxy(Mapping): index: _ShardIndex - buf: BytesLike + buf: Buffer @classmethod async def from_bytes( - cls, buf: BytesLike, codec: ShardingCodec, chunks_per_shard: ChunkCoords + cls, buf: Buffer, codec: ShardingCodec, chunks_per_shard: ChunkCoords ) -> _ShardProxy: shard_index_size = codec._shard_index_size(chunks_per_shard) obj = cls() - obj.buf = memoryview(buf) + obj.buf = buf if codec.index_location == ShardingCodecIndexLocation.start: shard_index_bytes = obj.buf[:shard_index_size] else: @@ -148,11 +148,11 @@ async def from_bytes( def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardProxy: index = _ShardIndex.create_empty(chunks_per_shard) obj = cls() - obj.buf = memoryview(b"") + obj.buf = Buffer.create_zero_length() obj.index = index return obj - def __getitem__(self, chunk_coords: ChunkCoords) -> Optional[BytesLike]: + def __getitem__(self, chunk_coords: ChunkCoords) -> Optional[Buffer]: chunk_byte_slice = self.index.get_chunk_slice(chunk_coords) if chunk_byte_slice: return 
self.buf[chunk_byte_slice[0] : chunk_byte_slice[1]] @@ -166,7 +166,7 @@ def __iter__(self) -> Iterator[ChunkCoords]: class _ShardBuilder(_ShardProxy): - buf: bytearray + buf: Buffer index: _ShardIndex @classmethod @@ -174,7 +174,7 @@ def merge_with_morton_order( cls, chunks_per_shard: ChunkCoords, tombstones: Set[ChunkCoords], - *shard_dicts: Mapping[ChunkCoords, BytesLike], + *shard_dicts: Mapping[ChunkCoords, Buffer], ) -> _ShardBuilder: obj = cls.create_empty(chunks_per_shard) for chunk_coords in morton_order_iter(chunks_per_shard): @@ -190,30 +190,28 @@ def merge_with_morton_order( @classmethod def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardBuilder: obj = cls() - obj.buf = bytearray() + obj.buf = Buffer.create_zero_length() obj.index = _ShardIndex.create_empty(chunks_per_shard) return obj - def append(self, chunk_coords: ChunkCoords, value: BytesLike) -> None: + def append(self, chunk_coords: ChunkCoords, value: Buffer) -> None: chunk_start = len(self.buf) chunk_length = len(value) - self.buf.extend(value) + self.buf = self.buf + value self.index.set_chunk_slice(chunk_coords, slice(chunk_start, chunk_start + chunk_length)) async def finalize( self, index_location: ShardingCodecIndexLocation, - index_encoder: Callable[[_ShardIndex], Awaitable[BytesLike]], - ) -> BytesLike: + index_encoder: Callable[[_ShardIndex], Awaitable[Buffer]], + ) -> Buffer: index_bytes = await index_encoder(self.index) if index_location == ShardingCodecIndexLocation.start: self.index.offsets_and_lengths[..., 0] += len(index_bytes) index_bytes = await index_encoder(self.index) # encode again with corrected offsets - out_buf = bytearray(index_bytes) - out_buf.extend(self.buf) + out_buf = index_bytes + self.buf else: - out_buf = self.buf - out_buf.extend(index_bytes) + out_buf = self.buf + index_bytes return out_buf @@ -299,9 +297,9 @@ def validate(self, array_metadata: ArrayMetadata) -> None: async def decode( self, - shard_bytes: BytesLike, + shard_bytes: Buffer, shard_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: # print("decode") shard_shape = shard_spec.shape chunk_shape = self.chunk_shape @@ -314,10 +312,8 @@ async def decode( ) # setup output array - out = np.zeros( - shard_shape, - dtype=shard_spec.dtype, - order=shard_spec.order, + out = NDBuffer.create( + shape=shard_shape, dtype=shard_spec.dtype, order=shard_spec.order, fill_value=0 ) shard_dict = await _ShardProxy.from_bytes(shard_bytes, self, chunks_per_shard) @@ -349,7 +345,7 @@ async def decode_partial( store_path: StorePath, selection: SliceSelection, shard_spec: ArraySpec, - ) -> Optional[np.ndarray]: + ) -> Optional[NDBuffer]: shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) @@ -361,17 +357,15 @@ async def decode_partial( ) # setup output array - out = np.zeros( - indexer.shape, - dtype=shard_spec.dtype, - order=shard_spec.order, + out = NDBuffer.create( + shape=indexer.shape, dtype=shard_spec.dtype, order=shard_spec.order, fill_value=0 ) indexed_chunks = list(indexer) all_chunk_coords = set(chunk_coords for chunk_coords, _, _ in indexed_chunks) # reading bytes of all requested chunks - shard_dict: Mapping[ChunkCoords, BytesLike] = {} + shard_dict: Mapping[ChunkCoords, Buffer] = {} if self._is_total_shard(all_chunk_coords, chunks_per_shard): # read entire shard shard_dict_maybe = await self._load_full_shard_maybe(store_path, chunks_per_shard) @@ -407,17 +401,16 @@ async def decode_partial( self._read_chunk, config.get("async.concurrency"), ) - return out 
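The `bytearray` bookkeeping in `_ShardBuilder` above is replaced by `Buffer` concatenation; the behaviour it relies on is roughly this (a sketch, assuming this branch):

```python
from zarr.buffer import Buffer

shard = Buffer.create_zero_length()
for chunk in (b"\x01\x02", b"\x03\x04\x05"):
    chunk_buf = Buffer.from_bytes(chunk)
    offset = len(shard)                        # offset recorded in the shard index
    shard = shard + chunk_buf                  # __add__ allocates a new, longer Buffer
    assert shard[offset : offset + len(chunk_buf)] == chunk

assert shard.to_bytes() == b"\x01\x02\x03\x04\x05"
```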
async def _read_chunk( self, - shard_dict: Mapping[ChunkCoords, Optional[BytesLike]], + shard_dict: Mapping[ChunkCoords, Optional[Buffer]], chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, shard_spec: ArraySpec, - out: np.ndarray, + out: NDBuffer, ) -> None: chunk_spec = self._get_chunk_spec(shard_spec) chunk_bytes = shard_dict.get(chunk_coords, None) @@ -430,9 +423,9 @@ async def _read_chunk( async def encode( self, - shard_array: np.ndarray, + shard_array: NDBuffer, shard_spec: ArraySpec, - ) -> Optional[BytesLike]: + ) -> Optional[Buffer]: shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) @@ -446,22 +439,23 @@ async def encode( ) async def _write_chunk( - shard_array: np.ndarray, + shard_array: NDBuffer, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, - ) -> Tuple[ChunkCoords, Optional[BytesLike]]: + ) -> Tuple[ChunkCoords, Optional[Buffer]]: + assert isinstance(shard_array, NDBuffer) if is_total_slice(chunk_selection, chunk_shape): chunk_array = shard_array[out_selection] else: # handling writing partial chunks - chunk_array = np.empty( - chunk_shape, + chunk_array = NDBuffer.create( + shape=chunk_shape, dtype=shard_spec.dtype, ) chunk_array.fill(shard_spec.fill_value) chunk_array[chunk_selection] = shard_array[out_selection] - if not np.array_equiv(chunk_array, shard_spec.fill_value): + if not chunk_array.all_equal(shard_spec.fill_value): chunk_spec = self._get_chunk_spec(shard_spec) return ( chunk_coords, @@ -470,7 +464,7 @@ async def _write_chunk( return (chunk_coords, None) # assembling and encoding chunks within the shard - encoded_chunks: List[Tuple[ChunkCoords, Optional[BytesLike]]] = await concurrent_map( + encoded_chunks: List[Tuple[ChunkCoords, Optional[Buffer]]] = await concurrent_map( [ (shard_array, chunk_coords, chunk_selection, out_selection) for chunk_coords, chunk_selection, out_selection in indexer @@ -491,7 +485,7 @@ async def _write_chunk( async def encode_partial( self, store_path: StorePath, - shard_array: np.ndarray, + shard_array: NDBuffer, selection: SliceSelection, shard_spec: ArraySpec, ) -> None: @@ -519,8 +513,7 @@ async def _write_chunk( chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, - ) -> Tuple[ChunkCoords, Optional[BytesLike]]: - chunk_array = None + ) -> Tuple[ChunkCoords, Optional[Buffer]]: if is_total_slice(chunk_selection, self.chunk_shape): chunk_array = shard_array[out_selection] else: @@ -530,8 +523,8 @@ async def _write_chunk( # merge new value if chunk_bytes is None: - chunk_array = np.empty( - self.chunk_shape, + chunk_array = NDBuffer.create( + shape=self.chunk_shape, dtype=shard_spec.dtype, ) chunk_array.fill(shard_spec.fill_value) @@ -541,7 +534,7 @@ async def _write_chunk( ).copy() # make a writable copy chunk_array[chunk_selection] = shard_array[out_selection] - if not np.array_equiv(chunk_array, shard_spec.fill_value): + if not chunk_array.all_equal(shard_spec.fill_value): return ( chunk_coords, await self.codecs.encode(chunk_array, chunk_spec), @@ -549,7 +542,7 @@ async def _write_chunk( else: return (chunk_coords, None) - encoded_chunks: List[Tuple[ChunkCoords, Optional[BytesLike]]] = await concurrent_map( + encoded_chunks: List[Tuple[ChunkCoords, Optional[Buffer]]] = await concurrent_map( [ ( chunk_coords, @@ -593,21 +586,24 @@ def _is_total_shard( ) async def _decode_shard_index( - self, index_bytes: BytesLike, chunks_per_shard: 
ChunkCoords + self, index_bytes: Buffer, chunks_per_shard: ChunkCoords ) -> _ShardIndex: return _ShardIndex( - await self.index_codecs.decode( - index_bytes, - self._get_index_chunk_spec(chunks_per_shard), - ) + ( + await self.index_codecs.decode( + index_bytes, + self._get_index_chunk_spec(chunks_per_shard), + ) + ).as_numpy_array() ) - async def _encode_shard_index(self, index: _ShardIndex) -> BytesLike: + async def _encode_shard_index(self, index: _ShardIndex) -> Buffer: index_bytes = await self.index_codecs.encode( - index.offsets_and_lengths, + NDBuffer.from_numpy_array(index.offsets_and_lengths), self._get_index_chunk_spec(index.chunks_per_shard), ) assert index_bytes is not None + assert isinstance(index_bytes, Buffer) return index_bytes def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int: diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index a13708955c..70ae30f908 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -3,13 +3,13 @@ from dataclasses import dataclass, replace +from zarr.buffer import NDBuffer from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration if TYPE_CHECKING: from typing import TYPE_CHECKING, Optional, Tuple from typing_extensions import Self -import numpy as np from zarr.abc.codec import ArrayArrayCodec from zarr.codecs.registry import register_codec @@ -75,9 +75,9 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: async def decode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> np.ndarray: + ) -> NDBuffer: inverse_order = [0] * chunk_spec.ndim for x, i in enumerate(self.order): inverse_order[x] = i @@ -86,9 +86,9 @@ async def decode( async def encode( self, - chunk_array: np.ndarray, + chunk_array: NDBuffer, chunk_spec: ArraySpec, - ) -> Optional[np.ndarray]: + ) -> Optional[NDBuffer]: chunk_array = chunk_array.transpose(self.order) return chunk_array diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index ad10a7fdb8..0cc99a0368 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -6,13 +6,14 @@ from zstandard import ZstdCompressor, ZstdDecompressor from zarr.abc.codec import BytesBytesCodec +from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread if TYPE_CHECKING: from typing import Dict, Optional from typing_extensions import Self - from zarr.common import BytesLike, JSON, ArraySpec + from zarr.common import JSON, ArraySpec def parse_zstd_level(data: JSON) -> int: @@ -61,17 +62,17 @@ def _decompress(self, data: bytes) -> bytes: async def decode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> BytesLike: - return await to_thread(self._decompress, chunk_bytes) + ) -> Buffer: + return await to_thread(as_numpy_array_wrapper, self._decompress, chunk_bytes) async def encode( self, - chunk_bytes: bytes, + chunk_bytes: Buffer, _chunk_spec: ArraySpec, - ) -> Optional[BytesLike]: - return await to_thread(self._compress, chunk_bytes) + ) -> Optional[Buffer]: + return await to_thread(as_numpy_array_wrapper, self._compress, chunk_bytes) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/group.py b/src/zarr/group.py index f8d57e3fba..d344b3db00 100644 --- a/src/zarr/group.py +++ b/src/zarr/group.py @@ -7,6 +7,8 @@ import logging import numpy.typing as npt +from 
zarr.buffer import Buffer + if TYPE_CHECKING: from typing import Any, AsyncGenerator, Literal, Iterable from zarr.abc.codec import Codec @@ -159,13 +161,13 @@ async def open( if zarr_format == 2: # V2 groups are comprised of a .zgroup and .zattrs objects assert zgroup_bytes is not None - zgroup = json.loads(zgroup_bytes) - zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {} + zgroup = json.loads(zgroup_bytes.to_bytes()) + zattrs = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {} group_metadata = {**zgroup, "attributes": zattrs} else: # V3 groups are comprised of a zarr.json object assert zarr_json_bytes is not None - group_metadata = json.loads(zarr_json_bytes) + group_metadata = json.loads(zarr_json_bytes.to_bytes()) return cls.from_dict(store_path, group_metadata) @@ -199,7 +201,7 @@ async def getitem( if zarr_json_bytes is None: raise KeyError(key) else: - zarr_json = json.loads(zarr_json_bytes) + zarr_json = json.loads(zarr_json_bytes.to_bytes()) if zarr_json["node_type"] == "group": return type(self).from_dict(store_path, zarr_json) elif zarr_json["node_type"] == "array": @@ -219,9 +221,9 @@ async def getitem( raise KeyError(key) # unpack the zarray, if this is None then we must be opening a group - zarray = json.loads(zarray_bytes) if zarray_bytes else None + zarray = json.loads(zarray_bytes.to_bytes()) if zarray_bytes else None # unpack the zattrs, this can be None if no attrs were written - zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {} + zattrs = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {} if zarray is not None: # TODO: update this once the V2 array support is part of the primary array class @@ -229,7 +231,7 @@ async def getitem( return AsyncArray.from_dict(store_path, zarray) else: zgroup = ( - json.loads(zgroup_bytes) + json.loads(zgroup_bytes.to_bytes()) if zgroup_bytes is not None else {"zarr_format": self.metadata.zarr_format} ) @@ -252,7 +254,9 @@ async def delitem(self, key: str) -> None: async def _save_metadata(self) -> None: to_save = self.metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [ + (self.store_path / key).set(Buffer.from_bytes(value)) for key, value in to_save.items() + ] await asyncio.gather(*awaitables) @property @@ -310,9 +314,9 @@ async def update_attributes(self, new_attributes: dict[str, Any]) -> "AsyncGroup to_save = self.metadata.to_bytes() if self.metadata.zarr_format == 2: # only save the .zattrs object - await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON]) + await (self.store_path / ZATTRS_JSON).set(Buffer.from_bytes(to_save[ZATTRS_JSON])) else: - await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON]) + await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(to_save[ZARR_JSON])) self.metadata.attributes.clear() self.metadata.attributes.update(new_attributes) @@ -480,7 +484,9 @@ async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group # Write new metadata to_save = new_metadata.to_bytes() - awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()] + awaitables = [ + (self.store_path / key).set(Buffer.from_bytes(value)) for key, value in to_save.items() + ] await asyncio.gather(*awaitables) async_group = replace(self._async_group, metadata=new_metadata) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 3903bacd42..098ab34b86 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -6,6 +6,7 @@ import 
numpy as np import numpy.typing as npt +from zarr.buffer import Buffer from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.chunk_key_encodings import ChunkKeyEncoding, parse_separator @@ -289,7 +290,7 @@ def __init__( def ndim(self) -> int: return len(self.shape) - def to_bytes(self) -> bytes: + def to_bytes(self) -> Buffer: def _json_convert(o): if isinstance(o, np.dtype): if o.fields is None: @@ -298,7 +299,7 @@ def _json_convert(o): return o.descr raise TypeError - return json.dumps(self.to_dict(), default=_json_convert).encode() + return Buffer.from_bytes(json.dumps(self.to_dict(), default=_json_convert).encode()) @classmethod def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata: diff --git a/src/zarr/store/core.py b/src/zarr/store/core.py index cc017ec982..c6ffbc6c05 100644 --- a/src/zarr/store/core.py +++ b/src/zarr/store/core.py @@ -3,8 +3,8 @@ from pathlib import Path from typing import Any, Optional, Tuple, Union -from zarr.common import BytesLike from zarr.abc.store import Store +from zarr.buffer import Buffer from zarr.store.local import LocalStore @@ -25,12 +25,10 @@ def __init__(self, store: Store, path: Optional[str] = None): self.store = store self.path = path or "" - async def get( - self, byte_range: Optional[Tuple[int, Optional[int]]] = None - ) -> Optional[BytesLike]: + async def get(self, byte_range: Optional[Tuple[int, Optional[int]]] = None) -> Optional[Buffer]: return await self.store.get(self.path, byte_range) - async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: + async def set(self, value: Buffer, byte_range: Optional[Tuple[int, int]] = None) -> None: if byte_range is not None: raise NotImplementedError("Store.set does not have partial writes yet") await self.store.set(self.path, value) diff --git a/src/zarr/store/local.py b/src/zarr/store/local.py index a3dd65979b..f27b832a39 100644 --- a/src/zarr/store/local.py +++ b/src/zarr/store/local.py @@ -6,10 +6,11 @@ from pathlib import Path from zarr.abc.store import Store -from zarr.common import BytesLike, concurrent_map, to_thread +from zarr.buffer import Buffer +from zarr.common import concurrent_map, to_thread -def _get(path: Path, byte_range: tuple[int, int | None] | None) -> bytes: +def _get(path: Path, byte_range: tuple[int, int | None] | None) -> Buffer: """ Fetch a contiguous region of bytes from a file. 
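Concretely, everything that flows through `StorePath` is now a `Buffer` rather than `bytes`; for example (a sketch, reusing the in-memory store from the tests below):

```python
from zarr.buffer import Buffer
from zarr.store.core import StorePath
from zarr.store.memory import MemoryStore

async def roundtrip() -> bytes:
    path = StorePath(MemoryStore()) / "zarr.json"
    await path.set(Buffer.from_bytes(b'{"zarr_format": 3}'))
    buf = await path.get()            # Optional[Buffer], no longer bytes
    assert buf is not None
    return buf.to_bytes()             # copy out to bytes only at the edges
```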
@@ -31,7 +32,7 @@ def _get(path: Path, byte_range: tuple[int, int | None] | None) -> bytes: end = (start + byte_range[1]) if byte_range[1] is not None else None else: - return path.read_bytes() + return Buffer.from_bytes(path.read_bytes()) with path.open("rb") as f: size = f.seek(0, io.SEEK_END) if start is not None: @@ -42,13 +43,13 @@ def _get(path: Path, byte_range: tuple[int, int | None] | None) -> bytes: if end is not None: if end < 0: end = size + end - return f.read(end - f.tell()) - return f.read() + return Buffer.from_bytes(f.read(end - f.tell())) + return Buffer.from_bytes(f.read()) def _put( path: Path, - value: BytesLike, + value: Buffer, start: int | None = None, auto_mkdir: bool = True, ) -> int | None: @@ -57,10 +58,10 @@ def _put( if start is not None: with path.open("r+b") as f: f.seek(start) - f.write(value) + f.write(value.as_numpy_array()) return None else: - return path.write_bytes(value) + return path.write_bytes(value.as_numpy_array()) class LocalStore(Store): @@ -88,7 +89,9 @@ def __repr__(self) -> str: def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.root == other.root - async def get(self, key: str, byte_range: tuple[int, int | None] | None = None) -> bytes | None: + async def get( + self, key: str, byte_range: tuple[int, int | None] | None = None + ) -> Buffer | None: assert isinstance(key, str) path = self.root / key @@ -99,7 +102,7 @@ async def get(self, key: str, byte_range: tuple[int, int | None] | None = None) async def get_partial_values( self, key_ranges: list[tuple[str, tuple[int, int]]] - ) -> list[bytes | None]: + ) -> list[Buffer | None]: """ Read byte ranges from multiple keys. Parameters @@ -117,8 +120,13 @@ async def get_partial_values( args.append((_get, path, byte_range)) return await concurrent_map(args, to_thread, limit=None) # TODO: fix limit - async def set(self, key: str, value: BytesLike) -> None: + async def set(self, key: str, value: Buffer) -> None: assert isinstance(key, str) + if isinstance(value, (bytes, bytearray)): + # TODO: to support the v2 tests, we convert bytes to Buffer here + value = Buffer.from_bytes(value) + if not isinstance(value, Buffer): + raise TypeError("LocalStore.set(): `value` must a Buffer instance") path = self.root / key await to_thread(_put, path, value, auto_mkdir=self.auto_mkdir) diff --git a/src/zarr/store/memory.py b/src/zarr/store/memory.py index 9730d635d5..c053f941ef 100644 --- a/src/zarr/store/memory.py +++ b/src/zarr/store/memory.py @@ -3,20 +3,21 @@ from collections.abc import AsyncGenerator from typing import Optional, MutableMapping, List, Tuple -from zarr.common import BytesLike, concurrent_map +from zarr.common import concurrent_map from zarr.abc.store import Store +from zarr.buffer import Buffer -# TODO: this store could easily be extended to wrap any MutuableMapping store from v2 +# TODO: this store could easily be extended to wrap any MutableMapping store from v2 # When that is done, the `MemoryStore` will just be a store that wraps a dict. 
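At the store level the same contract applies; a minimal sketch against the in-memory store defined below (assuming this branch):

```python
from zarr.buffer import Buffer
from zarr.store.memory import MemoryStore

async def demo() -> None:
    store = MemoryStore()
    await store.set("foo/c/0", Buffer.from_bytes(b"\x01\x02\x03\x04"))

    whole = await store.get("foo/c/0")
    assert whole == b"\x01\x02\x03\x04"        # Buffer.__eq__ tolerates raw bytes

    (first,) = await store.get_partial_values([("foo/c/0", (0, 2))])
    assert first == b"\x01\x02"                # partial reads come back as Buffers too
```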
class MemoryStore(Store):
     supports_writes: bool = True
     supports_partial_writes: bool = True
     supports_listing: bool = True
 
-    _store_dict: MutableMapping[str, bytes]
+    _store_dict: MutableMapping[str, Buffer]
 
-    def __init__(self, store_dict: Optional[MutableMapping[str, bytes]] = None):
+    def __init__(self, store_dict: Optional[MutableMapping[str, Buffer]] = None):
         self._store_dict = store_dict or {}
 
     def __str__(self) -> str:
@@ -27,7 +28,7 @@ def __repr__(self) -> str:
 
     async def get(
         self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
-    ) -> Optional[BytesLike]:
+    ) -> Optional[Buffer]:
         assert isinstance(key, str)
         try:
             value = self._store_dict[key]
@@ -39,7 +40,7 @@ async def get(
 
     async def get_partial_values(
         self, key_ranges: List[Tuple[str, Tuple[int, int]]]
-    ) -> List[Optional[BytesLike]]:
+    ) -> List[Optional[Buffer]]:
         vals = await concurrent_map(key_ranges, self.get, limit=None)
         return vals
 
@@ -47,14 +48,17 @@ async def exists(self, key: str) -> bool:
         return key in self._store_dict
 
     async def set(
-        self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None
+        self, key: str, value: Buffer, byte_range: Optional[Tuple[int, int]] = None
     ) -> None:
         assert isinstance(key, str)
-        if not isinstance(value, (bytes, bytearray, memoryview)):
-            raise TypeError(f"Expected BytesLike. Got {type(value)}.")
+        if isinstance(value, (bytes, bytearray)):
+            # TODO: to support the v2 tests, we convert bytes to Buffer here
+            value = Buffer.from_bytes(value)
+        if not isinstance(value, Buffer):
+            raise TypeError(f"Expected Buffer. Got {type(value)}.")
 
         if byte_range is not None:
-            buf = bytearray(self._store_dict[key])
+            buf = self._store_dict[key]
             buf[byte_range[0] : byte_range[1]] = value
             self._store_dict[key] = buf
         else:
diff --git a/src/zarr/store/remote.py b/src/zarr/store/remote.py
index fa6cd2167e..35fd2d60b6 100644
--- a/src/zarr/store/remote.py
+++ b/src/zarr/store/remote.py
@@ -3,8 +3,8 @@
 from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
 
 from zarr.abc.store import Store
+from zarr.buffer import Buffer
 from zarr.store.core import _dereference_path
-from zarr.common import BytesLike
 
 if TYPE_CHECKING:
@@ -52,7 +52,7 @@ def _make_fs(self) -> Tuple[AsyncFileSystem, str]:
     async def get(
         self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
-    ) -> Optional[BytesLike]:
+    ) -> Optional[Buffer]:
         assert isinstance(key, str)
         fs, root = self._make_fs()
         path = _dereference_path(root, key)
@@ -69,7 +69,7 @@ async def get(
         return value
 
     async def set(
-        self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None
+        self, key: str, value: Buffer, byte_range: Optional[Tuple[int, int]] = None
     ) -> None:
         assert isinstance(key, str)
         fs, root = self._make_fs()
diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py
index 601ef7f393..99f8021594 100644
--- a/src/zarr/testing/store.py
+++ b/src/zarr/testing/store.py
@@ -1,6 +1,7 @@
 import pytest
 
 from zarr.abc.store import Store
+from zarr.buffer import Buffer
 
 
 class StoreTests:
@@ -25,14 +26,14 @@ def test_store_capabilities(self, store: Store) -> None:
     @pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
     @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
     async def test_set_get_bytes_roundtrip(self, store: Store, key: str, data: bytes) -> None:
-        await store.set(key, data)
+        await store.set(key, Buffer.from_bytes(data))
         assert await store.get(key) == data
 
     @pytest.mark.parametrize("key", ["foo/c/0"])
     @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
     async def test_get_partial_values(self, store: Store, key: str, data: bytes) -> None:
         # put all of the data
-        await store.set(key, data)
+        await store.set(key, Buffer.from_bytes(data))
         # read back just part of it
         vals = await store.get_partial_values([(key, (0, 2))])
         assert vals == [data[0:2]]
@@ -43,18 +44,18 @@ async def test_get_partial_values(self, store: Store, key: str, data: bytes) ->
 
     async def test_exists(self, store: Store) -> None:
         assert not await store.exists("foo")
-        await store.set("foo/zarr.json", b"bar")
+        await store.set("foo/zarr.json", Buffer.from_bytes(b"bar"))
         assert await store.exists("foo/zarr.json")
 
     async def test_delete(self, store: Store) -> None:
-        await store.set("foo/zarr.json", b"bar")
+        await store.set("foo/zarr.json", Buffer.from_bytes(b"bar"))
         assert await store.exists("foo/zarr.json")
         await store.delete("foo/zarr.json")
         assert not await store.exists("foo/zarr.json")
 
     async def test_list(self, store: Store) -> None:
         assert [k async for k in store.list()] == []
-        await store.set("foo/zarr.json", b"bar")
+        await store.set("foo/zarr.json", Buffer.from_bytes(b"bar"))
         keys = [k async for k in store.list()]
         assert keys == ["foo/zarr.json"], keys
 
@@ -62,7 +63,9 @@ async def test_list(self, store: Store) -> None:
         for i in range(10):
             key = f"foo/c/{i}"
             expected.append(key)
-            await store.set(f"foo/c/{i}", i.to_bytes(length=3, byteorder="little"))
+            await store.set(
+                f"foo/c/{i}", Buffer.from_bytes(i.to_bytes(length=3, byteorder="little"))
+            )
 
     async def test_list_prefix(self, store: Store) -> None:
         # TODO: we currently don't use list_prefix anywhere
@@ -71,11 +74,11 @@ async def test_list_prefix(self, store: Store) -> None:
     async def test_list_dir(self, store: Store) -> None:
         assert [k async for k in store.list_dir("")] == []
         assert [k async for k in store.list_dir("foo")] == []
-        await store.set("foo/zarr.json", b"bar")
-        await store.set("foo/c/1", b"\x01")
+        await store.set("foo/zarr.json", Buffer.from_bytes(b"bar"))
+        await store.set("foo/c/1", Buffer.from_bytes(b"\x01"))
 
         keys = [k async for k in store.list_dir("foo")]
-        assert keys == ["zarr.json", "c"], keys
+        assert set(keys) == set(["zarr.json", "c"]), keys
 
         keys = [k async for k in store.list_dir("foo/")]
-        assert keys == ["zarr.json", "c"], keys
+        assert set(keys) == set(["zarr.json", "c"]), keys
diff --git a/tests/v3/test_buffer.py b/tests/v3/test_buffer.py
new file mode 100644
index 0000000000..a56c768782
--- /dev/null
+++ b/tests/v3/test_buffer.py
@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Iterable, Literal, Optional
+
+import numpy as np
+import numpy.typing as npt
+import pytest
+
+from zarr.array import AsyncArray
+from zarr.buffer import NDBuffer
+from zarr.store.core import StorePath
+from zarr.store.memory import MemoryStore
+
+if TYPE_CHECKING:
+    from typing_extensions import Self
+
+
+class MyNDArrayLike(np.ndarray):
+    """An example of a ndarray-like class"""
+
+    pass
+
+
+class MyNDBuffer(NDBuffer):
+    """Example of a custom NDBuffer that handles MyNDArrayLike"""
+
+    @classmethod
+    def create(
+        cls,
+        *,
+        shape: Iterable[int],
+        dtype: npt.DTypeLike,
+        order: Literal["C", "F"] = "C",
+        fill_value: Optional[Any] = None,
+    ) -> Self:
+        """Override `NDBuffer.create` to create a MyNDArrayLike instance"""
+        ret = cls(MyNDArrayLike(shape=shape, dtype=dtype, order=order))
+        if fill_value is not None:
+            ret.fill(fill_value)
+        return ret
+
+
+@pytest.mark.asyncio
+async def test_async_array_factory():
+    store = StorePath(MemoryStore())
+    expect = np.zeros((9, 9), dtype="uint16", order="F")
+    a = await AsyncArray.create(
+        store / "test_async_array",
+        shape=expect.shape,
+        chunk_shape=(5, 5),
+        dtype=expect.dtype,
+        fill_value=0,
+    )
+    expect[1:4, 3:6] = np.ones((3, 3))
+
+    await a.setitem(
+        selection=(slice(1, 4), slice(3, 6)),
+        value=np.ones((3, 3)),
+        factory=MyNDBuffer.from_ndarray_like,
+    )
+    got = await a.getitem(selection=(slice(0, 9), slice(0, 9)), factory=MyNDBuffer.create)
+    assert isinstance(got, MyNDArrayLike)
+    assert np.array_equal(expect, got)
diff --git a/tests/v3/test_codecs.py b/tests/v3/test_codecs.py
index fc209bd5e6..665e3124c0 100644
--- a/tests/v3/test_codecs.py
+++ b/tests/v3/test_codecs.py
@@ -294,7 +294,7 @@ async def test_order(
         fill_value=1,
     )
     z[:, :] = data
-    assert await (store / "order/0.0").get() == z._store["0.0"]
+    assert (await (store / "order/0.0").get()) == z._store["0.0"]
 
 
 @pytest.mark.parametrize("input_order", ["F", "C"])
@@ -730,9 +730,9 @@ async def test_dimension_names(store: Store):
     )
 
     assert (await AsyncArray.open(store / "dimension_names2")).metadata.dimension_names is None
-    zarr_json_bytes = await (store / "dimension_names2" / "zarr.json").get()
-    assert zarr_json_bytes is not None
-    assert "dimension_names" not in json.loads(zarr_json_bytes)
+    zarr_json_buffer = await (store / "dimension_names2" / "zarr.json").get()
+    assert zarr_json_buffer is not None
+    assert "dimension_names" not in json.loads(zarr_json_buffer.to_bytes())
 
 
 def test_gzip(store: Store):
@@ -954,7 +954,7 @@ async def test_blosc_evolve(store: Store):
         codecs=[BytesCodec(), BloscCodec()],
     )
 
-    zarr_json = json.loads(await (store / "blosc_evolve_u1" / "zarr.json").get())
+    zarr_json = json.loads((await (store / "blosc_evolve_u1" / "zarr.json").get()).to_bytes())
     blosc_configuration_json = zarr_json["codecs"][1]["configuration"]
     assert blosc_configuration_json["typesize"] == 1
     assert blosc_configuration_json["shuffle"] == "bitshuffle"
@@ -968,7 +968,7 @@ async def test_blosc_evolve(store: Store):
         codecs=[BytesCodec(), BloscCodec()],
     )
 
-    zarr_json = json.loads(await (store / "blosc_evolve_u2" / "zarr.json").get())
+    zarr_json = json.loads((await (store / "blosc_evolve_u2" / "zarr.json").get()).to_bytes())
     blosc_configuration_json = zarr_json["codecs"][1]["configuration"]
     assert blosc_configuration_json["typesize"] == 2
     assert blosc_configuration_json["shuffle"] == "shuffle"
@@ -982,7 +982,7 @@ async def test_blosc_evolve(store: Store):
         codecs=[ShardingCodec(chunk_shape=(16, 16), codecs=[BytesCodec(), BloscCodec()])],
     )
 
-    zarr_json = json.loads(await (store / "sharding_blosc_evolve" / "zarr.json").get())
+    zarr_json = json.loads((await (store / "sharding_blosc_evolve" / "zarr.json").get()).to_bytes())
     blosc_configuration_json = zarr_json["codecs"][0]["configuration"]["codecs"][1]["configuration"]
     assert blosc_configuration_json["typesize"] == 2
     assert blosc_configuration_json["shuffle"] == "shuffle"
diff --git a/tests/v3/test_group.py b/tests/v3/test_group.py
index 710eb3e527..16e4ceeecf 100644
--- a/tests/v3/test_group.py
+++ b/tests/v3/test_group.py
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any
 
+from zarr.buffer import Buffer
+from zarr.sync import sync
 from zarr.array import AsyncArray
 from zarr.store.core import make_store_path
@@ -13,7 +15,6 @@
 from zarr.group import AsyncGroup, Group, GroupMetadata
 from zarr.store import StorePath
-from zarr.sync import sync
 
 
 # todo: put RemoteStore in here
@@ -43,7 +44,7 @@ def test_group_children(store: MemoryStore | LocalStore) -> None:
     # add an extra object to the domain of the group.
     # the list of children should ignore this object.
-    sync(store.set(f"{path}/extra_object-1", b"000000"))
+    sync(store.set(f"{path}/extra_object-1", Buffer.from_bytes(b"000000")))
     # add an extra object under a directory-like prefix in the domain of the group.
     # this creates a directory with a random key in it
     # this should not show up as a member