[v3] zarr-python fails to decode sharded array written by other implementations. #2302
Comments
I think the error occurs due to an error in sharding.

Thanks @brokkoli71 for looking into this. I generated a new array (at path …).

New stacktrace for index_location=end:

File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:919, in Array.__getitem__(self, selection)
917 return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
918 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 919 return self.get_orthogonal_selection(pure_selection, fields=fields)
920 else:
921 return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
41 extra_args = len(args) - len(all_args)
42 if extra_args <= 0:
---> 43 return f(*args, **kwargs)
45 # extra_args > 0
46 args_msg = [
47 f"{name}={arg}"
48 for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
49 ]
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:1361, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
1359 prototype = default_buffer_prototype()
1360 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1361 return sync(
1362 self._async_array._get_selection(
1363 indexer=indexer, out=out, fields=fields, prototype=prototype
1364 )
1365 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:91, in sync(coro, loop, timeout)
88 return_result = next(iter(finished)).result()
90 if isinstance(return_result, BaseException):
---> 91 raise return_result
92 else:
93 return return_result
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/sync.py:50, in _runner(coro)
45 """
46 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
47 exception, the exception will be returned.
48 """
49 try:
---> 50 return await coro
51 except Exception as ex:
52 return ex
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/array.py:476, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
468 out_buffer = prototype.nd_buffer.create(
469 shape=indexer.shape,
470 dtype=out_dtype,
471 order=self.order,
472 fill_value=self.metadata.fill_value,
473 )
474 if product(indexer.shape) > 0:
475 # reading chunks and decoding them
--> 476 await self.codec_pipeline.read(
477 [
478 (
479 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
480 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
481 chunk_selection,
482 out_selection,
483 )
484 for chunk_coords, chunk_selection, out_selection in indexer
485 ],
486 out_buffer,
487 drop_axes=indexer.drop_axes,
488 )
489 return out_buffer.as_ndarray_like()
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
421 async def read(
422 self,
423 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
424 out: NDBuffer,
425 drop_axes: tuple[int, ...] = (),
426 ) -> None:
--> 427 await concurrent_map(
428 [
429 (single_batch_info, out, drop_axes)
430 for single_batch_info in batched(batch_info, self.batch_size)
431 ],
432 self.read_batch,
433 config.get("async.concurrency"),
434 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:238, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
231 async def read_batch(
232 self,
233 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
234 out: NDBuffer,
235 drop_axes: tuple[int, ...] = (),
236 ) -> None:
237 if self.supports_partial_decode:
--> 238 chunk_array_batch = await self.decode_partial_batch(
239 [
240 (byte_getter, chunk_selection, chunk_spec)
241 for byte_getter, chunk_spec, chunk_selection, _ in batch_info
242 ]
243 )
244 for chunk_array, (_, chunk_spec, _, out_selection) in zip(
245 chunk_array_batch, batch_info, strict=False
246 ):
247 if chunk_array is not None:
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:194, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
192 assert self.supports_partial_decode
193 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 194 return await self.array_bytes_codec.decode_partial(batch_info)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:200, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
180 async def decode_partial(
181 self,
182 batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
183 ) -> Iterable[NDBuffer | None]:
184 """Partially decodes a batch of chunks.
185 This method determines parts of a chunk from the slice selection,
186 fetches these parts from the store (via ByteGetter) and decodes them.
(...)
198 Iterable[NDBuffer | None]
199 """
--> 200 return await concurrent_map(
201 list(batch_info),
202 self._decode_partial_single,
203 config.get("async.concurrency"),
204 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/sharding.py:510, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
507 shard_dict[chunk_coords] = chunk_bytes
509 # decoding chunks and writing them into the output buffer
--> 510 await self.codec_pipeline.read(
511 [
512 (
513 _ShardingByteGetter(shard_dict, chunk_coords),
514 chunk_spec,
515 chunk_selection,
516 out_selection,
517 )
518 for chunk_coords, chunk_selection, out_selection in indexer
519 ],
520 out,
521 )
522 return out
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:427, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
421 async def read(
422 self,
423 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
424 out: NDBuffer,
425 drop_axes: tuple[int, ...] = (),
426 ) -> None:
--> 427 await concurrent_map(
428 [
429 (single_batch_info, out, drop_axes)
430 for single_batch_info in batched(batch_info, self.batch_size)
431 ],
432 self.read_batch,
433 config.get("async.concurrency"),
434 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:260, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
251 else:
252 chunk_bytes_batch = await concurrent_map(
253 [
254 (byte_getter, array_spec.prototype)
(...)
258 config.get("async.concurrency"),
259 )
--> 260 chunk_array_batch = await self.decode_batch(
261 [
262 (chunk_bytes, chunk_spec)
263 for chunk_bytes, (_, chunk_spec, _, _) in zip(
264 chunk_bytes_batch, batch_info, strict=False
265 )
266 ],
267 )
268 for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
269 chunk_array_batch, batch_info, strict=False
270 ):
271 if chunk_array is not None:
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/pipeline.py:177, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
172 chunk_bytes_batch = await bb_codec.decode(
173 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
174 )
176 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 177 chunk_array_batch = await ab_codec.decode(
178 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
179 )
181 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
182 chunk_array_batch = await aa_codec.decode(
183 zip(chunk_array_batch, chunk_spec_batch, strict=False)
184 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:125, in _Codec.decode(self, chunks_and_specs)
109 async def decode(
110 self,
111 chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
112 ) -> Iterable[CodecInput | None]:
113 """Decodes a batch of chunks.
114 Chunks can be None in which case they are ignored by the codec.
115
(...)
123 Iterable[CodecInput | None]
124 """
--> 125 return await _batching_helper(self._decode_single, chunks_and_specs)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:409, in _batching_helper(func, batch_info)
405 async def _batching_helper(
406 func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
407 batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
408 ) -> list[CodecOutput | None]:
--> 409 return await concurrent_map(
410 list(batch_info),
411 _noop_for_none(func),
412 config.get("async.concurrency"),
413 )
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/abc/codec.py:422, in _noop_for_none.<locals>.wrap(chunk, chunk_spec)
420 if chunk is None:
421 return None
--> 422 return await func(chunk, chunk_spec)
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/codecs/bytes.py:94, in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
92 # ensure correct chunk shape
93 if chunk_array.shape != chunk_spec.shape:
---> 94 chunk_array = chunk_array.reshape(
95 chunk_spec.shape,
96 )
97 return chunk_array
File ~/micromamba/envs/general/lib/python3.10/site-packages/zarr/core/buffer/core.py:444, in NDBuffer.reshape(self, newshape)
443 def reshape(self, newshape: ChunkCoords | Literal[-1]) -> Self:
--> 444 return self.__class__(self._data.reshape(newshape))
ValueError: cannot reshape array of size 150 into shape (5,5,3)
Zarr version
3.0.0a5
Numcodecs version
0.13.0
Python Version
3.10.14
Operating System
Linux
Installation
pip install zarr==3.0.0a5
Description
It appears that the implementation for partially decoding chunks written using the sharding indexed codec is buggy: zarr-python fails to read a sharding-encoded array created by another implementation. The same dataset can be read without problems by the zarrita project.

Steps to reproduce
shardingdata.zip

I have attached a zip file containing the filesystem store testdata.zarr used to reproduce the bug.
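For concreteness, the failing read looks roughly like the sketch below. This is a minimal sketch rather than the exact script; the array name "sharded" inside the store and the slice are illustrative placeholders for whatever the attached testdata.zarr contains.

import zarr  # zarr-python 3.0.0a5

# Open the store written by the other implementation and read the whole array.
# "testdata.zarr/sharded" is an illustrative path; substitute the array inside
# the attached store.
arr = zarr.open("testdata.zarr/sharded", mode="r")
data = arr[:, :, :]  # expected to fail as in the stacktrace above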
If I use zarrita to read the problematic array, I get no errors:
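A sketch of the zarrita read for comparison; the LocalStore / Array.open usage is assumed from zarrita's README, and the array name is again an illustrative placeholder.

import zarrita

# Open the same store with zarrita and read the full array; this completes
# without error. ("sharded" is a placeholder array name.)
store = zarrita.LocalStore("testdata.zarr")
a = zarrita.Array.open(store / "sharded")
data = a[:, :, :]  # reads cleanly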
Additional output

Here is the metadata of the "problematic" array:
Here is the stacktrace of the exception thrown by zarr-python when reading the sharded array:

stacktrace
NOTE

It's worth noting that if I add a compression codec to the sharding config's codec pipeline (e.g. Gzip with compression level 5), I get a different exception: ValueError: cannot reshape array of size 225 into shape (5,5,3). zarrita still reads the array correctly even in this scenario.

Data generation code
The data inside the attached zip can be generated using the following Python code:
Python code
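An illustrative reconstruction with zarrita, under assumed parameters: only the inner chunk shape (5, 5, 3) comes from the error message; the overall shape, shard shape, dtype, and array name are guesses, and the zarrita calls (LocalStore, Array.create, codecs.sharding_codec) are assumed from its README.

import numpy as np
import zarrita

# Illustrative reconstruction, not the original script: write a sharded array
# with zarrita so that zarr-python has to decode shards produced by another
# implementation.
store = zarrita.LocalStore("testdata.zarr")
a = zarrita.Array.create(
    store / "sharded",        # placeholder array name
    shape=(10, 10, 3),        # assumed overall shape
    dtype="uint16",           # assumed dtype
    chunk_shape=(10, 10, 3),  # shard shape (assumed)
    codecs=[
        # Inner chunk shape (5, 5, 3) matches the shape in the error message.
        # To reproduce the second error, add a Gzip codec (level 5) to this
        # sharding codec's inner codec pipeline.
        zarrita.codecs.sharding_codec(chunk_shape=(5, 5, 3)),
    ],
)
a[:, :, :] = np.arange(10 * 10 * 3, dtype="uint16").reshape(10, 10, 3)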