Commit df2dd71

Fixes, and allow partial reads for uncompressed v3 arrays
1 parent f6c87b4 commit df2dd71

File tree

8 files changed: +207 -44 lines changed

8 files changed

+207
-44
lines changed

zarr/_storage/store.py

Lines changed: 6 additions & 2 deletions
@@ -272,13 +272,17 @@ def get_partial_values(self, key_ranges):
         range_length may be None to indicate to read until the end.
         range_start may be negative to start reading range_start bytes
         from the end of the file.
-        A key may occur multiple times with different ranges."""
+        A key may occur multiple times with different ranges.
+        Inserts None for missing keys into the returned list."""
         results = [None] * len(key_ranges)
         indexed_ranges_by_key = defaultdict(list)
         for i, (key, range_) in enumerate(key_ranges):
             indexed_ranges_by_key[key].append((i, range_))
         for key, indexed_ranges in indexed_ranges_by_key.items():
-            value = self[key]
+            try:
+                value = self[key]
+            except KeyError:
+                continue
             for i, (range_from, range_length) in indexed_ranges:
                 if range_length is None:
                     results[i] = value[range_from:]
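
A minimal sketch (not part of the commit) of the fallback semantics above, with a plain dict standing in for the store; the non-None branch is assumed to slice value[range_from:range_from + range_length]:

store = {"a": b"abcdef"}
key_ranges = [
    ("a", (0, 2)),      # first two bytes
    ("a", (-3, None)),  # last three bytes; same key, different range
    ("missing", (0, 1)),
]
results = [None] * len(key_ranges)
for i, (key, (range_from, range_length)) in enumerate(key_ranges):
    try:
        value = store[key]
    except KeyError:
        continue  # missing key: leave None at position i
    if range_length is None:
        results[i] = value[range_from:]
    else:
        results[i] = value[range_from:range_from + range_length]
assert results == [b"ab", b"def", None]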

zarr/_storage/v3.py

Lines changed: 31 additions & 0 deletions
@@ -182,6 +182,37 @@ def rmdir(self, path=None):
         if self.fs.isdir(store_path):
             self.fs.rm(store_path, recursive=True)

+    def supports_efficient_get_partial_values(self):
+        return True
+
+    def get_partial_values(self, key_ranges):
+        """Get multiple partial values.
+        key_ranges can be an iterable of key, range pairs,
+        where a range specifies two integers range_start and range_length
+        as a tuple, (range_start, range_length).
+        range_length may be None to indicate to read until the end.
+        range_start may be negative to start reading range_start bytes
+        from the end of the file.
+        A key may occur multiple times with different ranges.
+        Inserts None for missing keys into the returned list."""
+        results = []
+        for key, (range_start, range_length) in key_ranges:
+            key = self._normalize_key(key)
+            path = self.dir_path(key)
+            try:
+                if range_start < 0:
+                    if range_length is None:
+                        result = self.fs.tail(path, size=-range_start)
+                    else:
+                        size = self.fs.size(path)
+                        result = self.fs.read_block(path, size + range_start, range_length)
+                else:
+                    result = self.fs.read_block(path, range_start, range_length)
+            except self.map.missing_exceptions:
+                result = None
+            results.append(result)
+        return results
+

 class MemoryStoreV3(MemoryStore, StoreV3):
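
A hedged usage sketch of the new FSStoreV3 method (the chunk key below is illustrative): a negative range_start with range_length=None maps to fs.tail, a negative start with a length maps to fs.read_block starting at size + range_start, and a non-negative start maps directly to fs.read_block; missing keys come back as None rather than raising.

# store is an FSStoreV3 instance; "data/root/arr1/c0" is a hypothetical chunk key
tail, head = store.get_partial_values([
    ("data/root/arr1/c0", (-32, None)),  # last 32 bytes, e.g. a shard index
    ("data/root/arr1/c0", (0, 16)),      # first 16 bytes
])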

zarr/_storage/v3_storage_transformers.py

Lines changed: 30 additions & 10 deletions
@@ -35,7 +35,7 @@ def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]:
         if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64):
             return None
         else:
-            return slice(chunk_start, chunk_start + chunk_len)
+            return slice(int(chunk_start), int(chunk_start + chunk_len))

     def set_chunk_slice(
         self, chunk: Tuple[int, ...], chunk_slice: Optional[slice]
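
Why the int(...) casts above matter: under NumPy 1.x promotion rules, mixing a uint64 scalar (as read from the index array) with a Python int yields float64, which would break offset arithmetic downstream (e.g. range_start + chunk_slice.start in get_partial_values below). A quick illustration:

import numpy as np

chunk_start = np.uint64(16)
print(type(chunk_start + 1))       # <class 'numpy.float64'> on NumPy 1.x
print(type(int(chunk_start) + 1))  # <class 'int'>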
@@ -126,10 +126,13 @@ def _key_to_shard(self, chunk_key: str) -> Tuple[str, Tuple[int, ...]]:

     def _get_index_from_store(self, shard_key: str) -> _ShardIndex:
         # At the end of each shard 2*64bit per chunk for offset and length define the index:
+        index_bytes = self.inner_store.get_partial_values(
+            [(shard_key, (-16 * self._num_chunks_per_shard, None))]
+        )[0]
+        if index_bytes is None:
+            raise KeyError(shard_key)
         return _ShardIndex.from_bytes(
-            self.inner_store.get_partial_values(
-                [(shard_key, (-16 * self._num_chunks_per_shard, None))]
-            )[0],
+            index_bytes,
             self,
         )
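
For orientation, a sketch of the index layout this reads (based on the comment above; the exact byte order is an assumption): the last 16 * num_chunks_per_shard bytes of a shard hold one (offset, length) uint64 pair per chunk, with (2**64 - 1, 2**64 - 1) marking an empty slot.

import numpy as np

MAX_UINT_64 = 2**64 - 1
num_chunks_per_shard = 4

# Build a fake index: chunk 0 stored at offset 0 with length 100, rest empty.
index = np.full((num_chunks_per_shard, 2), MAX_UINT_64, dtype="<u8")
index[0] = (0, 100)
index_bytes = index.tobytes()
assert len(index_bytes) == 16 * num_chunks_per_shard  # size of the tail read above

# Parsing in the spirit of _ShardIndex.from_bytes:
parsed = np.frombuffer(index_bytes, dtype="<u8").reshape(-1, 2)
chunk_start, chunk_len = parsed[0]
chunk_slice = slice(int(chunk_start), int(chunk_start + chunk_len))  # slice(0, 100)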

@@ -156,7 +159,11 @@ def __getitem__(self, key):
         if _is_data_key(key):
             if self.supports_efficient_get_partial_values():
                 # Use the partial implementation, which fetches the index seperately
-                return self.get_partial_values([(key, (0, None))])[0]
+                value = self.get_partial_values([(key, (0, None))])[0]
+                if value is None:
+                    raise KeyError(key)
+                else:
+                    return value
             shard_key, chunk_subkey = self._key_to_shard(key)
             try:
                 full_shard_value = self.inner_store[shard_key]
@@ -254,7 +261,7 @@ def __delitem__(self, key):
                 del self.inner_store[shard_key]
             else:
                 index_bytes = index.to_bytes()
-                self.set_partial_values([(shard_key, -len(index_bytes), index_bytes)])
+                self.inner_store.set_partial_values([(shard_key, -len(index_bytes), index_bytes)])
         else:
             del self.inner_store[key]

@@ -281,22 +288,35 @@ def get_partial_values(self, key_ranges):
         if self.supports_efficient_get_partial_values():
             transformed_key_ranges = []
             cached_indices = {}
-            for key, range_ in key_ranges:
+            none_indices = []
+            for i, (key, range_) in enumerate(key_ranges):
                 if _is_data_key(key):
                     shard_key, chunk_subkey = self._key_to_shard(key)
                     try:
                         index = cached_indices[shard_key]
                     except KeyError:
-                        index = self._get_index_from_store(shard_key)
+                        try:
+                            index = self._get_index_from_store(shard_key)
+                        except KeyError:
+                            none_indices.append(i)
+                            continue
                         cached_indices[shard_key] = index
                     chunk_slice = index.get_chunk_slice(chunk_subkey)
+                    if chunk_slice is None:
+                        none_indices.append(i)
+                        continue
                     range_start, range_length = range_
+                    if range_length is None:
+                        range_length = chunk_slice.stop - chunk_slice.start
                     transformed_key_ranges.append(
-                        (shard_key, (range_start + chunk_slice.satrt, range_length))
+                        (shard_key, (range_start + chunk_slice.start, range_length))
                     )
                 else:
                     transformed_key_ranges.append((key, range_))
-            return self.inner_store.get_partial_values(transformed_key_ranges)
+            values = self.inner_store.get_partial_values(transformed_key_ranges)
+            for i in none_indices:
+                values.insert(i, None)
+            return values
         else:
             return StoreV3.get_partial_values(self, key_ranges)
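
The none_indices bookkeeping relies on list.insert restoring the skipped positions, which works because the indices were recorded in ascending order. A tiny sketch:

none_indices = [1, 3]   # requests answered as None up front
values = ["a", "c"]     # inner-store results for the remaining requests
for i in none_indices:
    values.insert(i, None)
assert values == ["a", None, "c", None]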

zarr/core.py

Lines changed: 59 additions & 25 deletions
@@ -51,6 +51,7 @@
     normalize_shape,
     normalize_storage_path,
     PartialReadBuffer,
+    UncompressedPartialReadBufferV3,
 )

@@ -180,6 +181,7 @@ def __init__(

         self._store = store
         self._chunk_store = chunk_store
+        self._transformed_chunk_store = None
         self._path = normalize_storage_path(path)
         if self._path:
             self._key_prefix = self._path + '/'
@@ -282,17 +284,13 @@ def _load_metadata_nosync(self):

         if self._version == 3:
             storage_transformers = meta.get('storage_transformers', [])
-            transformed_store = self._store
-            for storage_transformer in storage_transformers:
-                transformed_store = storage_transformer._copy_for_array(self, transformed_store)
-            self._transformed_store = transformed_store
-            if self._chunk_store is not None:
-                transformed_chunk_store = self._chunk_store
+            if storage_transformers:
+                transformed_store = self._chunk_store or self._store
                 for storage_transformer in storage_transformers:
-                    transformed_chunk_store = (
-                        storage_transformer._copy_for_array(self, transformed_chunk_store)
+                    transformed_store = storage_transformer._copy_for_array(
+                        self, transformed_store
                     )
-                self._transformed_chunk_store = transformed_chunk_store
+                self._transformed_chunk_store = transformed_store

     def _refresh_metadata(self):
         if not self._cache_metadata:
@@ -336,7 +334,7 @@ def _flush_metadata_nosync(self):
     @property
     def store(self):
         """A MutableMapping providing the underlying storage for the array."""
-        return self._transformed_store
+        return self._store

     @property
     def path(self):
@@ -373,10 +371,12 @@ def read_only(self, value):
     @property
     def chunk_store(self):
         """A MutableMapping providing the underlying storage for array chunks."""
-        if self._chunk_store is None:
-            return self._store
-        else:
+        if self._transformed_chunk_store is not None:
             return self._transformed_chunk_store
+        elif self._chunk_store is not None:
+            return self._chunk_store
+        else:
+            return self._store

     @property
     def shape(self):
@@ -1252,8 +1252,12 @@ def _get_selection(self, indexer, out=None, fields=None):
         check_array_shape('out', out, out_shape)

         # iterate over chunks
-        if not hasattr(self.chunk_store, "getitems") or \
-                any(map(lambda x: x == 0, self.shape)):
+        if (
+            not hasattr(self.chunk_store, "getitems") and not (
+                hasattr(self.chunk_store, "get_partial_values") and
+                self.chunk_store.supports_efficient_get_partial_values()
+            )
+        ) or any(map(lambda x: x == 0, self.shape)):
             # sequentially get one key at a time from storage
             for chunk_coords, chunk_selection, out_selection in indexer:

@@ -1790,7 +1794,7 @@ def _set_selection(self, indexer, value, fields=None):
         check_array_shape('value', value, sel_shape)

         # iterate over chunks in range
-        if not hasattr(self.store, "setitems") or self._synchronizer is not None \
+        if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \
                 or any(map(lambda x: x == 0, self.shape)):
             # iterative approach
             for chunk_coords, chunk_selection, out_selection in indexer:
@@ -1868,10 +1872,12 @@ def _process_chunk(
         # contiguous, so we can decompress directly from the chunk
         # into the destination array
         if self._compressor:
-            if isinstance(cdata, PartialReadBuffer):
+            if isinstance(cdata, (PartialReadBuffer, UncompressedPartialReadBufferV3)):
                 cdata = cdata.read_full()
             self._compressor.decode(cdata, dest)
         else:
+            if isinstance(cdata, UncompressedPartialReadBufferV3):
+                cdata = cdata.read_full()
             chunk = ensure_ndarray(cdata).view(self._dtype)
             chunk = chunk.reshape(self._chunks, order=self._order)
             np.copyto(dest, chunk)
@@ -1893,13 +1899,21 @@ def _process_chunk(
                        else dim
                        for i, dim in enumerate(self.chunks)
                    ]
-                    cdata.read_part(start, nitems)
-                    chunk_partial = self._decode_chunk(
-                        cdata.buff,
-                        start=start,
-                        nitems=nitems,
-                        expected_shape=expected_shape,
-                    )
+                    if isinstance(cdata, UncompressedPartialReadBufferV3):
+                        chunk_partial = self._decode_chunk(
+                            cdata.read_part(start, nitems),
+                            start=start,
+                            nitems=nitems,
+                            expected_shape=expected_shape,
+                        )
+                    else:
+                        cdata.read_part(start, nitems)
+                        chunk_partial = self._decode_chunk(
+                            cdata.buff,
+                            start=start,
+                            nitems=nitems,
+                            expected_shape=expected_shape,
+                        )
                     tmp[partial_out_selection] = chunk_partial
                 out[out_selection] = tmp[chunk_selection]
                 return
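
For the uncompressed case, read_part(start, nitems) can serve the request without touching the rest of the chunk, because items of a fixed-size dtype are stored contiguously; the byte range is a simple scale by itemsize. A sketch (the function name is hypothetical):

def item_range_to_byte_range(start, nitems, itemsize):
    # contiguous fixed-size items: scale item offsets to byte offsets
    return start * itemsize, nitems * itemsize

assert item_range_to_byte_range(start=10, nitems=4, itemsize=8) == (80, 32)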
@@ -1994,9 +2008,29 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                 for ckey in ckeys
                 if ckey in self.chunk_store
             }
+        elif (
+            self._partial_decompress
+            and not self._compressor
+            and not fields
+            and self.dtype != object
+            and hasattr(self.chunk_store, "get_partial_values")
+            and self.chunk_store.supports_efficient_get_partial_values()
+        ):
+            partial_read_decode = True
+            cdatas = {
+                ckey: UncompressedPartialReadBufferV3(
+                    ckey, self.chunk_store, itemsize=self.itemsize
+                )
+                for ckey in ckeys
+                if ckey in self.chunk_store
+            }
         else:
             partial_read_decode = False
-            cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
+            if not hasattr(self.chunk_store, "getitems"):
+                values = self.chunk_store.get_partial_values([(ckey, (0, None)) for ckey in ckeys])
+                cdatas = {key: value for key, value in zip(ckeys, values) if value is not None}
+            else:
+                cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
         for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
            if ckey in cdatas:
                self._process_chunk(
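
Based on its call sites in this diff (constructed with the chunk key, the chunk store, and itemsize; read_full() for whole-chunk decoding; read_part(start, nitems) returning bytes), UncompressedPartialReadBufferV3 presumably behaves roughly like the following sketch (an assumption, not the actual zarr class):

class UncompressedPartialReadBufferSketch:
    """Serve byte ranges of an uncompressed chunk via get_partial_values."""

    def __init__(self, store_key, chunk_store, itemsize):
        self.key = store_key
        self.chunk_store = chunk_store
        self.itemsize = itemsize

    def read_full(self):
        # fall back to fetching the entire chunk
        return self.chunk_store[self.key]

    def read_part(self, start, nitems):
        # scale item offsets to byte offsets and fetch only that range
        return self.chunk_store.get_partial_values(
            [(self.key, (start * self.itemsize, nitems * self.itemsize))]
        )[0]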

zarr/tests/test_core.py

Lines changed: 59 additions & 4 deletions
@@ -3287,6 +3287,54 @@ def expected(self):
         ]


+@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
+@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled")
+class TestArrayWithFSStoreV3PartialReadUncompressedSharded(
+    TestArrayWithPathV3, TestArrayWithFSStorePartialRead
+):
+
+    @staticmethod
+    def create_array(array_path='arr1', read_only=False, **kwargs):
+        path = mkdtemp()
+        atexit.register(shutil.rmtree, path)
+        store = FSStoreV3(path)
+        cache_metadata = kwargs.pop("cache_metadata", True)
+        cache_attrs = kwargs.pop("cache_attrs", True)
+        write_empty_chunks = kwargs.pop('write_empty_chunks', True)
+        kwargs.setdefault('compressor', None)
+        num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"])
+        sharding_transformer = ShardingStorageTransformer(
+            "indexed", chunks_per_shard=(2, ) * num_dims
+        )
+        init_array(store, path=array_path, storage_transformers=[sharding_transformer], **kwargs)
+        return Array(
+            store,
+            path=array_path,
+            read_only=read_only,
+            cache_metadata=cache_metadata,
+            cache_attrs=cache_attrs,
+            partial_decompress=True,
+            write_empty_chunks=write_empty_chunks,
+        )
+
+    def test_nbytes_stored(self):
+        z = self.create_array(shape=1000, chunks=100)
+        expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json')
+        assert expect_nbytes_stored == z.nbytes_stored
+        z[:] = 42
+        expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json')
+        assert expect_nbytes_stored == z.nbytes_stored
+
+    def expected(self):
+        return [
+            "90109fc2a4e17efbcb447003ea1c08828b91f71e",
+            "2b73519f7260dba3ddce0d2b70041888856fec6b",
+            "bca5798be2ed71d444f3045b05432d937682b7dd",
+            "9ff1084501e28520e577662a6e3073f1116c76a2",
+            "882a97cad42417f90f111d0cb916a21579650467",
+        ]
+
+
 @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
 @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled")
 class TestArrayWithFSStoreV3Nested(TestArrayWithPathV3, TestArrayWithFSStoreNested):
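
An end-to-end usage sketch mirroring the new fixture (import paths and the temporary path are assumptions; also assumes the experimental v3 API is enabled, e.g. ZARR_V3_EXPERIMENTAL_API=1):

import numpy as np
from zarr._storage.v3 import FSStoreV3
from zarr._storage.v3_storage_transformers import ShardingStorageTransformer
from zarr.core import Array
from zarr.storage import init_array

store = FSStoreV3('/tmp/example-v3-shards')
transformer = ShardingStorageTransformer("indexed", chunks_per_shard=(2,))
init_array(store, path='arr1', shape=1000, chunks=100,
           compressor=None, storage_transformers=[transformer])
z = Array(store, path='arr1', partial_decompress=True)
z[:] = np.arange(1000)
# A small slice should now be served by get_partial_values on the
# uncompressed shard instead of reading whole shards:
print(z[42:44])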
@@ -3398,9 +3446,6 @@ def create_array(array_path='arr1', read_only=False, **kwargs):
             cache_metadata=cache_metadata,
             cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks)

-    # def test_nbytes_stored(self):
-    #     pass  # not implemented
-
     def test_nbytes_stored(self):
         z = self.create_array(shape=1000, chunks=100)
         expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json')
@@ -3410,9 +3455,19 @@ def test_nbytes_stored(self):
         assert expect_nbytes_stored == z.nbytes_stored

         # mess with store
-        z.chunk_store[data_root + z._key_prefix + 'foo'] = list(range(10))
+        z.store[data_root + z._key_prefix + 'foo'] = list(range(10))
         assert -1 == z.nbytes_stored

+    def test_keys_inner_store(self):
+        z = self.create_array(shape=1000, chunks=100)
+        assert z.chunk_store.keys() == z._store.keys()
+        meta_keys = set(z.store.keys())
+        z[:] = 42
+        assert len(z.chunk_store.keys() - meta_keys) == 10
+        # inner store should have half the data keys,
+        # since chunks_per_shard is 2:
+        assert len(z._store.keys() - meta_keys) == 5
+
     def expected(self):
         return [
             "b46294e25b1d816055e7937780265c0d8d5d6c47",
