Skip to content

Commit 65a8bd4

Browse files
committed
Zarr-v3 Consolidated Metadata
Implements the optional Consolidated Metadata feature of zarr-v3.
1 parent 8ee89f4 commit 65a8bd4

File tree

7 files changed

+374
-36
lines changed

7 files changed

+374
-36
lines changed

src/zarr/api/asynchronous.py

+45-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import dataclasses
45
import warnings
56
from collections.abc import Iterable
67
from typing import Any, Literal, Union, cast
@@ -12,8 +13,14 @@
1213
from zarr.core.array import Array, AsyncArray
1314
from zarr.core.buffer import NDArrayLike
1415
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
15-
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
16-
from zarr.core.group import AsyncGroup
16+
from zarr.core.common import (
17+
JSON,
18+
AccessModeLiteral,
19+
ChunkCoords,
20+
MemoryOrder,
21+
ZarrFormat,
22+
)
23+
from zarr.core.group import AsyncGroup, ConsolidatedMetadata
1724
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
1825
from zarr.store import (
1926
StoreLike,
@@ -126,8 +133,38 @@ def _default_zarr_version() -> ZarrFormat:
126133
return 3
127134

128135

129-
async def consolidate_metadata(*args: Any, **kwargs: Any) -> AsyncGroup:
130-
raise NotImplementedError
136+
async def consolidate_metadata(store: StoreLike) -> AsyncGroup:
137+
"""
138+
Consolidate the metadata of all nodes in a hierarchy.
139+
140+
Upon completion, the metadata of the root node in the Zarr hierarchy will be
141+
updated to include all the metadata of child nodes.
142+
143+
Parameters
144+
----------
145+
store: StoreLike
146+
The store-like object whose metadata you wish to consolidate.
147+
148+
Returns
149+
-------
150+
group: AsyncGroup
151+
The group, with the ``consolidated_metadata`` field set to include
152+
the metadata of each child node.
153+
"""
154+
group = await AsyncGroup.open(store)
155+
members = dict([x async for x in group.members(recursive=True)])
156+
members_metadata = {}
157+
158+
members_metadata = {k: v.metadata for k, v in members.items()}
159+
160+
consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata)
161+
metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata)
162+
group = dataclasses.replace(
163+
group,
164+
metadata=metadata,
165+
)
166+
await group._save_metadata()
167+
return group
131168

132169

133170
async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]:
@@ -229,7 +266,7 @@ async def open(
229266

230267

231268
async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
232-
raise NotImplementedError
269+
return await open_group(*args, **kwargs)
233270

234271

235272
async def save(
@@ -703,7 +740,9 @@ async def create(
703740
)
704741
else:
705742
warnings.warn(
706-
"dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2
743+
"dimension_separator is not yet implemented",
744+
RuntimeWarning,
745+
stacklevel=2,
707746
)
708747
if write_empty_chunks:
709748
warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2)

src/zarr/codecs/crc32c_.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import annotations
22

33
from dataclasses import dataclass
4-
from typing import TYPE_CHECKING
4+
from typing import TYPE_CHECKING, cast
55

66
import numpy as np
7+
import typing_extensions
78
from crc32c import crc32c
89

910
from zarr.abc.codec import BytesBytesCodec
@@ -37,7 +38,7 @@ async def _decode_single(
3738
crc32_bytes = data[-4:]
3839
inner_bytes = data[:-4]
3940

40-
computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes()
41+
computed_checksum = np.uint32(crc32c(cast(typing_extensions.Buffer, inner_bytes))).tobytes()
4142
stored_checksum = bytes(crc32_bytes)
4243
if computed_checksum != stored_checksum:
4344
raise ValueError(
@@ -52,7 +53,7 @@ async def _encode_single(
5253
) -> Buffer | None:
5354
data = chunk_bytes.as_numpy_array()
5455
# Calculate the checksum and "cast" it to a numpy array
55-
checksum = np.array([crc32c(data)], dtype=np.uint32)
56+
checksum = np.array([crc32c(cast(typing_extensions.Buffer, data))], dtype=np.uint32)
5657
# Append the checksum (as bytes) to the data
5758
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("b")))
5859

src/zarr/core/array.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,15 @@ async def open(
372372
else:
373373
# V3 arrays are comprised of a zarr.json object
374374
assert zarr_json_bytes is not None
375+
zarr_metadata = json.loads(zarr_json_bytes.to_bytes())
376+
if zarr_metadata.get("node_type") != "array":
377+
# This KeyError is load bearing for `open`. That currently tries
378+
# to open the node as an `array` and then falls back to opening
379+
# as a group.
380+
raise KeyError
375381
return cls(
376382
store_path=store_path,
377-
metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes.to_bytes())),
383+
metadata=ArrayV3Metadata.from_dict(zarr_metadata),
378384
)
379385

