1
1
from __future__ import annotations
2
+ from typing import TYPE_CHECKING
2
3
from dataclasses import asdict , dataclass , field , replace
3
4
4
5
import asyncio
5
6
import json
6
7
import logging
7
- from typing import Any , Dict , Literal , Optional , Union , AsyncIterator , List
8
+
9
+ if TYPE_CHECKING :
10
+ from typing import (
11
+ Any ,
12
+ AsyncGenerator ,
13
+ Literal ,
14
+ AsyncIterator ,
15
+ )
8
16
from zarr .abc .metadata import Metadata
9
17
10
18
from zarr .array import AsyncArray , Array
@@ -25,7 +33,7 @@ def parse_zarr_format(data: Any) -> Literal[2, 3]:
25
33
26
34
27
35
# todo: convert None to empty dict
28
- def parse_attributes (data : Any ) -> Dict [str , Any ]:
36
+ def parse_attributes (data : Any ) -> dict [str , Any ]:
29
37
if data is None :
30
38
return {}
31
39
elif isinstance (data , dict ) and all (map (lambda v : isinstance (v , str ), data .keys ())):
@@ -36,12 +44,12 @@ def parse_attributes(data: Any) -> Dict[str, Any]:
36
44
37
45
@dataclass (frozen = True )
38
46
class GroupMetadata (Metadata ):
39
- attributes : Dict [str , Any ] = field (default_factory = dict )
47
+ attributes : dict [str , Any ] = field (default_factory = dict )
40
48
zarr_format : Literal [2 , 3 ] = 3
41
49
node_type : Literal ["group" ] = field (default = "group" , init = False )
42
50
43
51
# todo: rename this, since it doesn't return bytes
44
- def to_bytes (self ) -> Dict [str , bytes ]:
52
+ def to_bytes (self ) -> dict [str , bytes ]:
45
53
if self .zarr_format == 3 :
46
54
return {ZARR_JSON : json .dumps (self .to_dict ()).encode ()}
47
55
else :
@@ -50,34 +58,34 @@ def to_bytes(self) -> Dict[str, bytes]:
50
58
ZATTRS_JSON : json .dumps (self .attributes ).encode (),
51
59
}
52
60
53
def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3):
    """
    Validate the inputs and store them on this frozen dataclass instance.

    `attributes` is passed through `parse_attributes` and `zarr_format` through
    `parse_zarr_format`; the parsed values are what end up on the instance.
    """
    # Both values are parsed before anything is stored.
    validated = {
        "attributes": parse_attributes(attributes),
        "zarr_format": parse_zarr_format(zarr_format),
    }
    # The dataclass is frozen, so plain attribute assignment is not available here;
    # `object.__setattr__` is used instead.
    for field_name, field_value in validated.items():
        object.__setattr__(self, field_name, field_value)
59
67
60
68
@classmethod
def from_dict(cls, data: dict[str, Any]) -> GroupMetadata:
    """
    Build an instance from a metadata dictionary.

    An optional "node_type" entry is accepted when it is "group" or None; it is
    removed before the remaining entries are passed to the constructor as keyword
    arguments. The caller's dictionary is not modified.

    Raises `AssertionError` if "node_type" is present with any other value.
    """
    # Work on a shallow copy so the caller's dictionary is left untouched.
    kwargs = dict(data)
    # Pop outside of the `assert`: assert statements are removed under `python -O`,
    # which would otherwise leave "node_type" in the constructor kwargs.
    node_type = kwargs.pop("node_type", None)
    assert node_type in ("group", None)
    return cls(**kwargs)
64
72
65
def to_dict(self) -> dict[str, Any]:
    """Return the fields of this dataclass as a plain dictionary (via `dataclasses.asdict`)."""
    as_mapping = asdict(self)
    return as_mapping
