Refactors v2 codec handling #2425

Merged — 4 commits, Oct 24, 2024

This change merges the separate V2Filters and V2Compressor codecs into a single V2Codec, an ArrayBytesCodec that applies filters and compression together for Zarr v2 arrays.
115 changes: 48 additions & 67 deletions src/zarr/codecs/_v2.py
```diff
@@ -5,20 +5,21 @@
 from typing import TYPE_CHECKING
 
 import numcodecs
-from numcodecs.compat import ensure_bytes, ensure_ndarray
+from numcodecs.compat import ensure_ndarray_like
 
-from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
-from zarr.core.buffer import Buffer, NDBuffer, default_buffer_prototype
+from zarr.abc.codec import ArrayBytesCodec
 from zarr.registry import get_ndbuffer_class
 
 if TYPE_CHECKING:
     import numcodecs.abc
 
     from zarr.core.array_spec import ArraySpec
+    from zarr.core.buffer import Buffer, NDBuffer
 
 
 @dataclass(frozen=True)
-class V2Compressor(ArrayBytesCodec):
+class V2Codec(ArrayBytesCodec):
+    filters: tuple[numcodecs.abc.Codec, ...] | None
     compressor: numcodecs.abc.Codec | None
 
     is_fixed_size = False
@@ -28,81 +29,61 @@ async def _decode_single(
         chunk_bytes: Buffer,
         chunk_spec: ArraySpec,
     ) -> NDBuffer:
-        if self.compressor is not None:
-            chunk_numpy_array = ensure_ndarray(
-                await asyncio.to_thread(self.compressor.decode, chunk_bytes.as_array_like())
-            )
+        cdata = chunk_bytes.as_array_like()
+        # decompress
+        if self.compressor:
+            chunk = await asyncio.to_thread(self.compressor.decode, cdata)
         else:
-            chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like())
+            chunk = cdata
 
-        # ensure correct dtype
-        if str(chunk_numpy_array.dtype) != chunk_spec.dtype and not chunk_spec.dtype.hasobject:
-            chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype)
+        # apply filters
+        if self.filters:
+            for f in reversed(self.filters):
+                chunk = await asyncio.to_thread(f.decode, chunk)
+
+        # view as numpy array with correct dtype
+        chunk = ensure_ndarray_like(chunk)
+        # special case object dtype, because incorrect handling can lead to
+        # segfaults and other bad things happening
+        if chunk_spec.dtype != object:
+            chunk = chunk.view(chunk_spec.dtype)
+        elif chunk.dtype != object:
+            # If we end up here, someone must have hacked around with the filters.
+            # We cannot deal with object arrays unless there is an object
+            # codec in the filter chain, i.e., a filter that converts from object
+            # array to something else during encoding, and converts back to object
+            # array during decoding.
+            raise RuntimeError("cannot read object array without object codec")
 
-        return get_ndbuffer_class().from_numpy_array(chunk_numpy_array)
+        # ensure correct chunk shape
+        chunk = chunk.reshape(-1, order="A")
+        chunk = chunk.reshape(chunk_spec.shape, order=chunk_spec.order)
+
+        return get_ndbuffer_class().from_ndarray_like(chunk)
 
     async def _encode_single(
         self,
         chunk_array: NDBuffer,
-        _chunk_spec: ArraySpec,
+        chunk_spec: ArraySpec,
     ) -> Buffer | None:
-        chunk_numpy_array = chunk_array.as_numpy_array()
-        if self.compressor is not None:
-            if (
-                not chunk_numpy_array.flags.c_contiguous
-                and not chunk_numpy_array.flags.f_contiguous
-            ):
-                chunk_numpy_array = chunk_numpy_array.copy(order="A")
-            encoded_chunk_bytes = ensure_bytes(
-                await asyncio.to_thread(self.compressor.encode, chunk_numpy_array)
-            )
+        chunk = chunk_array.as_ndarray_like()
+
+        # apply filters
+        if self.filters:
+            for f in self.filters:
+                chunk = await asyncio.to_thread(f.encode, chunk)
+
+        # check object encoding
+        if ensure_ndarray_like(chunk).dtype == object:
+            raise RuntimeError("cannot write object array without object codec")
+
+        # compress
+        if self.compressor:
+            cdata = await asyncio.to_thread(self.compressor.encode, chunk)
         else:
-            encoded_chunk_bytes = ensure_bytes(chunk_numpy_array)
+            cdata = chunk
 
-        return default_buffer_prototype().buffer.from_bytes(encoded_chunk_bytes)
-
-    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
-        raise NotImplementedError
-
-
-@dataclass(frozen=True)
-class V2Filters(ArrayArrayCodec):
-    filters: tuple[numcodecs.abc.Codec, ...] | None
-
-    is_fixed_size = False
-
-    async def _decode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer:
-        chunk_ndarray = chunk_array.as_ndarray_like()
-        # apply filters in reverse order
-        if self.filters is not None:
-            for filter in self.filters[::-1]:
-                chunk_ndarray = await asyncio.to_thread(filter.decode, chunk_ndarray)
-
-        # ensure correct chunk shape
-        if chunk_ndarray.shape != chunk_spec.shape:
-            chunk_ndarray = chunk_ndarray.reshape(
-                chunk_spec.shape,
-                order=chunk_spec.order,
-            )
-
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
-
-    async def _encode_single(
-        self,
-        chunk_array: NDBuffer,
-        chunk_spec: ArraySpec,
-    ) -> NDBuffer | None:
-        chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)
-
-        if self.filters is not None:
-            for filter in self.filters:
-                chunk_ndarray = await asyncio.to_thread(filter.encode, chunk_ndarray)
-
-        return get_ndbuffer_class().from_ndarray_like(chunk_ndarray)
+        return chunk_spec.prototype.buffer.from_bytes(cdata)
 
     def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
         raise NotImplementedError
```
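
Taken together, `V2Codec` now performs the whole Zarr v2 chunk round-trip in one place: encoding applies the filters in declaration order and then the compressor, while decoding decompresses first, undoes the filters in reverse order, and finally views and reshapes the bytes to the chunk's dtype and shape. Below is a minimal standalone sketch of that round-trip using plain numcodecs calls; the `Delta`/`Zlib` pairing is illustrative, not taken from this PR.

```python
import numcodecs
import numpy as np

filters = [numcodecs.Delta(dtype="<i4")]
compressor = numcodecs.Zlib(level=2)

original = np.arange(8, dtype="<i4").reshape(2, 4)

# encode: filters in declaration order, then the compressor
chunk = original
for f in filters:
    chunk = f.encode(chunk)
cdata = compressor.encode(chunk)

# decode: decompress first, then undo the filters in reverse order
chunk = compressor.decode(cdata)
for f in reversed(filters):
    chunk = f.decode(chunk)

# filters may hand back a flat buffer, so view/reshape to the chunk's
# dtype and shape, just as V2Codec._decode_single does at the end
decoded = np.asarray(chunk).view("<i4").reshape(original.shape)
np.testing.assert_array_equal(decoded, original)
```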
7 changes: 3 additions & 4 deletions src/zarr/core/array.py
```diff
@@ -13,7 +13,7 @@
 from zarr._compat import _deprecate_positional_args
 from zarr.abc.store import Store, set_or_delete
 from zarr.codecs import _get_default_array_bytes_codec
-from zarr.codecs._v2 import V2Compressor, V2Filters
+from zarr.codecs._v2 import V2Codec
 from zarr.core.attributes import Attributes
 from zarr.core.buffer import (
     BufferPrototype,
@@ -118,9 +118,8 @@ def create_codec_pipeline(metadata: ArrayMetadata) -> CodecPipeline:
     if isinstance(metadata, ArrayV3Metadata):
         return get_pipeline_class().from_codecs(metadata.codecs)
     elif isinstance(metadata, ArrayV2Metadata):
-        return get_pipeline_class().from_codecs(
-            [V2Filters(metadata.filters), V2Compressor(metadata.compressor)]
-        )
+        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
+        return get_pipeline_class().from_codecs([v2_codec])
     else:
         raise TypeError
```
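For context, here is a sketch of what `create_codec_pipeline` now builds for a v2 array: a single codec carrying both stages, rather than the former `[V2Filters, V2Compressor]` pair. The `Delta`/`Zlib` instances are illustrative, and the sketch assumes `get_pipeline_class` is importable from `zarr.registry`, like the `get_ndbuffer_class` import seen above.

```python
import numcodecs

from zarr.codecs._v2 import V2Codec
from zarr.registry import get_pipeline_class

# Filters and compressor travel through the pipeline as one ArrayBytesCodec.
v2_codec = V2Codec(
    filters=(numcodecs.Delta(dtype="<i4"),),
    compressor=numcodecs.Zlib(level=2),
)
pipeline = get_pipeline_class().from_codecs([v2_codec])
```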
9 changes: 9 additions & 0 deletions tests/test_v2.py
```diff
@@ -121,3 +121,12 @@ async def test_create_dtype_str(dtype: Any) -> None:
     arr[:] = ["a", "bb", "ccc"]
     result = arr[:]
     np.testing.assert_array_equal(result, np.array(["a", "bb", "ccc"], dtype="object"))
+
+
+@pytest.mark.parametrize("filters", [[], [numcodecs.Delta(dtype="<i4")], [numcodecs.Zlib(level=2)]])
+def test_v2_filters_codecs(filters: Any) -> None:
+    array_fixture = [42]
+    arr = zarr.create(shape=1, dtype="<i4", zarr_format=2, filters=filters)
+    arr[:] = array_fixture
+    result = arr[:]
+    np.testing.assert_array_equal(result, array_fixture)
```
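
The new test covers filters on their own. A natural variation, sketched below, would also set a compressor so both stages of `V2Codec` are exercised in one round-trip; this is not part of the PR and assumes `zarr.create` accepts the v2 `compressor` keyword alongside `filters`.

```python
import numcodecs
import numpy as np
import zarr


def test_v2_filters_with_compressor() -> None:
    # hypothetical follow-up test, not included in this PR
    arr = zarr.create(
        shape=1,
        dtype="<i4",
        zarr_format=2,
        filters=[numcodecs.Delta(dtype="<i4")],
        compressor=numcodecs.Zlib(level=2),
    )
    arr[:] = [42]
    np.testing.assert_array_equal(arr[:], [42])
```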