380386
@property

src/zarr/core/common.py

+23
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
overload,
1717
)
1818

19+
import numcodecs
20+
1921
if TYPE_CHECKING:
2022
from collections.abc import Awaitable, Callable, Iterator
2123

@@ -167,3 +169,24 @@ def parse_order(data: Any) -> Literal["C", "F"]:
167169
if data in ("C", "F"):
168170
return cast(Literal["C", "F"], data)
169171
raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.")
172+
173+
174+
def _json_convert(o: Any) -> Any:
175+
if isinstance(o, np.dtype):
176+
return str(o)
177+
if np.isscalar(o):
178+
# convert numpy scalar to python type, and pass
179+
# python types through
180+
out = getattr(o, "item", lambda: o)()
181+
if isinstance(out, complex):
182+
# python complex types are not JSON serializable, so we use the
183+
# serialization defined in the zarr v3 spec
184+
return [out.real, out.imag]
185+
return out
186+
if isinstance(o, Enum):
187+
return o.name
188+
# this serializes numcodecs compressors
189+
# todo: implement to_dict for codecs
190+
elif isinstance(o, numcodecs.abc.Codec):
191+
config: dict[str, Any] = o.get_config()
192+
return config

src/zarr/core/group.py

+67-4
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
ZGROUP_JSON,
2626
ChunkCoords,
2727
ZarrFormat,
28+
_json_convert,
2829
)
2930
from zarr.core.config import config
31+
from zarr.core.metadata import ArrayMetadata, ArrayV3Metadata
3032
from zarr.core.sync import SyncMixin, sync
3133
from zarr.store import StoreLike, StorePath, make_store_path
3234
from zarr.store.common import ensure_no_existing_node
@@ -77,18 +79,65 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group:
7779
raise TypeError(f"Unknown node type, got {type(node)}")
7880

7981

82+
@dataclass(frozen=True)
83+
class ConsolidatedMetadata:
84+
metadata: dict[str, ArrayMetadata | GroupMetadata]
85+
kind: Literal["inline"] = "inline"
86+
must_understand: Literal[False] = False
87+
88+
def to_dict(self) -> dict[str, JSON]:
89+
return {
90+
"kind": self.kind,
91+
"must_understand": self.must_understand,
92+
"metadata": {k: v.to_dict() for k, v in self.metadata.items()},
93+
}
94+
95+
@classmethod
96+
def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata:
97+
data = dict(data)
98+
raw_metadata = data.get("metadata")
99+
if not isinstance(raw_metadata, dict):
100+
raise TypeError("Unexpected type for 'metadata'")
101+
102+
elif not raw_metadata:
103+
raise ValueError("Must specify metadata")
104+
105+
metadata: dict[str, ArrayMetadata | GroupMetadata]
106+
if raw_metadata:
107+
metadata = {}
108+
for k, v in raw_metadata.items():
109+
if not isinstance(v, dict):
110+
raise TypeError(f"Invalid value for metadata items. key={k}, type={type(v)}")
111+
112+
node_type = v.get("node_type", None)
113+
if node_type == "group":
114+
metadata[k] = GroupMetadata.from_dict(v)
115+
elif node_type == "array":
116+
metadata[k] = ArrayV3Metadata.from_dict(v)
117+
else:
118+
raise ValueError(f"Invalid node_type: '{node_type}'")
119+
# assert data["kind"] == "inline"
120+
if data["kind"] != "inline":
121+
raise ValueError
122+
123+
if data["must_understand"] is not False:
124+
raise ValueError
125+
return cls(metadata=metadata)
126+
127+
80128
@dataclass(frozen=True)
81129
class GroupMetadata(Metadata):
82130
attributes: dict[str, Any] = field(default_factory=dict)
83131
zarr_format: ZarrFormat = 3
132+
consolidated_metadata: ConsolidatedMetadata | None = None
84133
node_type: Literal["group"] = field(default="group", init=False)
85134

