From e78df0665220e5d947bcfe897efb7331df708cfa Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Thu, 30 May 2024 18:44:25 +0200 Subject: [PATCH 1/9] initial work toward pushing codecpipeline higher in the stacK --- src/zarr/abc/codec.py | 2 +- src/zarr/codecs/pipeline.py | 105 ++++++++++++++++++++---------------- src/zarr/metadata.py | 28 ++++++++-- 3 files changed, 85 insertions(+), 50 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 028d1757ce..ca30fa725a 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -229,7 +229,7 @@ async def encode_partial( ) -class CodecPipeline(Metadata): +class CodecPipeline: """Base class for implementing CodecPipeline. A CodecPipeline implements the read and write paths for chunk data. On the read path, it is responsible for fetching chunks from a store (via ByteGetter), diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 893cbc8b4b..6a80e7051f 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -2,7 +2,7 @@ from collections.abc import Iterable, Iterator from dataclasses import dataclass -from itertools import islice +from itertools import islice, pairwise from typing import TYPE_CHECKING, TypeVar from warnings import warn @@ -90,52 +90,9 @@ def to_dict(self) -> JSON: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return type(self).from_list([c.evolve_from_array_spec(array_spec) for c in self]) - @staticmethod - def codecs_from_list( - codecs: list[Codec], - ) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]: - from zarr.codecs.sharding import ShardingCodec - - if not any(isinstance(codec, ArrayBytesCodec) for codec in codecs): - raise ValueError("Exactly one array-to-bytes codec is required.") - - prev_codec: Codec | None = None - for codec in codecs: - if prev_codec is not None: - if isinstance(codec, ArrayBytesCodec) and isinstance(prev_codec, ArrayBytesCodec): - raise ValueError( - f"ArrayBytesCodec '{type(codec)}' cannot follow after ArrayBytesCodec '{type(prev_codec)}' because exactly 1 ArrayBytesCodec is allowed." - ) - if isinstance(codec, ArrayBytesCodec) and isinstance(prev_codec, BytesBytesCodec): - raise ValueError( - f"ArrayBytesCodec '{type(codec)}' cannot follow after BytesBytesCodec '{type(prev_codec)}'." - ) - if isinstance(codec, ArrayArrayCodec) and isinstance(prev_codec, ArrayBytesCodec): - raise ValueError( - f"ArrayArrayCodec '{type(codec)}' cannot follow after ArrayBytesCodec '{type(prev_codec)}'." - ) - if isinstance(codec, ArrayArrayCodec) and isinstance(prev_codec, BytesBytesCodec): - raise ValueError( - f"ArrayArrayCodec '{type(codec)}' cannot follow after BytesBytesCodec '{type(prev_codec)}'." - ) - prev_codec = codec - - if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: - warn( - "Combining a `sharding_indexed` codec disables partial reads and " - "writes, which may lead to inefficient performance.", - stacklevel=3, - ) - - return ( - tuple(codec for codec in codecs if isinstance(codec, ArrayArrayCodec)), - next(codec for codec in codecs if isinstance(codec, ArrayBytesCodec)), - tuple(codec for codec in codecs if isinstance(codec, BytesBytesCodec)), - ) - @classmethod def from_list(cls, codecs: list[Codec], *, batch_size: int | None = None) -> Self: - array_array_codecs, array_bytes_codec, bytes_bytes_codecs = cls.codecs_from_list(codecs) + array_array_codecs, array_bytes_codec, bytes_bytes_codecs = codecs_from_list(codecs) return cls( array_array_codecs=array_array_codecs, @@ -476,3 +433,61 @@ async def write( self.write_batch, config.get("async.concurrency"), ) + + +def codecs_from_list( + codecs: list[Codec], +) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]: + from zarr.codecs.sharding import ShardingCodec + + array_array: tuple[ArrayArrayCodec] = () + array_bytes: ArrayBytesCodec = None + bytes_bytes: tuple[BytesBytesCodec] = () + + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: + warn( + "Combining a `sharding_indexed` codec disables partial reads and " + "writes, which may lead to inefficient performance.", + stacklevel=3, + ) + + for prev_codec, cur_codec in pairwise((None, *codecs)): + if isinstance(cur_codec, ArrayArrayCodec): + if isinstance(prev_codec, (ArrayBytesCodec, BytesBytesCodec)): + msg = ( + f"Invalid codec order. ArrayArrayCodec {cur_codec}" + "must be preceded by another ArrayArrayCodec. " + f"Got {type(prev_codec)} instead." + ) + raise ValueError(msg) + array_array += (cur_codec,) + + elif isinstance(cur_codec, ArrayBytesCodec): + if isinstance(prev_codec, BytesBytesCodec): + msg = ( + f"Invalid codec order. ArrayBytes codec {cur_codec}" + f" must be preceded by an ArrayArrayCodec. Got {type(prev_codec)} instead." + ) + raise ValueError(msg) + if array_bytes is not None: + msg = ( + f"Got two instances of ArrayBytesCodec: {array_bytes} and {cur_codec}. " + "Only one array-to-bytes codec is allowed." + ) + raise ValueError(msg) + array_bytes = cur_codec + + elif isinstance(cur_codec, BytesBytesCodec): + if isinstance(prev_codec, ArrayArrayCodec): + msg = ( + f"Invalid codec order. BytesBytesCodec {cur_codec}" + "must be preceded by either another BytesBytesCodec, or an ArrayBytesCodec. " + f"Got {type(prev_codec)} instead." + ) + bytes_bytes += (cur_codec,) + else: + assert False + + if array_bytes is None: + raise ValueError("Required ArrayBytesCodec was not found.") + return array_array, array_bytes, bytes_bytes diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 58cc276c29..77cc23888e 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -10,12 +10,13 @@ import numpy as np import numpy.typing as npt -from zarr.abc.codec import Codec, CodecPipeline +from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec, CodecPipeline from zarr.abc.metadata import Metadata from zarr.buffer import Buffer from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.chunk_key_encodings import ChunkKeyEncoding, parse_separator from zarr.codecs._v2 import V2Compressor, V2Filters +from zarr.codecs.registry import get_codec_class if TYPE_CHECKING: from typing import Literal @@ -33,6 +34,7 @@ ChunkCoords, parse_dtype, parse_fill_value, + parse_named_configuration, parse_shapelike, ) from zarr.config import parse_indexing_order @@ -160,7 +162,7 @@ class ArrayV3Metadata(ArrayMetadata): chunk_grid: ChunkGrid chunk_key_encoding: ChunkKeyEncoding fill_value: Any - codecs: CodecPipeline + codecs: tuple[Codec, ...] attributes: dict[str, Any] = field(default_factory=dict) dimension_names: tuple[str, ...] | None = None zarr_format: Literal[3] = field(default=3, init=False) @@ -195,7 +197,7 @@ def __init__( fill_value=fill_value_parsed, order="C", # TODO: order is not needed here. ) - codecs_parsed = parse_codecs(codecs).evolve_from_array_spec(array_spec) + codecs_parsed = parse_codecs(codecs) object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) @@ -485,9 +487,27 @@ def parse_v2_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata: return data -def parse_codecs(data: Iterable[Codec | JSON]) -> CodecPipeline: +def create_pipeline(data: Iterable[Codec | JSON]) -> CodecPipeline: from zarr.codecs import BatchedCodecPipeline if not isinstance(data, Iterable): raise TypeError(f"Expected iterable, got {type(data)}") return BatchedCodecPipeline.from_dict(data) + + +def parse_codecs(data: Iterable[Codec | JSON]) -> tuple[Codec, ...]: + out: tuple[Codec] = () + + if not isinstance(data, Iterable): + raise TypeError(f"Expected iterable, got {type(data)}") + + for c in data: + if isinstance( + c, ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec + ): # Can't use Codec here because of mypy limitation + out += (c,) + else: + name_parsed, _ = parse_named_configuration(c, require_configuration=False) + out += (get_codec_class(name_parsed).from_dict(c),) + + return out From 740af0d15bf5268f07cf85468c79d54859db264d Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Thu, 30 May 2024 21:58:40 +0200 Subject: [PATCH 2/9] remove codecpipeline from metadata, add it to AsyncArray and or create it dynamically --- src/zarr/abc/codec.py | 29 ++++++++++++----- src/zarr/array.py | 27 ++++++++++++---- src/zarr/codecs/blosc.py | 8 ++--- src/zarr/codecs/pipeline.py | 41 ++++++++++++----------- src/zarr/codecs/sharding.py | 63 ++++++++++++++---------------------- src/zarr/codecs/transpose.py | 19 ++++++++++- src/zarr/metadata.py | 38 +++++++--------------- tests/v3/test_codecs.py | 35 ++++++++++++++------ 8 files changed, 149 insertions(+), 111 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index ca30fa725a..2a7f4dfa90 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -2,19 +2,21 @@ from abc import abstractmethod from collections.abc import Awaitable, Callable, Iterable -from typing import TYPE_CHECKING, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, TypeVar + +import numpy as np from zarr.abc.metadata import Metadata from zarr.abc.store import ByteGetter, ByteSetter from zarr.buffer import Buffer, NDBuffer -from zarr.common import concurrent_map +from zarr.chunk_grids import ChunkGrid +from zarr.common import ChunkCoords, concurrent_map from zarr.config import config if TYPE_CHECKING: from typing_extensions import Self from zarr.common import ArraySpec, SliceSelection - from zarr.metadata import ArrayMetadata CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) @@ -75,13 +77,19 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: """ return self - def validate(self, array_metadata: ArrayMetadata) -> None: - """Validates that the codec configuration is compatible with the array metadata. + def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + """ + Validates that the codec configuration is compatible with the array metadata. Raises errors when the codec configuration is not compatible. Parameters ---------- - array_metadata : ArrayMetadata + shape: tuple[int, ...] + The shape of the array. + dtype: np.dtype + The datatype of the array. + chunk_grid: ChunkGrid + The chunk grid of the array. """ ... @@ -275,13 +283,18 @@ def supports_partial_decode(self) -> bool: ... def supports_partial_encode(self) -> bool: ... @abstractmethod - def validate(self, array_metadata: ArrayMetadata) -> None: + def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: """Validates that all codec configurations are compatible with the array metadata. Raises errors when a codec configuration is not compatible. Parameters ---------- - array_metadata : ArrayMetadata + shape: tuple[int, ...] + The shape of the array. + dtype: np.dtype + The datatype of the array. + chunk_grid: ChunkGrid + The chunk grid of the array. """ ... diff --git a/src/zarr/array.py b/src/zarr/array.py index 7da39c285e..3c85cef211 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -11,19 +11,21 @@ # 1. Was splitting the array into two classes really necessary? from asyncio import gather from collections.abc import Iterable -from dataclasses import dataclass, replace +from dataclasses import dataclass, field, replace from typing import Any, Literal import numpy as np import numpy.typing as npt -from zarr.abc.codec import Codec +from zarr.abc.codec import Codec, CodecPipeline from zarr.abc.store import set_or_delete from zarr.attributes import Attributes from zarr.buffer import Factory, NDArrayLike, NDBuffer from zarr.chunk_grids import RegularChunkGrid from zarr.chunk_key_encodings import ChunkKeyEncoding, DefaultChunkKeyEncoding, V2ChunkKeyEncoding from zarr.codecs import BytesCodec +from zarr.codecs._v2 import V2Compressor, V2Filters +from zarr.codecs.pipeline import BatchedCodecPipeline from zarr.common import ( JSON, ZARR_JSON, @@ -41,8 +43,8 @@ from zarr.sync import sync -def parse_array_metadata(data: Any) -> ArrayMetadata: - if isinstance(data, ArrayMetadata): +def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata: + if isinstance(data, ArrayV2Metadata | ArrayV3Metadata): return data elif isinstance(data, dict): if data["zarr_format"] == 3: @@ -52,10 +54,22 @@ def parse_array_metadata(data: Any) -> ArrayMetadata: raise TypeError +def create_codec_pipeline(metadata: ArrayV2Metadata | ArrayV3Metadata) -> BatchedCodecPipeline: + if isinstance(metadata, ArrayV3Metadata): + return BatchedCodecPipeline.from_list(metadata.codecs) + elif isinstance(metadata, ArrayV2Metadata): + return BatchedCodecPipeline.from_list( + [V2Filters(metadata.filters or []), V2Compressor(metadata.compressor)] + ) + else: + raise AssertionError + + @dataclass(frozen=True) class AsyncArray: metadata: ArrayMetadata store_path: StorePath + codec_pipeline: CodecPipeline = field(init=False) order: Literal["C", "F"] def __init__( @@ -70,6 +84,7 @@ def __init__( object.__setattr__(self, "metadata", metadata_parsed) object.__setattr__(self, "store_path", store_path) object.__setattr__(self, "order", order_parsed) + object.__setattr__(self, "codec_pipeline", create_codec_pipeline(metadata=metadata_parsed)) @classmethod async def create( @@ -373,7 +388,7 @@ async def getitem( ) # reading chunks and decoding them - await self.metadata.codec_pipeline.read( + await self.codec_pipeline.read( [ ( self.store_path / self.metadata.encode_chunk_key(chunk_coords), @@ -422,7 +437,7 @@ async def setitem( value_buffer = factory(value) # merging with existing data and encoding chunks - await self.metadata.codec_pipeline.write( + await self.codec_pipeline.write( [ ( self.store_path / self.metadata.encode_chunk_key(chunk_coords), diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index e8921b8beb..8f39a20b3b 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -86,16 +86,16 @@ class BloscCodec(BytesBytesCodec): def __init__( self, *, - typesize: int | None = None, + typesize: int, cname: BloscCname | str = BloscCname.zstd, clevel: int = 5, - shuffle: BloscShuffle | str | None = None, + shuffle: BloscShuffle | str = "noshuffle", blocksize: int = 0, ) -> None: - typesize_parsed = parse_typesize(typesize) if typesize is not None else None + typesize_parsed = parse_typesize(typesize) cname_parsed = parse_enum(cname, BloscCname) clevel_parsed = parse_clevel(clevel) - shuffle_parsed = parse_enum(shuffle, BloscShuffle) if shuffle is not None else None + shuffle_parsed = parse_enum(shuffle, BloscShuffle) blocksize_parsed = parse_blocksize(blocksize) object.__setattr__(self, "typesize", typesize_parsed) diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 6a80e7051f..2d57317581 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -3,9 +3,11 @@ from collections.abc import Iterable, Iterator from dataclasses import dataclass from itertools import islice, pairwise -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar from warnings import warn +import numpy as np + from zarr.abc.codec import ( ArrayArrayCodec, ArrayBytesCodec, @@ -18,11 +20,11 @@ CodecPipeline, ) from zarr.buffer import Buffer, NDBuffer +from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import get_codec_class from zarr.common import JSON, concurrent_map, parse_named_configuration from zarr.config import config from zarr.indexing import is_total_slice -from zarr.metadata import ArrayMetadata if TYPE_CHECKING: from typing_extensions import Self @@ -91,7 +93,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return type(self).from_list([c.evolve_from_array_spec(array_spec) for c in self]) @classmethod - def from_list(cls, codecs: list[Codec], *, batch_size: int | None = None) -> Self: + def from_list(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self: array_array_codecs, array_bytes_codec, bytes_bytes_codecs = codecs_from_list(codecs) return cls( @@ -138,9 +140,9 @@ def __iter__(self) -> Iterator[Codec]: yield self.array_bytes_codec yield from self.bytes_bytes_codecs - def validate(self, array_metadata: ArrayMetadata) -> None: + def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: for codec in self: - codec.validate(array_metadata) + codec.validate(shape=shape, dtype=dtype, chunk_grid=chunk_grid) def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: for codec in self: @@ -436,15 +438,15 @@ async def write( def codecs_from_list( - codecs: list[Codec], + codecs: Iterable[Codec], ) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]: from zarr.codecs.sharding import ShardingCodec - array_array: tuple[ArrayArrayCodec] = () - array_bytes: ArrayBytesCodec = None - bytes_bytes: tuple[BytesBytesCodec] = () + array_array: tuple[ArrayArrayCodec, ...] = () + array_bytes_maybe: ArrayBytesCodec | None = None + bytes_bytes: tuple[BytesBytesCodec, ...] = () - if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(tuple(codecs)) > 1: warn( "Combining a `sharding_indexed` codec disables partial reads and " "writes, which may lead to inefficient performance.", @@ -453,7 +455,7 @@ def codecs_from_list( for prev_codec, cur_codec in pairwise((None, *codecs)): if isinstance(cur_codec, ArrayArrayCodec): - if isinstance(prev_codec, (ArrayBytesCodec, BytesBytesCodec)): + if isinstance(prev_codec, ArrayBytesCodec | BytesBytesCodec): msg = ( f"Invalid codec order. ArrayArrayCodec {cur_codec}" "must be preceded by another ArrayArrayCodec. " @@ -469,13 +471,15 @@ def codecs_from_list( f" must be preceded by an ArrayArrayCodec. Got {type(prev_codec)} instead." ) raise ValueError(msg) - if array_bytes is not None: + + if array_bytes_maybe is not None: msg = ( - f"Got two instances of ArrayBytesCodec: {array_bytes} and {cur_codec}. " + f"Got two instances of ArrayBytesCodec: {array_bytes_maybe} and {cur_codec}. " "Only one array-to-bytes codec is allowed." ) raise ValueError(msg) - array_bytes = cur_codec + + array_bytes_maybe = cur_codec elif isinstance(cur_codec, BytesBytesCodec): if isinstance(prev_codec, ArrayArrayCodec): @@ -486,8 +490,9 @@ def codecs_from_list( ) bytes_bytes += (cur_codec,) else: - assert False + raise AssertionError - if array_bytes is None: - raise ValueError("Required ArrayBytesCodec was not found.") - return array_array, array_bytes, bytes_bytes + if array_bytes_maybe is None: + raise ValueError("Required ArrayBytesCodec was not found.") + else: + return array_array, array_bytes_maybe, bytes_bytes diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index a68577be68..3a5b2fa983 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -5,7 +5,7 @@ from enum import Enum from functools import lru_cache from operator import itemgetter -from typing import TYPE_CHECKING, NamedTuple +from typing import TYPE_CHECKING, Any, NamedTuple import numpy as np import numpy.typing as npt @@ -17,10 +17,9 @@ ByteGetter, ByteSetter, Codec, - CodecPipeline, ) from zarr.buffer import Buffer, NDBuffer -from zarr.chunk_grids import RegularChunkGrid +from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.codecs.bytes import BytesCodec from zarr.codecs.crc32c_ import Crc32cCodec from zarr.codecs.pipeline import BatchedCodecPipeline @@ -39,7 +38,7 @@ c_order_iter, morton_order_iter, ) -from zarr.metadata import ArrayMetadata, parse_codecs +from zarr.metadata import parse_codecs if TYPE_CHECKING: from collections.abc import Awaitable, Callable, Iterator @@ -298,34 +297,22 @@ class ShardingCodec( ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin ): chunk_shape: ChunkCoords - codecs: CodecPipeline - index_codecs: CodecPipeline + codecs: tuple[Codec, ...] + index_codecs: tuple[Codec, ...] index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end def __init__( self, *, chunk_shape: ChunkCoordsLike, - codecs: Iterable[Codec | JSON] | None = None, - index_codecs: Iterable[Codec | JSON] | None = None, - index_location: ShardingCodecIndexLocation | None = ShardingCodecIndexLocation.end, + codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(),), + index_codecs: Iterable[Codec | dict[str, JSON]] = (BytesCodec(), Crc32cCodec()), + index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end, ) -> None: chunk_shape_parsed = parse_shapelike(chunk_shape) - codecs_parsed = ( - parse_codecs(codecs) - if codecs is not None - else BatchedCodecPipeline.from_list([BytesCodec()]) - ) - index_codecs_parsed = ( - parse_codecs(index_codecs) - if index_codecs is not None - else BatchedCodecPipeline.from_list([BytesCodec(), Crc32cCodec()]) - ) - index_location_parsed = ( - parse_index_location(index_location) - if index_location is not None - else ShardingCodecIndexLocation.end - ) + codecs_parsed = parse_codecs(codecs) + index_codecs_parsed = parse_codecs(index_codecs) + index_location_parsed = parse_index_location(index_location) object.__setattr__(self, "chunk_shape", chunk_shape_parsed) object.__setattr__(self, "codecs", codecs_parsed) @@ -347,30 +334,30 @@ def to_dict(self) -> dict[str, JSON]: "name": "sharding_indexed", "configuration": { "chunk_shape": list(self.chunk_shape), - "codecs": self.codecs.to_dict(), - "index_codecs": self.index_codecs.to_dict(), + "codecs": [s.to_dict() for s in self.codecs], + "index_codecs": [s.to_dict() for s in self.index_codecs], "index_location": self.index_location, }, } def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: shard_spec = self._get_chunk_spec(array_spec) - evolved_codecs = self.codecs.evolve_from_array_spec(shard_spec) + evolved_codecs = tuple(c.evolve_from_array_spec(shard_spec) for c in self.codecs) if evolved_codecs != self.codecs: return replace(self, codecs=evolved_codecs) return self - def validate(self, array_metadata: ArrayMetadata) -> None: - if len(self.chunk_shape) != array_metadata.ndim: + def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + if len(self.chunk_shape) != len(shape): raise ValueError( "The shard's `chunk_shape` and array's `shape` need to have the same number of dimensions." ) - if not isinstance(array_metadata.chunk_grid, RegularChunkGrid): + if not isinstance(chunk_grid, RegularChunkGrid): raise ValueError("Sharding is only compatible with regular chunk grids.") if not all( s % c == 0 for s, c in zip( - array_metadata.chunk_grid.chunk_shape, + chunk_grid.chunk_shape, self.chunk_shape, strict=False, ) @@ -406,7 +393,7 @@ async def _decode_single( return out # decoding chunks and writing them into the output buffer - await self.codecs.read( + await BatchedCodecPipeline.from_list(self.codecs).read( [ ( _ShardingByteGetter(shard_dict, chunk_coords), @@ -468,7 +455,7 @@ async def _decode_partial_single( shard_dict[chunk_coords] = chunk_bytes # decoding chunks and writing them into the output buffer - await self.codecs.read( + await BatchedCodecPipeline.from_list(self.codecs).read( [ ( _ShardingByteGetter(shard_dict, chunk_coords), @@ -502,7 +489,7 @@ async def _encode_single( shard_builder = _ShardBuilder.create_empty(chunks_per_shard) - await self.codecs.write( + await BatchedCodecPipeline.from_list(self.codecs).write( [ ( _ShardingByteSetter(shard_builder, chunk_coords), @@ -543,7 +530,7 @@ async def _encode_partial_single( ) ) - await self.codecs.write( + await BatchedCodecPipeline.from_list(self.codecs).write( [ ( _ShardingByteSetter(shard_dict, chunk_coords), @@ -578,7 +565,7 @@ async def _decode_shard_index( ) -> _ShardIndex: index_array = next( iter( - await self.index_codecs.decode( + await BatchedCodecPipeline.from_list(self.index_codecs).decode( [(index_bytes, self._get_index_chunk_spec(chunks_per_shard))], ) ) @@ -589,7 +576,7 @@ async def _decode_shard_index( async def _encode_shard_index(self, index: _ShardIndex) -> Buffer: index_bytes = next( iter( - await self.index_codecs.encode( + await BatchedCodecPipeline.from_list(self.index_codecs).encode( [ ( NDBuffer.from_numpy_array(index.offsets_and_lengths), @@ -604,7 +591,7 @@ async def _encode_shard_index(self, index: _ShardIndex) -> Buffer: return index_bytes def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int: - return self.index_codecs.compute_encoded_size( + return BatchedCodecPipeline.from_list(self.index_codecs).compute_encoded_size( 16 * product(chunks_per_shard), self._get_index_chunk_spec(chunks_per_shard) ) diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index 373a27cab9..080c0b9eb3 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -2,10 +2,13 @@ from collections.abc import Iterable from dataclasses import dataclass, replace -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast + +import numpy as np from zarr.abc.codec import ArrayArrayCodec from zarr.buffer import NDBuffer +from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration @@ -42,6 +45,20 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "transpose", "configuration": {"order": list(self.order)}} + def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + if len(self.order) != len(shape): + raise ValueError( + f"The `order` tuple needs have as many entries as there are dimensions in the array. Got {self.order}." + ) + if len(self.order) != len(set(self.order)): + raise ValueError( + f"There must not be duplicates in the `order` tuple. Got {self.order}." + ) + if not all(0 <= x < len(shape) for x in self.order): + raise ValueError( + f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}." + ) + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: if len(self.order) != array_spec.ndim: raise ValueError( diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 77cc23888e..d6012740e2 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -15,7 +15,6 @@ from zarr.buffer import Buffer from zarr.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.chunk_key_encodings import ChunkKeyEncoding, parse_separator -from zarr.codecs._v2 import V2Compressor, V2Filters from zarr.codecs.registry import get_codec_class if TYPE_CHECKING: @@ -129,11 +128,6 @@ def dtype(self) -> np.dtype[Any]: def ndim(self) -> int: pass - @property - @abstractmethod - def codec_pipeline(self) -> CodecPipeline: - pass - @abstractmethod def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec: pass @@ -176,7 +170,7 @@ def __init__( chunk_grid: dict[str, JSON] | ChunkGrid, chunk_key_encoding: dict[str, JSON] | ChunkKeyEncoding, fill_value: Any, - codecs: Iterable[Codec | JSON], + codecs: Iterable[Codec | dict[str, JSON]], attributes: None | dict[str, JSON], dimension_names: None | Iterable[str], ) -> None: @@ -190,13 +184,6 @@ def __init__( dimension_names_parsed = parse_dimension_names(dimension_names) fill_value_parsed = parse_fill_value(fill_value) attributes_parsed = parse_attributes(attributes) - - array_spec = ArraySpec( - shape=shape_parsed, - dtype=data_type_parsed, - fill_value=fill_value_parsed, - order="C", # TODO: order is not needed here. - ) codecs_parsed = parse_codecs(codecs) object.__setattr__(self, "shape", shape_parsed) @@ -223,7 +210,8 @@ def _validate_metadata(self) -> None: ) if self.fill_value is None: raise ValueError("`fill_value` is required.") - self.codecs.validate(self) + for codec in self.codecs: + codec.validate(shape=self.shape, dtype=self.dtype, chunk_grid=self.chunk_grid) @property def dtype(self) -> np.dtype[Any]: @@ -233,10 +221,6 @@ def dtype(self) -> np.dtype[Any]: def ndim(self) -> int: return len(self.shape) - @property - def codec_pipeline(self) -> CodecPipeline: - return self.codecs - def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec: assert isinstance( self.chunk_grid, RegularChunkGrid @@ -362,13 +346,13 @@ def dtype(self) -> np.dtype[Any]: def chunks(self) -> ChunkCoords: return self.chunk_grid.chunk_shape - @property - def codec_pipeline(self) -> CodecPipeline: - from zarr.codecs import BatchedCodecPipeline + # @property + # def codec_pipeline(self) -> CodecPipeline: + # from zarr.codecs import BatchedCodecPipeline - return BatchedCodecPipeline.from_list( - [V2Filters(self.filters or []), V2Compressor(self.compressor)] - ) + # return BatchedCodecPipeline.from_list( + # [V2Filters(self.filters or []), V2Compressor(self.compressor)] + # ) def to_buffer_dict(self) -> dict[str, Buffer]: def _json_convert( @@ -495,8 +479,8 @@ def create_pipeline(data: Iterable[Codec | JSON]) -> CodecPipeline: return BatchedCodecPipeline.from_dict(data) -def parse_codecs(data: Iterable[Codec | JSON]) -> tuple[Codec, ...]: - out: tuple[Codec] = () +def parse_codecs(data: Iterable[Codec | dict[str, JSON]]) -> tuple[Codec, ...]: + out: tuple[Codec, ...] = () if not isinstance(data, Iterable): raise TypeError(f"Expected iterable, got {type(data)}") diff --git a/tests/v3/test_codecs.py b/tests/v3/test_codecs.py index 251570f767..d456e17ee9 100644 --- a/tests/v3/test_codecs.py +++ b/tests/v3/test_codecs.py @@ -81,7 +81,7 @@ def test_sharding( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec(cname="lz4"), + BloscCodec(cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="shuffle"), ], index_location=index_location, ) @@ -111,7 +111,9 @@ def test_sharding_partial( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec(cname="lz4"), + BloscCodec( + cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" + ), ], index_location=index_location, ) @@ -144,7 +146,9 @@ def test_sharding_partial_read( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec(cname="lz4"), + BloscCodec( + cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" + ), ], index_location=index_location, ) @@ -173,7 +177,9 @@ def test_sharding_partial_overwrite( codecs=[ TransposeCodec(order=order_from_dim("F", data.ndim)), BytesCodec(), - BloscCodec(cname="lz4"), + BloscCodec( + cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" + ), ], index_location=index_location, ) @@ -449,7 +455,7 @@ def test_open_sharding(store: Store): codecs=[ TransposeCodec(order=order_from_dim("F", 2)), BytesCodec(), - BloscCodec(), + BloscCodec(typesize=np.dtype("int32").itemsize, shuffle="noshuffle"), ], ) ], @@ -591,7 +597,7 @@ def test_write_partial_sharded_chunks(store: Store): chunk_shape=(10, 10), codecs=[ BytesCodec(), - BloscCodec(), + BloscCodec(typesize=data.dtype.itemsize), ], ) ], @@ -952,7 +958,10 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint8", fill_value=0, - codecs=[BytesCodec(), BloscCodec()], + codecs=[ + BytesCodec(), + BloscCodec(typesize=np.dtype("uint8").itemsize, shuffle="bitshuffle"), + ], ) zarr_json = json.loads((await (store / "blosc_evolve_u1" / "zarr.json").get()).to_bytes()) @@ -966,7 +975,7 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint16", fill_value=0, - codecs=[BytesCodec(), BloscCodec()], + codecs=[BytesCodec(), BloscCodec(typesize=np.dtype("uint16").itemsize, shuffle="shuffle")], ) zarr_json = json.loads((await (store / "blosc_evolve_u2" / "zarr.json").get()).to_bytes()) @@ -980,7 +989,15 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint16", fill_value=0, - codecs=[ShardingCodec(chunk_shape=(16, 16), codecs=[BytesCodec(), BloscCodec()])], + codecs=[ + ShardingCodec( + chunk_shape=(16, 16), + codecs=[ + BytesCodec(), + BloscCodec(typesize=np.dtype("uint16").itemsize, shuffle="shuffle"), + ], + ) + ], ) zarr_json = json.loads((await (store / "sharding_blosc_evolve" / "zarr.json").get()).to_bytes()) From e3ced40d6838e2ff28bbaa6e537a9af617453a0e Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Fri, 31 May 2024 17:45:50 +0200 Subject: [PATCH 3/9] revert changes to blosc.py --- src/zarr/codecs/blosc.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 8f39a20b3b..e8921b8beb 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -86,16 +86,16 @@ class BloscCodec(BytesBytesCodec): def __init__( self, *, - typesize: int, + typesize: int | None = None, cname: BloscCname | str = BloscCname.zstd, clevel: int = 5, - shuffle: BloscShuffle | str = "noshuffle", + shuffle: BloscShuffle | str | None = None, blocksize: int = 0, ) -> None: - typesize_parsed = parse_typesize(typesize) + typesize_parsed = parse_typesize(typesize) if typesize is not None else None cname_parsed = parse_enum(cname, BloscCname) clevel_parsed = parse_clevel(clevel) - shuffle_parsed = parse_enum(shuffle, BloscShuffle) + shuffle_parsed = parse_enum(shuffle, BloscShuffle) if shuffle is not None else None blocksize_parsed = parse_blocksize(blocksize) object.__setattr__(self, "typesize", typesize_parsed) From 6cad24f079bdd7fda8c13e117ef96666b68acc2c Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Fri, 31 May 2024 17:49:09 +0200 Subject: [PATCH 4/9] revert changes to test_codecs.py --- tests/v3/test_codecs.py | 35 +++++++++-------------------------- 1 file changed, 9 insertions(+), 26 deletions(-) diff --git a/tests/v3/test_codecs.py b/tests/v3/test_codecs.py index d456e17ee9..251570f767 100644 --- a/tests/v3/test_codecs.py +++ b/tests/v3/test_codecs.py @@ -81,7 +81,7 @@ def test_sharding( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec(cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="shuffle"), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -111,9 +111,7 @@ def test_sharding_partial( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec( - cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" - ), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -146,9 +144,7 @@ def test_sharding_partial_read( codecs=[ TransposeCodec(order=order_from_dim("F", sample_data.ndim)), BytesCodec(), - BloscCodec( - cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" - ), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -177,9 +173,7 @@ def test_sharding_partial_overwrite( codecs=[ TransposeCodec(order=order_from_dim("F", data.ndim)), BytesCodec(), - BloscCodec( - cname="lz4", typesize=sample_data.dtype.itemsize, shuffle="noshuffle" - ), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -455,7 +449,7 @@ def test_open_sharding(store: Store): codecs=[ TransposeCodec(order=order_from_dim("F", 2)), BytesCodec(), - BloscCodec(typesize=np.dtype("int32").itemsize, shuffle="noshuffle"), + BloscCodec(), ], ) ], @@ -597,7 +591,7 @@ def test_write_partial_sharded_chunks(store: Store): chunk_shape=(10, 10), codecs=[ BytesCodec(), - BloscCodec(typesize=data.dtype.itemsize), + BloscCodec(), ], ) ], @@ -958,10 +952,7 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint8", fill_value=0, - codecs=[ - BytesCodec(), - BloscCodec(typesize=np.dtype("uint8").itemsize, shuffle="bitshuffle"), - ], + codecs=[BytesCodec(), BloscCodec()], ) zarr_json = json.loads((await (store / "blosc_evolve_u1" / "zarr.json").get()).to_bytes()) @@ -975,7 +966,7 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint16", fill_value=0, - codecs=[BytesCodec(), BloscCodec(typesize=np.dtype("uint16").itemsize, shuffle="shuffle")], + codecs=[BytesCodec(), BloscCodec()], ) zarr_json = json.loads((await (store / "blosc_evolve_u2" / "zarr.json").get()).to_bytes()) @@ -989,15 +980,7 @@ async def test_blosc_evolve(store: Store): chunk_shape=(16, 16), dtype="uint16", fill_value=0, - codecs=[ - ShardingCodec( - chunk_shape=(16, 16), - codecs=[ - BytesCodec(), - BloscCodec(typesize=np.dtype("uint16").itemsize, shuffle="shuffle"), - ], - ) - ], + codecs=[ShardingCodec(chunk_shape=(16, 16), codecs=[BytesCodec(), BloscCodec()])], ) zarr_json = json.loads((await (store / "sharding_blosc_evolve" / "zarr.json").get()).to_bytes()) From 29f907bb5b34ebd5923f17bb614019749a710de5 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Fri, 31 May 2024 18:23:22 +0200 Subject: [PATCH 5/9] consistent expanded function signature for evolve_from_array_spec --- src/zarr/abc/codec.py | 22 ++++++++++++++++++---- src/zarr/codecs/blosc.py | 18 +++++++++--------- src/zarr/codecs/bytes.py | 11 +++++++---- src/zarr/codecs/pipeline.py | 13 ++++++++++--- src/zarr/codecs/sharding.py | 10 +++++++--- src/zarr/codecs/transpose.py | 10 ++++++---- src/zarr/metadata.py | 8 +++++++- 7 files changed, 64 insertions(+), 28 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 2a7f4dfa90..7c0a12ab98 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -63,13 +63,20 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: """ return chunk_spec - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. Parameters ---------- - chunk_spec : ArraySpec + shape: tuple[int, ...] + The shape of the array. + dtype: np.dtype + The datatype of the array. + chunk_grid: ChunkGrid + The chunk grid of the array. Returns ------- @@ -245,13 +252,20 @@ class CodecPipeline: and writes them to a store (via ByteSetter).""" @abstractmethod - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. Parameters ---------- - array_spec : ArraySpec + shape: tuple[int, ...] + The shape of the array. + dtype: np.dtype + The datatype of the array. + chunk_grid: ChunkGrid + The chunk grid of the array. Returns ------- diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index e8921b8beb..bdb60307fd 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -3,15 +3,17 @@ from dataclasses import dataclass, replace from enum import Enum from functools import cached_property -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import numcodecs +import numpy as np from numcodecs.blosc import Blosc from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer, as_numpy_array_wrapper +from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import parse_enum, parse_named_configuration, to_thread +from zarr.common import ChunkCoords, parse_enum, parse_named_configuration, to_thread if TYPE_CHECKING: from typing_extensions import Self @@ -125,18 +127,16 @@ def to_dict(self) -> dict[str, JSON]: }, } - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: new_codec = self if new_codec.typesize is None: - new_codec = replace(new_codec, typesize=array_spec.dtype.itemsize) + new_codec = replace(new_codec, typesize=dtype.itemsize) if new_codec.shuffle is None: new_codec = replace( new_codec, - shuffle=( - BloscShuffle.bitshuffle - if array_spec.dtype.itemsize == 1 - else BloscShuffle.shuffle - ), + shuffle=(BloscShuffle.bitshuffle if dtype.itemsize == 1 else BloscShuffle.shuffle), ) return new_codec diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index f275ae37d1..65841ccc05 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -3,14 +3,15 @@ import sys from dataclasses import dataclass, replace from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import numpy as np from zarr.abc.codec import ArrayBytesCodec from zarr.buffer import Buffer, NDArrayLike, NDBuffer +from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import parse_enum, parse_named_configuration +from zarr.common import ChunkCoords, parse_enum, parse_named_configuration if TYPE_CHECKING: from typing_extensions import Self @@ -51,8 +52,10 @@ def to_dict(self) -> dict[str, JSON]: else: return {"name": "bytes", "configuration": {"endian": self.endian}} - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - if array_spec.dtype.itemsize == 0: + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: + if dtype.itemsize == 0: if self.endian is not None: return replace(self, endian=None) elif self.endian is None: diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 2d57317581..6787f3ee22 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -22,7 +22,7 @@ from zarr.buffer import Buffer, NDBuffer from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import get_codec_class -from zarr.common import JSON, concurrent_map, parse_named_configuration +from zarr.common import JSON, ChunkCoords, concurrent_map, parse_named_configuration from zarr.config import config from zarr.indexing import is_total_slice @@ -89,8 +89,15 @@ def from_dict(cls, data: Iterable[JSON | Codec], *, batch_size: int | None = Non def to_dict(self) -> JSON: return [c.to_dict() for c in self] - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - return type(self).from_list([c.evolve_from_array_spec(array_spec) for c in self]) + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: + return type(self).from_list( + [ + c.evolve_from_array_spec(shape=shape, dtype=dtype, chunk_grid=chunk_grid) + for c in self + ] + ) @classmethod def from_list(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self: diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 3a5b2fa983..02c8cebe4d 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -340,9 +340,13 @@ def to_dict(self) -> dict[str, JSON]: }, } - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - shard_spec = self._get_chunk_spec(array_spec) - evolved_codecs = tuple(c.evolve_from_array_spec(shard_spec) for c in self.codecs) + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: + evolved_codecs = tuple( + c.evolve_from_array_spec(shape=self.chunk_shape, dtype=dtype, chunk_grid=chunk_grid) + for c in self.codecs + ) if evolved_codecs != self.codecs: return replace(self, codecs=evolved_codecs) return self diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index 080c0b9eb3..ed7851e248 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -10,7 +10,7 @@ from zarr.buffer import NDBuffer from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration +from zarr.common import JSON, ArraySpec, ChunkCoords, ChunkCoordsLike, parse_named_configuration if TYPE_CHECKING: from typing import TYPE_CHECKING @@ -59,8 +59,10 @@ def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: Chu f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}." ) - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - if len(self.order) != array_spec.ndim: + def evolve_from_array_spec( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> Self: + if len(self.order) != len(shape): raise ValueError( f"The `order` tuple needs have as many entries as there are dimensions in the array. Got {self.order}." ) @@ -68,7 +70,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: raise ValueError( f"There must not be duplicates in the `order` tuple. Got {self.order}." ) - if not all(0 <= x < array_spec.ndim for x in self.order): + if not all(0 <= x < len(shape) for x in self.order): raise ValueError( f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}." ) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index d6012740e2..762b0b97cc 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -184,7 +184,13 @@ def __init__( dimension_names_parsed = parse_dimension_names(dimension_names) fill_value_parsed = parse_fill_value(fill_value) attributes_parsed = parse_attributes(attributes) - codecs_parsed = parse_codecs(codecs) + codecs_parsed_partial = parse_codecs(codecs) + codecs_parsed = [ + c.evolve_from_array_spec( + shape=shape_parsed, dtype=data_type_parsed, chunk_grid=chunk_grid_parsed + ) + for c in codecs_parsed_partial + ] object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) From 50ecc7aa37a52909c6dfea6496336d1f362207cb Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Fri, 31 May 2024 18:55:21 +0200 Subject: [PATCH 6/9] restore wider function signature for codec.validate to avoid self-referential function call --- src/zarr/abc/codec.py | 51 +++++++++++++----------------------- src/zarr/codecs/blosc.py | 11 +++----- src/zarr/codecs/bytes.py | 11 +++----- src/zarr/codecs/pipeline.py | 13 +++------ src/zarr/codecs/sharding.py | 12 +++------ src/zarr/codecs/transpose.py | 11 ++++---- src/zarr/metadata.py | 16 ++++++----- 7 files changed, 47 insertions(+), 78 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 7c0a12ab98..3508b91d2c 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -63,20 +63,13 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: """ return chunk_spec - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. Parameters ---------- - shape: tuple[int, ...] - The shape of the array. - dtype: np.dtype - The datatype of the array. - chunk_grid: ChunkGrid - The chunk grid of the array. + chunk_spec : ArraySpec Returns ------- @@ -84,19 +77,18 @@ def evolve_from_array_spec( """ return self - def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: - """ - Validates that the codec configuration is compatible with the array metadata. + def validate(self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + """Validates that the codec configuration is compatible with the array metadata. Raises errors when the codec configuration is not compatible. Parameters ---------- - shape: tuple[int, ...] - The shape of the array. - dtype: np.dtype - The datatype of the array. + shape: ChunkCoords + The array shape + dtype: np.dtype[Any] + The array data type chunk_grid: ChunkGrid - The chunk grid of the array. + The array chunk grid """ ... @@ -244,7 +236,7 @@ async def encode_partial( ) -class CodecPipeline: +class CodecPipeline(Metadata): """Base class for implementing CodecPipeline. A CodecPipeline implements the read and write paths for chunk data. On the read path, it is responsible for fetching chunks from a store (via ByteGetter), @@ -252,20 +244,13 @@ class CodecPipeline: and writes them to a store (via ByteSetter).""" @abstractmethod - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. Parameters ---------- - shape: tuple[int, ...] - The shape of the array. - dtype: np.dtype - The datatype of the array. - chunk_grid: ChunkGrid - The chunk grid of the array. + array_spec : ArraySpec Returns ------- @@ -297,18 +282,18 @@ def supports_partial_decode(self) -> bool: ... def supports_partial_encode(self) -> bool: ... @abstractmethod - def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + def validate(self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: """Validates that all codec configurations are compatible with the array metadata. Raises errors when a codec configuration is not compatible. Parameters ---------- - shape: tuple[int, ...] - The shape of the array. - dtype: np.dtype - The datatype of the array. + shape: ChunkCoords + The array shape + dtype: np.dtype[Any] + The array data type chunk_grid: ChunkGrid - The chunk grid of the array. + The array chunk grid """ ... diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index bdb60307fd..546eadd713 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -3,17 +3,15 @@ from dataclasses import dataclass, replace from enum import Enum from functools import cached_property -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numcodecs -import numpy as np from numcodecs.blosc import Blosc from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer, as_numpy_array_wrapper -from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import ChunkCoords, parse_enum, parse_named_configuration, to_thread +from zarr.common import parse_enum, parse_named_configuration, to_thread if TYPE_CHECKING: from typing_extensions import Self @@ -127,9 +125,8 @@ def to_dict(self) -> dict[str, JSON]: }, } - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + dtype = array_spec.dtype new_codec = self if new_codec.typesize is None: new_codec = replace(new_codec, typesize=dtype.itemsize) diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 65841ccc05..f275ae37d1 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -3,15 +3,14 @@ import sys from dataclasses import dataclass, replace from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np from zarr.abc.codec import ArrayBytesCodec from zarr.buffer import Buffer, NDArrayLike, NDBuffer -from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import ChunkCoords, parse_enum, parse_named_configuration +from zarr.common import parse_enum, parse_named_configuration if TYPE_CHECKING: from typing_extensions import Self @@ -52,10 +51,8 @@ def to_dict(self) -> dict[str, JSON]: else: return {"name": "bytes", "configuration": {"endian": self.endian}} - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: - if dtype.itemsize == 0: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + if array_spec.dtype.itemsize == 0: if self.endian is not None: return replace(self, endian=None) elif self.endian is None: diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 6787f3ee22..93aba61d83 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -89,15 +89,8 @@ def from_dict(cls, data: Iterable[JSON | Codec], *, batch_size: int | None = Non def to_dict(self) -> JSON: return [c.to_dict() for c in self] - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: - return type(self).from_list( - [ - c.evolve_from_array_spec(shape=shape, dtype=dtype, chunk_grid=chunk_grid) - for c in self - ] - ) + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + return type(self).from_list([c.evolve_from_array_spec(array_spec=array_spec) for c in self]) @classmethod def from_list(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self: @@ -147,7 +140,7 @@ def __iter__(self) -> Iterator[Codec]: yield self.array_bytes_codec yield from self.bytes_bytes_codecs - def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + def validate(self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: for codec in self: codec.validate(shape=shape, dtype=dtype, chunk_grid=chunk_grid) diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 02c8cebe4d..389e70fb0b 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -340,18 +340,14 @@ def to_dict(self) -> dict[str, JSON]: }, } - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: - evolved_codecs = tuple( - c.evolve_from_array_spec(shape=self.chunk_shape, dtype=dtype, chunk_grid=chunk_grid) - for c in self.codecs - ) + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + shard_spec = self._get_chunk_spec(array_spec) + evolved_codecs = tuple(c.evolve_from_array_spec(array_spec=shard_spec) for c in self.codecs) if evolved_codecs != self.codecs: return replace(self, codecs=evolved_codecs) return self - def validate(self, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: + def validate(self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid) -> None: if len(self.chunk_shape) != len(shape): raise ValueError( "The shard's `chunk_shape` and array's `shape` need to have the same number of dimensions." diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index ed7851e248..cbb36c733d 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -10,7 +10,7 @@ from zarr.buffer import NDBuffer from zarr.chunk_grids import ChunkGrid from zarr.codecs.registry import register_codec -from zarr.common import JSON, ArraySpec, ChunkCoords, ChunkCoordsLike, parse_named_configuration +from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration if TYPE_CHECKING: from typing import TYPE_CHECKING @@ -59,10 +59,9 @@ def validate(self, shape: tuple[int, ...], dtype: np.dtype[Any], chunk_grid: Chu f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}." ) - def evolve_from_array_spec( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> Self: - if len(self.order) != len(shape): + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + ndim = array_spec.ndim + if len(self.order) != ndim: raise ValueError( f"The `order` tuple needs have as many entries as there are dimensions in the array. Got {self.order}." ) @@ -70,7 +69,7 @@ def evolve_from_array_spec( raise ValueError( f"There must not be duplicates in the `order` tuple. Got {self.order}." ) - if not all(0 <= x < len(shape) for x in self.order): + if not all(0 <= x < ndim for x in self.order): raise ValueError( f"All entries in the `order` tuple must be between 0 and the number of dimensions in the array. Got {self.order}." ) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 762b0b97cc..77678ac799 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -185,12 +185,14 @@ def __init__( fill_value_parsed = parse_fill_value(fill_value) attributes_parsed = parse_attributes(attributes) codecs_parsed_partial = parse_codecs(codecs) - codecs_parsed = [ - c.evolve_from_array_spec( - shape=shape_parsed, dtype=data_type_parsed, chunk_grid=chunk_grid_parsed - ) - for c in codecs_parsed_partial - ] + + array_spec = ArraySpec( + shape=shape_parsed, + dtype=data_type_parsed, + fill_value=fill_value_parsed, + order="C", # TODO: order is not needed here. + ) + codecs_parsed = [c.evolve_from_array_spec(array_spec) for c in codecs_parsed_partial] object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) @@ -217,7 +219,7 @@ def _validate_metadata(self) -> None: if self.fill_value is None: raise ValueError("`fill_value` is required.") for codec in self.codecs: - codec.validate(shape=self.shape, dtype=self.dtype, chunk_grid=self.chunk_grid) + codec.validate(shape=self.shape, dtype=self.data_type, chunk_grid=self.chunk_grid) @property def dtype(self) -> np.dtype[Any]: From 54dc2e874996ee5ea3c33039ba730d7e1025fed1 Mon Sep 17 00:00:00 2001 From: Davis Bennett Date: Fri, 31 May 2024 19:02:20 +0200 Subject: [PATCH 7/9] remove commented code block --- src/zarr/metadata.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 77678ac799..1ef1731fd6 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -354,14 +354,6 @@ def dtype(self) -> np.dtype[Any]: def chunks(self) -> ChunkCoords: return self.chunk_grid.chunk_shape - # @property - # def codec_pipeline(self) -> CodecPipeline: - # from zarr.codecs import BatchedCodecPipeline - - # return BatchedCodecPipeline.from_list( - # [V2Filters(self.filters or []), V2Compressor(self.compressor)] - # ) - def to_buffer_dict(self) -> dict[str, Buffer]: def _json_convert( o: np.dtype[Any], From 2d5a84efca73d7a3b6abe59b7b1692c54399f09d Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 12 Jun 2024 10:47:19 +0200 Subject: [PATCH 8/9] make codec_pipeline a cached property of sharding codec --- src/zarr/codecs/sharding.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 7bfe98bf65..f63f3f1f14 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -3,7 +3,7 @@ from collections.abc import Iterable, Mapping, MutableMapping from dataclasses import dataclass, field, replace from enum import Enum -from functools import lru_cache +from functools import cached_property, lru_cache from operator import itemgetter from typing import TYPE_CHECKING, Any, NamedTuple @@ -329,6 +329,10 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: _, configuration_parsed = parse_named_configuration(data, "sharding_indexed") return cls(**configuration_parsed) # type: ignore[arg-type] + @cached_property + def codec_pipeline(self) -> BatchedCodecPipeline: + return BatchedCodecPipeline.from_list(self.codecs) + def to_dict(self) -> dict[str, JSON]: return { "name": "sharding_indexed", @@ -393,7 +397,7 @@ async def _decode_single( return out # decoding chunks and writing them into the output buffer - await BatchedCodecPipeline.from_list(self.codecs).read( + await self.codec_pipeline.read( [ ( _ShardingByteGetter(shard_dict, chunk_coords), @@ -461,7 +465,7 @@ async def _decode_partial_single( shard_dict[chunk_coords] = chunk_bytes # decoding chunks and writing them into the output buffer - await BatchedCodecPipeline.from_list(self.codecs).read( + await self.codec_pipeline.read( [ ( _ShardingByteGetter(shard_dict, chunk_coords), @@ -495,7 +499,7 @@ async def _encode_single( shard_builder = _ShardBuilder.create_empty(chunks_per_shard) - await BatchedCodecPipeline.from_list(self.codecs).write( + await self.codec_pipeline.write( [ ( _ShardingByteSetter(shard_builder, chunk_coords), @@ -538,7 +542,7 @@ async def _encode_partial_single( ) ) - await BatchedCodecPipeline.from_list(self.codecs).write( + await self.codec_pipeline.write( [ ( _ShardingByteSetter(shard_dict, chunk_coords), From 99797c2a414b46eb4a54692d4440ed82c7335f35 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 12 Jun 2024 11:16:47 +0200 Subject: [PATCH 9/9] cached_property -> vanilla property --- src/zarr/codecs/sharding.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index f63f3f1f14..def95b206d 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -3,7 +3,7 @@ from collections.abc import Iterable, Mapping, MutableMapping from dataclasses import dataclass, field, replace from enum import Enum -from functools import cached_property, lru_cache +from functools import lru_cache from operator import itemgetter from typing import TYPE_CHECKING, Any, NamedTuple @@ -329,7 +329,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: _, configuration_parsed = parse_named_configuration(data, "sharding_indexed") return cls(**configuration_parsed) # type: ignore[arg-type] - @cached_property + @property def codec_pipeline(self) -> BatchedCodecPipeline: return BatchedCodecPipeline.from_list(self.codecs)