67
75
68
76
69
77
@dataclass (frozen = True )
70
78
class AsyncGroup :
71
79
metadata : GroupMetadata
72
80
store_path : StorePath
73
- runtime_configuration : RuntimeConfiguration
81
+ runtime_configuration : RuntimeConfiguration = RuntimeConfiguration ()
74
82
75
83
@classmethod
76
84
async def create (
77
85
cls ,
78
86
store : StoreLike ,
79
87
* ,
80
- attributes : Optional [ Dict [ str , Any ]] = None ,
88
+ attributes : dict [ str , Any ] = {} ,
81
89
exists_ok : bool = False ,
82
90
zarr_format : Literal [2 , 3 ] = 3 ,
83
91
runtime_configuration : RuntimeConfiguration = RuntimeConfiguration (),
@@ -89,7 +97,7 @@ async def create(
89
97
elif zarr_format == 2 :
90
98
assert not await (store_path / ZGROUP_JSON ).exists ()
91
99
group = cls (
92
- metadata = GroupMetadata (attributes = attributes or {} , zarr_format = zarr_format ),
100
+ metadata = GroupMetadata (attributes = attributes , zarr_format = zarr_format ),
93
101
store_path = store_path ,
94
102
runtime_configuration = runtime_configuration ,
95
103
)
@@ -137,7 +145,7 @@ async def open(
137
145
def from_dict (
138
146
cls ,
139
147
store_path : StorePath ,
140
- data : Dict [str , Any ],
148
+ data : dict [str , Any ],
141
149
runtime_configuration : RuntimeConfiguration ,
142
150
) -> AsyncGroup :
143
151
group = cls (
@@ -150,14 +158,24 @@ def from_dict(
150
158
async def getitem (
151
159
self ,
152
160
key : str ,
153
- ) -> Union [ AsyncArray , AsyncGroup ] :
161
+ ) -> AsyncArray | AsyncGroup :
154
162
store_path = self .store_path / key
155
163
164
+ # Note:
165
+ # in zarr-python v2, we first check if `key` references an Array, else if `key` references
166
+ # a group, using standalone `contains_array` and `contains_group` functions. These functions
167
+ # are reusable, but for v3 they would perform redundant I/O operations.
168
+ # Not clear how much of that strategy we want to keep here.
169
+
170
+ # if `key` names an object in storage, it cannot be an array or group
171
+ if await store_path .exists ():
172
+ raise KeyError (key )
173
+
156
174
if self .metadata .zarr_format == 3 :
157
175
zarr_json_bytes = await (store_path / ZARR_JSON ).get ()
158
176
if zarr_json_bytes is None :
159
177
# implicit group?
160
- logger .warning ("group at {} is an implicit group" , store_path )
178
+ logger .warning ("group at %s is an implicit group" , store_path )
161
179
zarr_json = {
162
180
"zarr_format" : self .metadata .zarr_format ,
163
181
"node_type" : "group" ,
@@ -196,7 +214,7 @@ async def getitem(
196
214
else :
197
215
if zgroup_bytes is None :
198
216
# implicit group?
199
- logger .warning ("group at {} is an implicit group" , store_path )
217
+ logger .warning ("group at %s is an implicit group" , store_path )
200
218
zgroup = (
201
219
json .loads (zgroup_bytes )
202
220
if zgroup_bytes is not None
@@ -248,7 +266,7 @@ async def create_array(self, path: str, **kwargs) -> AsyncArray:
248
266
** kwargs ,
249
267
)
250
268
251
- async def update_attributes (self , new_attributes : Dict [str , Any ]):
269
+ async def update_attributes (self , new_attributes : dict [str , Any ]):
252
270
# metadata.attributes is "frozen" so we simply clear and update the dict
253
271
self .metadata .attributes .clear ()
254
272
self .metadata .attributes .update (new_attributes )
@@ -269,26 +287,68 @@ async def update_attributes(self, new_attributes: Dict[str, Any]):
269
287
def __repr__(self):
    """Return a short description of the form `<AsyncGroup {store_path}>`."""
    location = self.store_path
    return f"<AsyncGroup {location}>"
271
289
272
- async def nchildren (self ) -> int :
273
- raise NotImplementedError
274
-
275
- async def children (self ) -> AsyncIterator [Union [AsyncArray , AsyncGroup ]]:
276
- raise NotImplementedError
277
-
278
- async def contains (self , child : str ) -> bool :
290
async def nmembers(self) -> int:
    """
    Return the number of members (sub-arrays and sub-groups) of this group.

    The count is obtained by exhausting `self.members()`, so the same requirements
    apply: the store must support listing, and each candidate key is resolved with
    `getitem`.
    """
    count = 0
    async for _ in self.members():
        count += 1
    return count
280
292
281
- async def group_keys (self ) -> AsyncIterator [str ]:
282
- raise NotImplementedError
293
async def members(self) -> AsyncGenerator[tuple[str, AsyncArray | AsyncGroup], None]:
    """
    Returns an AsyncGenerator over the arrays and groups contained in this group.
    This method requires that `store_path.store` supports directory listing.

    The results are not guaranteed to be ordered.

    Raises a `ValueError` if the store does not support listing.
    """
    if not self.store_path.store.supports_listing:
        msg = (
            f"The store associated with this group ({type(self.store_path.store)}) "
            "does not support listing, "
            "specifically via the `list_dir` method. "
            "This function requires a store that supports listing."
        )
        raise ValueError(msg)

    # would be nice to make these special keys accessible programmatically,
    # and scoped to specific zarr versions
    metadata_keys = ("zarr.json", ".zgroup", ".zattrs")
    subkeys = await self.store_path.store.list_dir(self.store_path.path)
    # is there a better way to schedule this?
    for subkey in subkeys:
        if subkey in metadata_keys:
            continue
        try:
            member = await self.getitem(subkey)
        except KeyError:
            # keyerror is raised when `subkey` names an object (in the object storage sense),
            # as opposed to a prefix, in the store under the prefix associated with this group
            # in which case `subkey` cannot be the name of a sub-array or sub-group.
            logger.warning(
                "Object at %s is not recognized as a component of a Zarr hierarchy.", subkey
            )
            continue
        # Yield outside the `try` so that only a KeyError coming from the lookup itself
        # is treated as "not part of the hierarchy".
        yield (subkey, member)
286
325
287
- async def array_keys (self ) -> AsyncIterator [ str ] :
326
async def contains(self, member: str) -> bool:
    """
    Placeholder for checking whether `member` is contained in this group.

    Not implemented yet: always raises `NotImplementedError`.
    """
    raise NotImplementedError
289
328
329
+ # todo: decide if this method should be separate from `groups`
330
async def group_keys(self) -> AsyncGenerator[str, None]:
    """Yield the name of every member of this group that is an `AsyncGroup`."""
    async for name, node in self.members():
        if not isinstance(node, AsyncGroup):
            continue
        yield name
334
+
335
+ # todo: decide if this method should be separate from `group_keys`
336
async def groups(self) -> AsyncGenerator[AsyncGroup, None]:
    """Yield every member of this group that is an `AsyncGroup`."""
    async for _name, node in self.members():
        if not isinstance(node, AsyncGroup):
            continue
        yield node
340
+
341
+ # todo: decide if this method should be separate from `arrays`
342
async def array_keys(self) -> AsyncGenerator[str, None]:
    """Yield the name of every member of this group that is an `AsyncArray`."""
    async for name, node in self.members():
        if not isinstance(node, AsyncArray):
            continue
        yield name
346
+
347
+ # todo: decide if this method should be separate from `array_keys`
290
348
async def arrays(self) -> AsyncIterator[AsyncArray]:
    """Yield every member of this group that is an `AsyncArray`."""
    async for _name, node in self.members():
        if not isinstance(node, AsyncArray):
            continue
        yield node
292
352
293
353
async def tree(self, expand=False, level=None) -> Any:
    """
    Placeholder; `expand` and `level` are currently unused.

    Not implemented yet: always raises `NotImplementedError`.
    """
    raise NotImplementedError
@@ -331,7 +391,7 @@ def create(
331
391
cls ,
332
392
store : StoreLike ,
333
393
* ,
334
- attributes : Optional [ Dict [ str , Any ]] = None ,
394
+ attributes : dict [ str , Any ] = {} ,
335
395
exists_ok : bool = False ,
336
396
runtime_configuration : RuntimeConfiguration = RuntimeConfiguration (),
337
397
) -> Group :
@@ -358,7 +418,7 @@ def open(
358
418
)
359
419
return cls (obj )
360
420
361
- def __getitem__ (self , path : str ) -> Union [ Array , Group ] :
421
+ def __getitem__ (self , path : str ) -> Array | Group :
362
422
obj = self ._sync (self ._async_group .getitem (path ))
363
423
if isinstance (obj , AsyncArray ):
364
424
return Array (obj )
@@ -378,7 +438,7 @@ def __setitem__(self, key, value):
378
438
"""__setitem__ is not supported in v3"""
379
439
raise NotImplementedError
380
440
381
- async def update_attributes_async (self , new_attributes : Dict [str , Any ]) -> Group :
441
+ async def update_attributes_async (self , new_attributes : dict [str , Any ]) -> Group :
382
442
new_metadata = replace (self .metadata , attributes = new_attributes )
383
443
384
444
# Write new metadata
@@ -389,6 +449,10 @@ async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> Group
389
449
async_group = replace (self ._async_group , metadata = new_metadata )
390
450
return replace (self , _async_group = async_group )
391
451
452
@property
def store_path(self) -> StorePath:
    """The `store_path` of the wrapped `AsyncGroup`."""
    wrapped = self._async_group
    return wrapped.store_path
455
+
392
456
@property
def metadata(self) -> GroupMetadata:
    """The `metadata` of the wrapped `AsyncGroup`."""
    wrapped = self._async_group
    return wrapped.metadata
@@ -401,50 +465,54 @@ def attrs(self) -> Attributes:
401
465
def info(self):
    """Return the `info` attribute of the wrapped `AsyncGroup`."""
    wrapped = self._async_group
    return wrapped.info
403
467
404
- @property
405
- def store_path (self ) -> StorePath :
406
- return self ._async_group .store_path
407
-
408
def update_attributes(self, new_attributes: dict[str, Any]):
    """
    Run `update_attributes` of the wrapped `AsyncGroup` through `self._sync`
    and return `self`.
    """
    pending = self._async_group.update_attributes(new_attributes)
    self._sync(pending)
    return self
411
471
412
472
@property
def nmembers(self) -> int:
    """Result of the wrapped `AsyncGroup.nmembers()`, evaluated through `self._sync`."""
    pending = self._async_group.nmembers()
    return self._sync(pending)
415
475
416
476
@property
def members(self) -> tuple[tuple[str, Array | Group], ...]:
    """
    Return the sub-arrays and sub-groups of this group as a `tuple` of (name, array | group)
    pairs
    """
    async_members: list[tuple[str, AsyncArray | AsyncGroup]] = self._sync_iter(
        self._async_group.members()
    )
    # Wrap each async object in its synchronous counterpart: `Array` for an
    # `AsyncArray`, `Group` for everything else.
    return tuple(
        (name, Array(node) if isinstance(node, AsyncArray) else Group(node))
        for name, node in async_members
    )
424
492
425
def __contains__(self, member) -> bool:
    """Result of the wrapped `AsyncGroup.contains(member)`, evaluated through `self._sync`."""
    pending = self._async_group.contains(member)
    return self._sync(pending)
427
495
428
def group_keys(self) -> list[str]:
    """
    Return the names of the sub-groups of this group.

    The async generator `AsyncGroup.group_keys()` is consumed through
    `self._sync_iter`, the same way the `members` property consumes
    `AsyncGroup.members()`.
    """
    return self._sync_iter(self._async_group.group_keys())
432
500
433
def groups(self) -> list[Group]:
    """
    Return the sub-groups of this group, each wrapped in a synchronous `Group`.

    The async generator `AsyncGroup.groups()` is consumed through `self._sync_iter`.
    """
    # TODO: in v2 this was a generator that return key: Group
    return [Group(obj) for obj in self._sync_iter(self._async_group.groups())]
438
506
439
def array_keys(self) -> list[str]:
    """
    Return the names of the sub-arrays of this group.

    The async generator `AsyncGroup.array_keys()` is consumed through
    `self._sync_iter`, the same way the `members` property consumes
    `AsyncGroup.members()`.
    """
    return self._sync_iter(self._async_group.array_keys())
443
511
444
def arrays(self) -> list[Array]:
    """
    Return the sub-arrays of this group, each wrapped in a synchronous `Array`.

    The async generator `AsyncGroup.arrays()` is consumed through `self._sync_iter`.
    """
    return [Array(obj) for obj in self._sync_iter(self._async_group.arrays())]
448
516
449
517
def tree(self, expand=False, level=None) -> Any:
    """
    Result of the wrapped `AsyncGroup.tree`, called with the same `expand` and
    `level` arguments and evaluated through `self._sync`.
    """
    pending = self._async_group.tree(expand=expand, level=level)
    return self._sync(pending)
0 commit comments