86135
def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
87136
json_indent = config.get("json_indent")
88137
if self.zarr_format == 3:
89138
return {
90139
ZARR_JSON: prototype.buffer.from_bytes(
91-
json.dumps(self.to_dict(), indent=json_indent).encode()
140+
json.dumps(self.to_dict(), default=_json_convert, indent=json_indent).encode()
92141
)
93142
}
94143
else:
@@ -101,20 +150,33 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
101150
),
102151
}
103152

104-
def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3):
153+
def __init__(
154+
self,
155+
attributes: dict[str, Any] | None = None,
156+
zarr_format: ZarrFormat = 3,
157+
consolidated_metadata: ConsolidatedMetadata | None = None,
158+
):
105159
attributes_parsed = parse_attributes(attributes)
106160
zarr_format_parsed = parse_zarr_format(zarr_format)
107161

108162
object.__setattr__(self, "attributes", attributes_parsed)
109163
object.__setattr__(self, "zarr_format", zarr_format_parsed)
164+
object.__setattr__(self, "consolidated_metadata", consolidated_metadata)
110165

111166
@classmethod
112167
def from_dict(cls, data: dict[str, Any]) -> GroupMetadata:
168+
data = dict(data)
113169
assert data.pop("node_type", None) in ("group", None)
170+
consolidated_metadata = data.pop("consolidated_metadata", None)
171+
if consolidated_metadata:
172+
data["consolidated_metadata"] = ConsolidatedMetadata.from_dict(consolidated_metadata)
114173
return cls(**data)
115174

116175
def to_dict(self) -> dict[str, Any]:
117-
return asdict(self)
176+
result = asdict(replace(self, consolidated_metadata=None))
177+
if self.consolidated_metadata:
178+
result["consolidated_metadata"] = self.consolidated_metadata.to_dict()
179+
return result
118180

119181

120182
@dataclass(frozen=True)
@@ -497,7 +559,8 @@ async def members(
497559
# as opposed to a prefix, in the store under the prefix associated with this group
498560
# in which case `key` cannot be the name of a sub-array or sub-group.
499561
logger.warning(
500-
"Object at %s is not recognized as a component of a Zarr hierarchy.", key
562+
"Object at %s is not recognized as a component of a Zarr hierarchy.",
563+
key,
501564
)
502565

503566
async def contains(self, member: str) -> bool:

src/zarr/core/metadata.py

+1-22
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
if TYPE_CHECKING:
2121
from typing_extensions import Self
2222

23-
import numcodecs.abc
2423

2524
from zarr.core.array_spec import ArraySpec
2625
from zarr.core.common import (
@@ -30,6 +29,7 @@
3029
ZATTRS_JSON,
3130
ChunkCoords,
3231
ZarrFormat,
32+
_json_convert,
3333
parse_dtype,
3434
parse_named_configuration,
3535
parse_shapelike,
@@ -252,27 +252,6 @@ def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
252252
return self.chunk_key_encoding.encode_chunk_key(chunk_coords)
253253

254254
def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
255-
def _json_convert(o: Any) -> Any:
256-
if isinstance(o, np.dtype):
257-
return str(o)
258-
if np.isscalar(o):
259-
# convert numpy scalar to python type, and pass
260-
# python types through
261-
out = getattr(o, "item", lambda: o)()
262-
if isinstance(out, complex):
263-
# python complex types are not JSON serializable, so we use the
264-
# serialization defined in the zarr v3 spec
265-
return [out.real, out.imag]
266-
return out
267-
if isinstance(o, Enum):
268-
return o.name
269-
# this serializes numcodecs compressors
270-
# todo: implement to_dict for codecs
271-
elif isinstance(o, numcodecs.abc.Codec):
272-
config: dict[str, Any] = o.get_config()
273-
return config
274-
raise TypeError
275-
276255
json_indent = config.get("json_indent")
277256
return {
278257
ZARR_JSON: prototype.buffer.from_bytes(

0 commit comments

Comments
 (0)