From 9a47b150dd2c4de607a1cab0203e5f203d037bea Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 31 Jan 2025 08:46:03 -0700 Subject: [PATCH 01/13] Skip reads when completely overwriting boundary chunks Uses `slice(..., None)` to indicate that a `chunk_selection` ends at the boundary of the current chunk. Also does so for a last chunk that is shorter than the chunk size. `is_total_slice` now understands this convention, and correctly detects boundary chunks as total slices. Closes #757 --- src/zarr/core/indexing.py | 26 ++++++++++++++++++++++---- src/zarr/storage/_logging.py | 2 +- tests/test_array.py | 11 ++++++++++- tests/test_indexing.py | 3 +++ 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/zarr/core/indexing.py b/src/zarr/core/indexing.py index 733b2464ac..ba89bc412f 100644 --- a/src/zarr/core/indexing.py +++ b/src/zarr/core/indexing.py @@ -403,16 +403,24 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: dim_chunk_sel_start = self.start - dim_offset dim_out_offset = 0 + # Use None to indicate this selection ends at the chunk edge + # This is useful for is_total_slice at boundary chunks, if self.stop > dim_limit: # selection ends after current chunk - dim_chunk_sel_stop = dim_chunk_len + dim_chunk_sel_stop = None # dim_chunk_len else: # selection ends within current chunk - dim_chunk_sel_stop = self.stop - dim_offset + if dim_chunk_ix == (self.nchunks - 1): + # all of the last chunk is included + dim_chunk_sel_stop = None + else: + dim_chunk_sel_stop = self.stop - dim_offset dim_chunk_sel = slice(dim_chunk_sel_start, dim_chunk_sel_stop, self.step) - dim_chunk_nitems = ceildiv((dim_chunk_sel_stop - dim_chunk_sel_start), self.step) + dim_chunk_nitems = ceildiv( + ((dim_chunk_sel_stop or dim_chunk_len) - dim_chunk_sel_start), self.step + ) # If there are no elements on the selection within this chunk, then skip if dim_chunk_nitems == 0: @@ -1378,7 +1386,17 @@ def is_total_slice(item: Selection, shape: ChunkCoords) -> bool: isinstance(dim_sel, slice) and ( (dim_sel == slice(None)) - or ((dim_sel.stop - dim_sel.start == dim_len) and (dim_sel.step in [1, None])) + or ( + dim_sel.stop is not None + and (dim_sel.stop - dim_sel.start == dim_len) + and (dim_sel.step in [1, None]) + ) + # starts exactly at a chunk + or ( + (dim_sel.start % dim_len == 0) + and dim_sel.stop is None + and (dim_sel.step in [1, None]) + ) ) ) for dim_sel, dim_len in zip(item, shape, strict=False) diff --git a/src/zarr/storage/_logging.py b/src/zarr/storage/_logging.py index e9d6211588..5f1a97acd9 100644 --- a/src/zarr/storage/_logging.py +++ b/src/zarr/storage/_logging.py @@ -88,7 +88,7 @@ def log(self, hint: Any = "") -> Generator[None, None, None]: op = f"{type(self._store).__name__}.{method}" if hint: op = f"{op}({hint})" - self.logger.info("Calling %s", op) + self.logger.info(" Calling %s", op) start_time = time.time() try: self.counter[method] += 1 diff --git a/tests/test_array.py b/tests/test_array.py index 80ff8444fc..a1b08e1a63 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -1335,5 +1335,14 @@ async def test_orthogonal_set_total_slice() -> None: """Ensure that a whole chunk overwrite does not read chunks""" store = MemoryStore() array = zarr.create_array(store, shape=(20, 20), chunks=(1, 2), dtype=int, fill_value=-1) - with mock.patch("zarr.storage.MemoryStore.get", side_effect=ValueError): + with mock.patch("zarr.storage.MemoryStore.get", side_effect=RuntimeError): array[0, slice(4, 10)] = np.arange(6) + + array = zarr.create_array( + store, shape=(20, 21), chunks=(1, 2), dtype=int, fill_value=-1, overwrite=True + ) + with mock.patch("zarr.storage.MemoryStore.get", side_effect=RuntimeError): + array[0, :] = np.arange(21) + + with mock.patch("zarr.storage.MemoryStore.get", side_effect=RuntimeError): + array[:] = 1 diff --git a/tests/test_indexing.py b/tests/test_indexing.py index 932c32f1ae..56f1870994 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -1959,3 +1959,6 @@ def test_vectorized_indexing_incompatible_shape(store) -> None: def test_is_total_slice(): assert is_total_slice((0, slice(4, 6)), (1, 2)) assert is_total_slice((slice(0, 1, None), slice(4, 6)), (1, 2)) + assert is_total_slice((slice(0, 1, None), slice(4, None)), (1, 2)) + # slice(5, None) starts in the middle of a chunk + assert not is_total_slice((slice(0, 1, None), slice(5, None)), (1, 2)) From e1784928dee29f1331d0b04b15b30a03b5ce314d Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 3 Feb 2025 10:49:50 -0700 Subject: [PATCH 02/13] normalize in codec_pipeline --- src/zarr/core/codec_pipeline.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 583ca01c5e..55fa0be694 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -35,6 +35,22 @@ U = TypeVar("U") +def normalize_slices( + idxr: tuple[int | slice, ...], shape: tuple[int, ...] +) -> tuple[int | slice, ...]: + # replace slice objects with stop==None with size + out = [] + for i, size in zip(idxr, shape, strict=False): + if not isinstance(i, slice): + out.append(i) + continue + if i.step not in [1, None] or i.start not in [0, None]: + out.append(i) + continue + out.append(slice(i.start, i.stop if i.stop is not None else size, i.step)) + return tuple(out) + + def _unzip2(iterable: Iterable[tuple[T, U]]) -> tuple[list[T], list[U]]: out0: list[T] = [] out1: list[U] = [] @@ -279,7 +295,10 @@ async def read_batch( chunk_array_batch, batch_info, strict=False ): if chunk_array is not None: - tmp = chunk_array[chunk_selection] + normalized_selection = normalize_slices( + chunk_selection, out[out_selection].shape + ) + tmp = chunk_array[normalized_selection] if drop_axes != (): tmp = tmp.squeeze(axis=drop_axes) out[out_selection] = tmp @@ -322,7 +341,9 @@ def _merge_chunk_array( for idx in range(chunk_spec.ndim) ) chunk_value = chunk_value[item] - chunk_array[chunk_selection] = chunk_value + + normalized_selection = normalize_slices(chunk_selection, chunk_value.shape) + chunk_array[normalized_selection] = chunk_value return chunk_array async def write_batch( From 26f63a36d5d12b4f93f8ce20dfce1e03c79b0607 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 3 Feb 2025 20:26:28 -0700 Subject: [PATCH 03/13] Revert "normalize in codec_pipeline" This reverts commit 234431cd6efb661c53e2a832a0e4ea4dca772c1b. --- src/zarr/core/codec_pipeline.py | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 55fa0be694..583ca01c5e 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -35,22 +35,6 @@ U = TypeVar("U") -def normalize_slices( - idxr: tuple[int | slice, ...], shape: tuple[int, ...] -) -> tuple[int | slice, ...]: - # replace slice objects with stop==None with size - out = [] - for i, size in zip(idxr, shape, strict=False): - if not isinstance(i, slice): - out.append(i) - continue - if i.step not in [1, None] or i.start not in [0, None]: - out.append(i) - continue - out.append(slice(i.start, i.stop if i.stop is not None else size, i.step)) - return tuple(out) - - def _unzip2(iterable: Iterable[tuple[T, U]]) -> tuple[list[T], list[U]]: out0: list[T] = [] out1: list[U] = [] @@ -295,10 +279,7 @@ async def read_batch( chunk_array_batch, batch_info, strict=False ): if chunk_array is not None: - normalized_selection = normalize_slices( - chunk_selection, out[out_selection].shape - ) - tmp = chunk_array[normalized_selection] + tmp = chunk_array[chunk_selection] if drop_axes != (): tmp = tmp.squeeze(axis=drop_axes) out[out_selection] = tmp @@ -341,9 +322,7 @@ def _merge_chunk_array( for idx in range(chunk_spec.ndim) ) chunk_value = chunk_value[item] - - normalized_selection = normalize_slices(chunk_selection, chunk_value.shape) - chunk_array[normalized_selection] = chunk_value + chunk_array[chunk_selection] = chunk_value return chunk_array async def write_batch( From 1d11fcb0b0ff0a971cd508a55b2ddd18984f55b9 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 3 Feb 2025 20:27:04 -0700 Subject: [PATCH 04/13] Partially Revert "Skip reads when completely overwriting boundary chunks" This reverts commit edbba372de50bf70eb79c7b1deecf4828eab7340. --- src/zarr/core/indexing.py | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/src/zarr/core/indexing.py b/src/zarr/core/indexing.py index ba89bc412f..733b2464ac 100644 --- a/src/zarr/core/indexing.py +++ b/src/zarr/core/indexing.py @@ -403,24 +403,16 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: dim_chunk_sel_start = self.start - dim_offset dim_out_offset = 0 - # Use None to indicate this selection ends at the chunk edge - # This is useful for is_total_slice at boundary chunks, if self.stop > dim_limit: # selection ends after current chunk - dim_chunk_sel_stop = None # dim_chunk_len + dim_chunk_sel_stop = dim_chunk_len else: # selection ends within current chunk - if dim_chunk_ix == (self.nchunks - 1): - # all of the last chunk is included - dim_chunk_sel_stop = None - else: - dim_chunk_sel_stop = self.stop - dim_offset + dim_chunk_sel_stop = self.stop - dim_offset dim_chunk_sel = slice(dim_chunk_sel_start, dim_chunk_sel_stop, self.step) - dim_chunk_nitems = ceildiv( - ((dim_chunk_sel_stop or dim_chunk_len) - dim_chunk_sel_start), self.step - ) + dim_chunk_nitems = ceildiv((dim_chunk_sel_stop - dim_chunk_sel_start), self.step) # If there are no elements on the selection within this chunk, then skip if dim_chunk_nitems == 0: @@ -1386,17 +1378,7 @@ def is_total_slice(item: Selection, shape: ChunkCoords) -> bool: isinstance(dim_sel, slice) and ( (dim_sel == slice(None)) - or ( - dim_sel.stop is not None - and (dim_sel.stop - dim_sel.start == dim_len) - and (dim_sel.step in [1, None]) - ) - # starts exactly at a chunk - or ( - (dim_sel.start % dim_len == 0) - and dim_sel.stop is None - and (dim_sel.step in [1, None]) - ) + or ((dim_sel.stop - dim_sel.start == dim_len) and (dim_sel.step in [1, None])) ) ) for dim_sel, dim_len in zip(item, shape, strict=False) From 0e00eb3118e18dfd8a0f96823518092f5e3c6464 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 3 Feb 2025 20:42:39 -0700 Subject: [PATCH 05/13] Different approach --- src/zarr/abc/codec.py | 4 +-- src/zarr/codecs/sharding.py | 14 +++++--- src/zarr/core/array.py | 6 ++-- src/zarr/core/codec_pipeline.py | 60 +++++++++++++++++++-------------- src/zarr/core/indexing.py | 59 +++++++++++--------------------- tests/test_indexing.py | 9 ----- 6 files changed, 69 insertions(+), 83 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index fabd042dbe..16400f5f4b 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -357,7 +357,7 @@ async def encode( @abstractmethod async def read( self, - batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], out: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: @@ -379,7 +379,7 @@ async def read( @abstractmethod async def write( self, - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index e8730c86dd..459805d808 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -455,8 +455,9 @@ async def _decode_single( chunk_spec, chunk_selection, out_selection, + is_complete_shard, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer ], out, ) @@ -486,7 +487,7 @@ async def _decode_partial_single( ) indexed_chunks = list(indexer) - all_chunk_coords = {chunk_coords for chunk_coords, _, _ in indexed_chunks} + all_chunk_coords = {chunk_coords for chunk_coords, *_ in indexed_chunks} # reading bytes of all requested chunks shard_dict: ShardMapping = {} @@ -524,8 +525,9 @@ async def _decode_partial_single( chunk_spec, chunk_selection, out_selection, + is_complete_shard, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer ], out, ) @@ -558,8 +560,9 @@ async def _encode_single( chunk_spec, chunk_selection, out_selection, + is_complete_shard, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer ], shard_array, ) @@ -601,8 +604,9 @@ async def _encode_partial_single( chunk_spec, chunk_selection, out_selection, + is_complete_shard, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer ], shard_array, ) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 4c444a81fa..9e2fdf3733 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1290,8 +1290,9 @@ async def _get_selection( self.metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype), chunk_selection, out_selection, + is_complete_chunk, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer ], out_buffer, drop_axes=indexer.drop_axes, @@ -1417,8 +1418,9 @@ async def _set_selection( self.metadata.get_chunk_spec(chunk_coords, _config, prototype), chunk_selection, out_selection, + is_complete_chunk, ) - for chunk_coords, chunk_selection, out_selection in indexer + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer ], value_buffer, drop_axes=indexer.drop_axes, diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index 583ca01c5e..727c45680e 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -16,7 +16,7 @@ ) from zarr.core.common import ChunkCoords, concurrent_map from zarr.core.config import config -from zarr.core.indexing import SelectorTuple, is_scalar, is_total_slice +from zarr.core.indexing import SelectorTuple, is_scalar from zarr.core.metadata.v2 import _default_fill_value from zarr.registry import register_pipeline @@ -230,7 +230,7 @@ async def encode_partial_batch( async def read_batch( self, - batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], out: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: @@ -238,10 +238,10 @@ async def read_batch( chunk_array_batch = await self.decode_partial_batch( [ (byte_getter, chunk_selection, chunk_spec) - for byte_getter, chunk_spec, chunk_selection, _ in batch_info + for byte_getter, chunk_spec, chunk_selection, *_ in batch_info ] ) - for chunk_array, (_, chunk_spec, _, out_selection) in zip( + for chunk_array, (_, chunk_spec, _, out_selection, _) in zip( chunk_array_batch, batch_info, strict=False ): if chunk_array is not None: @@ -260,22 +260,19 @@ async def read_batch( out[out_selection] = fill_value else: chunk_bytes_batch = await concurrent_map( - [ - (byte_getter, array_spec.prototype) - for byte_getter, array_spec, _, _ in batch_info - ], + [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info], lambda byte_getter, prototype: byte_getter.get(prototype), config.get("async.concurrency"), ) chunk_array_batch = await self.decode_batch( [ (chunk_bytes, chunk_spec) - for chunk_bytes, (_, chunk_spec, _, _) in zip( + for chunk_bytes, (_, chunk_spec, *_) in zip( chunk_bytes_batch, batch_info, strict=False ) ], ) - for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip( + for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip( chunk_array_batch, batch_info, strict=False ): if chunk_array is not None: @@ -296,9 +293,10 @@ def _merge_chunk_array( out_selection: SelectorTuple, chunk_spec: ArraySpec, chunk_selection: SelectorTuple, + is_complete_chunk: bool, drop_axes: tuple[int, ...], ) -> NDBuffer: - if is_total_slice(chunk_selection, chunk_spec.shape) and value.shape == chunk_spec.shape: + if is_complete_chunk and value.shape == chunk_spec.shape: return value if existing_chunk_array is None: chunk_array = chunk_spec.prototype.nd_buffer.create( @@ -327,7 +325,7 @@ def _merge_chunk_array( async def write_batch( self, - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: @@ -337,14 +335,14 @@ async def write_batch( await self.encode_partial_batch( [ (byte_setter, value, chunk_selection, chunk_spec) - for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info + for byte_setter, chunk_spec, chunk_selection, out_selection, _ in batch_info ], ) else: await self.encode_partial_batch( [ (byte_setter, value[out_selection], chunk_selection, chunk_spec) - for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info + for byte_setter, chunk_spec, chunk_selection, out_selection, _ in batch_info ], ) @@ -361,10 +359,10 @@ async def _read_key( chunk_bytes_batch = await concurrent_map( [ ( - None if is_total_slice(chunk_selection, chunk_spec.shape) else byte_setter, + None if is_complete_chunk else byte_setter, chunk_spec.prototype, ) - for byte_setter, chunk_spec, chunk_selection, _ in batch_info + for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info ], _read_key, config.get("async.concurrency"), @@ -372,7 +370,7 @@ async def _read_key( chunk_array_decoded = await self.decode_batch( [ (chunk_bytes, chunk_spec) - for chunk_bytes, (_, chunk_spec, _, _) in zip( + for chunk_bytes, (_, chunk_spec, *_) in zip( chunk_bytes_batch, batch_info, strict=False ) ], @@ -380,14 +378,24 @@ async def _read_key( chunk_array_merged = [ self._merge_chunk_array( - chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes - ) - for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip( - chunk_array_decoded, batch_info, strict=False + chunk_array, + value, + out_selection, + chunk_spec, + chunk_selection, + is_complete_chunk, + drop_axes, ) + for chunk_array, ( + _, + chunk_spec, + chunk_selection, + out_selection, + is_complete_chunk, + ) in zip(chunk_array_decoded, batch_info, strict=False) ] chunk_array_batch: list[NDBuffer | None] = [] - for chunk_array, (_, chunk_spec, _, _) in zip( + for chunk_array, (_, chunk_spec, *_) in zip( chunk_array_merged, batch_info, strict=False ): if chunk_array is None: @@ -403,7 +411,7 @@ async def _read_key( chunk_bytes_batch = await self.encode_batch( [ (chunk_array, chunk_spec) - for chunk_array, (_, chunk_spec, _, _) in zip( + for chunk_array, (_, chunk_spec, *_) in zip( chunk_array_batch, batch_info, strict=False ) ], @@ -418,7 +426,7 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> Non await concurrent_map( [ (byte_setter, chunk_bytes) - for chunk_bytes, (byte_setter, _, _, _) in zip( + for chunk_bytes, (byte_setter, *_) in zip( chunk_bytes_batch, batch_info, strict=False ) ], @@ -446,7 +454,7 @@ async def encode( async def read( self, - batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], out: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: @@ -461,7 +469,7 @@ async def read( async def write( self, - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: diff --git a/src/zarr/core/indexing.py b/src/zarr/core/indexing.py index 733b2464ac..eca315b5ef 100644 --- a/src/zarr/core/indexing.py +++ b/src/zarr/core/indexing.py @@ -321,12 +321,12 @@ class ChunkDimProjection(NamedTuple): Selection of items from chunk array. dim_out_sel Selection of items in target (output) array. - """ dim_chunk_ix: int dim_chunk_sel: Selector dim_out_sel: Selector | None + is_complete_chunk: bool @dataclass(frozen=True) @@ -346,7 +346,8 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: dim_offset = dim_chunk_ix * self.dim_chunk_len dim_chunk_sel = self.dim_sel - dim_offset dim_out_sel = None - yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) + is_complete_chunk = self.dim_chunk_len == 1 + yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel, is_complete_chunk) @dataclass(frozen=True) @@ -420,7 +421,8 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: dim_out_sel = slice(dim_out_offset, dim_out_offset + dim_chunk_nitems) - yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) + is_complete_chunk = dim_chunk_sel_start == 0 and (self.stop >= dim_limit) + yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel, is_complete_chunk) def check_selection_length(selection: SelectionNormalized, shape: ChunkCoords) -> None: @@ -493,12 +495,14 @@ class ChunkProjection(NamedTuple): Selection of items from chunk array. out_selection Selection of items in target (output) array. - + is_complete_chunk: + True if a complete chunk is indexed """ chunk_coords: ChunkCoords chunk_selection: tuple[Selector, ...] | npt.NDArray[np.intp] out_selection: tuple[Selector, ...] | npt.NDArray[np.intp] | slice + is_complete_chunk: bool def is_slice(s: Any) -> TypeGuard[slice]: @@ -574,8 +578,8 @@ def __iter__(self) -> Iterator[ChunkProjection]: out_selection = tuple( p.dim_out_sel for p in dim_projections if p.dim_out_sel is not None ) - - yield ChunkProjection(chunk_coords, chunk_selection, out_selection) + is_complete_chunk = all(p.is_complete_chunk for p in dim_projections) + yield ChunkProjection(chunk_coords, chunk_selection, out_selection, is_complete_chunk) @dataclass(frozen=True) @@ -643,8 +647,9 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: start = self.chunk_nitems_cumsum[dim_chunk_ix - 1] stop = self.chunk_nitems_cumsum[dim_chunk_ix] dim_out_sel = slice(start, stop) + is_complete_chunk = False # TODO - yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) + yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel, is_complete_chunk) class Order(Enum): @@ -783,8 +788,8 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: # find region in chunk dim_offset = dim_chunk_ix * self.dim_chunk_len dim_chunk_sel = self.dim_sel[start:stop] - dim_offset - - yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) + is_complete_chunk = False # TODO + yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel, is_complete_chunk) def slice_to_range(s: slice, length: int) -> range: @@ -921,7 +926,8 @@ def __iter__(self) -> Iterator[ChunkProjection]: if not is_basic_selection(out_selection): out_selection = ix_(out_selection, self.shape) - yield ChunkProjection(chunk_coords, chunk_selection, out_selection) + is_complete_chunk = all(p.is_complete_chunk for p in dim_projections) + yield ChunkProjection(chunk_coords, chunk_selection, out_selection, is_complete_chunk) @dataclass(frozen=True) @@ -1030,8 +1036,8 @@ def __iter__(self) -> Iterator[ChunkProjection]: out_selection = tuple( p.dim_out_sel for p in dim_projections if p.dim_out_sel is not None ) - - yield ChunkProjection(chunk_coords, chunk_selection, out_selection) + is_complete_chunk = all(p.is_complete_chunk for p in dim_projections) + yield ChunkProjection(chunk_coords, chunk_selection, out_selection, is_complete_chunk) @dataclass(frozen=True) @@ -1198,7 +1204,8 @@ def __iter__(self) -> Iterator[ChunkProjection]: for (dim_sel, dim_chunk_offset) in zip(self.selection, chunk_offsets, strict=True) ) - yield ChunkProjection(chunk_coords, chunk_selection, out_selection) + is_complete_chunk = False # TODO + yield ChunkProjection(chunk_coords, chunk_selection, out_selection, is_complete_chunk) @dataclass(frozen=True) @@ -1361,32 +1368,6 @@ def c_order_iter(chunks_per_shard: ChunkCoords) -> Iterator[ChunkCoords]: return itertools.product(*(range(x) for x in chunks_per_shard)) -def is_total_slice(item: Selection, shape: ChunkCoords) -> bool: - """Determine whether `item` specifies a complete slice of array with the - given `shape`. Used to optimize __setitem__ operations on the Chunk - class.""" - - # N.B., assume shape is normalized - if item == slice(None): - return True - if isinstance(item, slice): - item = (item,) - if isinstance(item, tuple): - return all( - (isinstance(dim_sel, int) and dim_len == 1) - or ( - isinstance(dim_sel, slice) - and ( - (dim_sel == slice(None)) - or ((dim_sel.stop - dim_sel.start == dim_len) and (dim_sel.step in [1, None])) - ) - ) - for dim_sel, dim_len in zip(item, shape, strict=False) - ) - else: - raise TypeError(f"expected slice or tuple of slices, found {item!r}") - - def get_indexer( selection: SelectionWithFields, shape: ChunkCoords, chunk_grid: ChunkGrid ) -> Indexer: diff --git a/tests/test_indexing.py b/tests/test_indexing.py index 56f1870994..30d0d75f22 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -19,7 +19,6 @@ OrthogonalSelection, Selection, _iter_grid, - is_total_slice, make_slice_selection, normalize_integer_selection, oindex, @@ -1954,11 +1953,3 @@ def test_vectorized_indexing_incompatible_shape(store) -> None: ) with pytest.raises(ValueError, match="Attempting to set"): arr[np.array([1, 2]), np.array([1, 2])] = np.array([[-1, -2], [-3, -4]]) - - -def test_is_total_slice(): - assert is_total_slice((0, slice(4, 6)), (1, 2)) - assert is_total_slice((slice(0, 1, None), slice(4, 6)), (1, 2)) - assert is_total_slice((slice(0, 1, None), slice(4, None)), (1, 2)) - # slice(5, None) starts in the middle of a chunk - assert not is_total_slice((slice(0, 1, None), slice(5, None)), (1, 2)) From 74d0995461c8fabba7369e9e57e9defc655210f7 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 08:34:37 -0700 Subject: [PATCH 06/13] fix bug --- src/zarr/core/indexing.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/zarr/core/indexing.py b/src/zarr/core/indexing.py index eca315b5ef..c197f6f397 100644 --- a/src/zarr/core/indexing.py +++ b/src/zarr/core/indexing.py @@ -421,7 +421,9 @@ def __iter__(self) -> Iterator[ChunkDimProjection]: dim_out_sel = slice(dim_out_offset, dim_out_offset + dim_chunk_nitems) - is_complete_chunk = dim_chunk_sel_start == 0 and (self.stop >= dim_limit) + is_complete_chunk = ( + dim_chunk_sel_start == 0 and (self.stop >= dim_limit) and self.step in [1, None] + ) yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel, is_complete_chunk) From 3a85bdf935d19b4261e6caff421f453e02d02b11 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:16:25 -0700 Subject: [PATCH 07/13] add oindex property test --- src/zarr/testing/strategies.py | 28 ++++++++++++++++++++++++++++ tests/test_properties.py | 22 ++++++++++++++++++++-- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index b948651ce6..0cdbc0f79d 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -188,6 +188,34 @@ def basic_indices(draw: st.DrawFn, *, shape: tuple[int], **kwargs: Any) -> Any: ) +@st.composite # type: ignore[misc] +def orthogonal_indices( + draw: st.DrawFn, *, shape: tuple[int] +) -> tuple[tuple[np.ndarray[Any, Any], ...], tuple[np.ndarray[Any, Any], ...]]: + """ + Strategy that returns + (1) a tuple of integer arrays used for orthogonal indexing of Zarr arrays. + (2) an tuple of integer arrays that can be used for equivalent indexing of numpy arrays + """ + zindexer = [] + npindexer = [] + ndim = len(shape) + for axis, size in enumerate(shape): + (idxr,) = draw( + npst.integer_array_indices( + shape=(size,), result_shape=npst.array_shapes(min_side=1, max_side=size, max_dims=1) + ) + # | npst.basic_indices(shape=(size,), allow_ellipsis=False) + ) + zindexer.append(idxr) + if isinstance(idxr, np.ndarray): + newshape = [1] * ndim + newshape[axis] = idxr.size + idxr = idxr.reshape(newshape) + npindexer.append(idxr) + return tuple(zindexer), np.broadcast_arrays(*npindexer) + + def key_ranges( keys: SearchStrategy = node_names, max_size: int | None = None ) -> SearchStrategy[list[int]]: diff --git a/tests/test_properties.py b/tests/test_properties.py index 2e60c951dd..ee03e90008 100644 --- a/tests/test_properties.py +++ b/tests/test_properties.py @@ -6,9 +6,15 @@ import hypothesis.extra.numpy as npst import hypothesis.strategies as st -from hypothesis import given +from hypothesis import given, settings -from zarr.testing.strategies import arrays, basic_indices, numpy_arrays, zarr_formats +from zarr.testing.strategies import ( + arrays, + basic_indices, + numpy_arrays, + orthogonal_indices, + zarr_formats, +) @given(data=st.data(), zarr_format=zarr_formats) @@ -32,6 +38,18 @@ def test_basic_indexing(data: st.DataObject) -> None: assert_array_equal(nparray, zarray[:]) +@settings(report_multiple_bugs=False) +@given(data=st.data()) +def test_oindex(data: st.DataObject) -> None: + # integer_array_indices can't handle 0-size dimensions. + zarray = data.draw(arrays(shapes=npst.array_shapes(max_dims=4, min_side=1))) + nparray = zarray[:] + + zindexer, npindexer = data.draw(orthogonal_indices(shape=nparray.shape)) + actual = zarray.oindex[zindexer] + assert_array_equal(nparray[npindexer], actual) + + @given(data=st.data()) def test_vindex(data: st.DataObject) -> None: # integer_array_indices can't handle 0-size dimensions. From a0fb5f0e5d94c3dd07f9eec6e84760c32d1b8a3e Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:27:21 -0700 Subject: [PATCH 08/13] more complex oindex test --- src/zarr/testing/strategies.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index 0cdbc0f79d..bf4d5dd125 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -205,14 +205,20 @@ def orthogonal_indices( npst.integer_array_indices( shape=(size,), result_shape=npst.array_shapes(min_side=1, max_side=size, max_dims=1) ) - # | npst.basic_indices(shape=(size,), allow_ellipsis=False) + | basic_indices(shape=(size,), allow_ellipsis=False).map( + lambda x: (x,) if not isinstance(x, tuple) else x + ) ) + if isinstance(idxr, int): + idxr = np.array([idxr]) zindexer.append(idxr) - if isinstance(idxr, np.ndarray): - newshape = [1] * ndim - newshape[axis] = idxr.size - idxr = idxr.reshape(newshape) - npindexer.append(idxr) + if isinstance(idxr, slice): + idxr = np.arange(*idxr.indices(size)) + elif isinstance(idxr, (tuple, int)): + idxr = np.array(idxr) + newshape = [1] * ndim + newshape[axis] = idxr.size + npindexer.append(idxr.reshape(newshape)) return tuple(zindexer), np.broadcast_arrays(*npindexer) From 9cf7c4b880e16c9976b0f94cb32eb6edaa5443ac Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:28:26 -0700 Subject: [PATCH 09/13] cleanup --- tests/test_properties.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_properties.py b/tests/test_properties.py index ee03e90008..cfa6a706d8 100644 --- a/tests/test_properties.py +++ b/tests/test_properties.py @@ -6,7 +6,7 @@ import hypothesis.extra.numpy as npst import hypothesis.strategies as st -from hypothesis import given, settings +from hypothesis import given from zarr.testing.strategies import ( arrays, @@ -38,7 +38,6 @@ def test_basic_indexing(data: st.DataObject) -> None: assert_array_equal(nparray, zarray[:]) -@settings(report_multiple_bugs=False) @given(data=st.data()) def test_oindex(data: st.DataObject) -> None: # integer_array_indices can't handle 0-size dimensions. From 6882137930a3be23c1439e6828fe67d1d1228b05 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:32:16 -0700 Subject: [PATCH 10/13] more oindex --- src/zarr/testing/strategies.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index bf4d5dd125..f5c0f5c716 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -201,14 +201,15 @@ def orthogonal_indices( npindexer = [] ndim = len(shape) for axis, size in enumerate(shape): - (idxr,) = draw( + val = draw( npst.integer_array_indices( shape=(size,), result_shape=npst.array_shapes(min_side=1, max_side=size, max_dims=1) ) - | basic_indices(shape=(size,), allow_ellipsis=False).map( - lambda x: (x,) if not isinstance(x, tuple) else x - ) + | basic_indices(min_dims=1, shape=(size,), allow_ellipsis=False) + .map(lambda x: (x,) if not isinstance(x, tuple) else x) # bare ints, slices + .filter(lambda x: bool(x)) # skip empty tuple ) + (idxr,) = val if isinstance(idxr, int): idxr = np.array([idxr]) zindexer.append(idxr) From 2ca817039e911a0ef6687dc06413527a24d1e353 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:38:26 -0700 Subject: [PATCH 11/13] Add changelog entry --- changes/2784.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/2784.feature.rst diff --git a/changes/2784.feature.rst b/changes/2784.feature.rst new file mode 100644 index 0000000000..e3218e6df0 --- /dev/null +++ b/changes/2784.feature.rst @@ -0,0 +1 @@ +Avoid reading chunks during writes where possible. :issue:`757` From b2be002cd2482d0ad29c418d97110ceac35ad7d2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:45:52 -0700 Subject: [PATCH 12/13] [revert] note --- src/zarr/testing/strategies.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index f5c0f5c716..94e4cdbe1e 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -220,6 +220,9 @@ def orthogonal_indices( newshape = [1] * ndim newshape[axis] = idxr.size npindexer.append(idxr.reshape(newshape)) + from hypothesis import note + + note(val) return tuple(zindexer), np.broadcast_arrays(*npindexer) From 3017a99ab613aa6753e91cce0fd864c1c651b737 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 5 Feb 2025 09:55:48 -0700 Subject: [PATCH 13/13] fix for numpy 1.25 --- src/zarr/testing/strategies.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/zarr/testing/strategies.py b/src/zarr/testing/strategies.py index 94e4cdbe1e..95e242c252 100644 --- a/src/zarr/testing/strategies.py +++ b/src/zarr/testing/strategies.py @@ -220,10 +220,9 @@ def orthogonal_indices( newshape = [1] * ndim newshape[axis] = idxr.size npindexer.append(idxr.reshape(newshape)) - from hypothesis import note - note(val) - return tuple(zindexer), np.broadcast_arrays(*npindexer) + # casting the output of broadcast_arrays is needed for numpy 1.25 + return tuple(zindexer), tuple(np.broadcast_arrays(*npindexer)) def key_ranges(