Skip to content

[v3] zarr.array from an existing zarr.Array #2410

Closed
@jhamman

Description

@jhamman

Zarr version

3.0.0.beta

Numcodecs version

0.13

Python Version

3.11

Operating System

Mac

Installation

pip

Description

It used to be possible to create one zarr array from another. This is currently broken, but I would be interested in bringing it back in a more optimized form, using the underlying async array for streaming.

Steps to reproduce

In [24]: a = zarr.zeros((10000, 10000), chunks=(100,100), dtype='uint16',
    ...: store='a.zarr')

In [25]: b = zarr.array(a, chunks=(100, 200), store='b.zarr')
--------------------------------------------------------------------------
SyncError                                Traceback (most recent call last)
Cell In[25], line 1
----> 1 b = zarr.array(a, chunks=(100, 200), store='b.zarr')

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/api/synchronous.py:164, in array(data, **kwargs)
    163 def array(data: NDArrayLike, **kwargs: Any) -> Array:
--> 164     return Array(sync(async_api.array(data=data, **kwargs)))

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:141, in sync(coro, loop, timeout)
    138 return_result = next(iter(finished)).result()
    140 if isinstance(return_result, BaseException):
--> 141     raise return_result
    142 else:
    143     return return_result

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:100, in _runner(coro)
     95 """
     96 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     97 exception, the exception will be returned.
     98 """
     99 try:
--> 100     return await coro
    101 except Exception as ex:
    102     return ex

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/api/asynchronous.py:529, in array(data, **kwargs)
    526 z = await create(**kwargs)
    528 # fill with data
--> 529 await z.setitem(slice(None), data)
    531 return z

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:880, in AsyncArray.setitem(self, selection, value, prototype)
    874     prototype = default_buffer_prototype()
    875 indexer = BasicIndexer(
    876     selection,
    877     shape=self.metadata.shape,
    878     chunk_grid=self.metadata.chunk_grid,
    879 )
--> 880 return await self._set_selection(indexer, value, prototype=prototype)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:853, in AsyncArray._set_selection(self, indexer, value, prototype, fields)
    850 value_buffer = prototype.nd_buffer.from_ndarray_like(value)
    852 # merging with existing data and encoding chunks
--> 853 await self.codec_pipeline.write(
    854     [
    855         (
    856             self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    857             self.metadata.get_chunk_spec(chunk_coords, self.order, prototype),
    858             chunk_selection,
    859             out_selection,
    860         )
    861         for chunk_coords, chunk_selection, out_selection in indexer
    862     ],
    863     value_buffer,
    864     drop_axes=indexer.drop_axes,
    865 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:456, in BatchedCodecPipeline.write(self, batch_info, value, drop_axes)
    450 async def write(
    451     self,
    452     batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]],
    453     value: NDBuffer,
    454     drop_axes: tuple[int, ...] = (),
    455 ) -> None:
--> 456     await concurrent_map(
    457         [
    458             (single_batch_info, value, drop_axes)
    459             for single_batch_info in batched(batch_info, self.batch_size)
    460         ],
    461         self.write_batch,
    462         config.get("async.concurrency"),
    463     )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/common.py:64, in concurrent_map(items, func, limit)
     61     async with sem:
     62         return await func(*item)
---> 64 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/common.py:62, in concurrent_map.<locals>.run(item)
     60 async def run(item: tuple[Any]) -> V:
     61     async with sem:
---> 62         return await func(*item)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:374, in BatchedCodecPipeline.write_batch(self, batch_info, value, drop_axes)
    353 chunk_bytes_batch = await concurrent_map(
    354     [
    355         (
   (...)
    362     config.get("async.concurrency"),
    363 )
    364 chunk_array_batch = await self.decode_batch(
    365     [
    366         (chunk_bytes, chunk_spec)
   (...)
    370     ],
    371 )
    373 chunk_array_batch = [
--> 374     self._merge_chunk_array(
    375         chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes
    376     )
    377     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    378         chunk_array_batch, batch_info, strict=False
    379     )
    380 ]
    382 chunk_array_batch = [
    383     None
    384     if chunk_array is None or chunk_array.all_equal(chunk_spec.fill_value)
   (...)
    388     )
    389 ]
    391 chunk_bytes_batch = await self.encode_batch(
    392     [
    393         (chunk_array, chunk_spec)
   (...)
    397     ],
    398 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:316, in BatchedCodecPipeline._merge_chunk_array(self, existing_chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes)
    314     chunk_value = value
    315 else:
--> 316     chunk_value = value[out_selection]
    317     # handle missing singleton dimensions
    318     if drop_axes != ():

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/buffer/cpu.py:180, in NDBuffer.__getitem__(self, key)
    179 def __getitem__(self, key: Any) -> Self:
--> 180     return self.__class__(np.asanyarray(self._data.__getitem__(key)))

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:1327, in Array.__getitem__(self, selection)
   1325     return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
   1326 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
-> 1327     return self.get_orthogonal_selection(pure_selection, fields=fields)
   1328 else:
   1329     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
     41 extra_args = len(args) - len(all_args)
     42 if extra_args <= 0:
---> 43     return f(*args, **kwargs)
     45 # extra_args > 0
     46 args_msg = [
     47     f"{name}={arg}"
     48     for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
     49 ]

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:1769, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1767     prototype = default_buffer_prototype()
   1768 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1769 return sync(
   1770     self._async_array._get_selection(
   1771         indexer=indexer, out=out, fields=fields, prototype=prototype
   1772     )
   1773 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:128, in sync(coro, loop, timeout)
    126     loop0 = asyncio.events.get_running_loop()
    127     if loop0 is loop:
--> 128         raise SyncError("Calling sync() from within a running loop")
    129 except RuntimeError:
    130     pass

SyncError: Calling sync() from within a running loop

Additional output

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Potential issues with the zarr-python library

    Type

    No type

    Projects

    Status

    Done

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions