37 changes: 36 additions & 1 deletion src/zarr/abc/store.py
@@ -1,7 +1,32 @@
from abc import abstractmethod, ABC
import json
import numbers
from enum import Enum

import numpy as np
from collections.abc import AsyncGenerator
from typing import List, Tuple, Optional
from typing import Any, List, Tuple, Optional


def json_default(o: Any) -> Any:
# See the json.JSONEncoder.default docstring for an explanation of this hook.
# It is needed to encode numpy dtypes, numpy scalars, enums, and codec objects.
if isinstance(o, numbers.Integral):
return int(o)
if isinstance(o, numbers.Real):
return float(o)
if isinstance(o, np.dtype):
if o.fields is None:
return o.str
else:
return o.descr
if isinstance(o, Enum):
return o.name
# this serializes numcodecs compressors
# todo: implement to_dict for codecs
if hasattr(o, "get_config"):
return o.get_config()
raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")


class Store(ABC):
@@ -22,6 +47,12 @@ async def get(
"""
...

async def get_metadata(self, key: str) -> Optional[dict[str, Any]]:
data = await self.get(key)
if data is None:
return None
return json.loads(data)

@abstractmethod
async def get_partial_values(
self, key_ranges: List[Tuple[str, Tuple[int, int]]]
@@ -71,6 +102,10 @@ async def set(self, key: str, value: bytes) -> None:
"""
...

async def set_metadata(self, key: str, metadata: dict[str, Any]) -> None:
data = json.dumps(metadata, default=json_default).encode("utf-8")
await self.set(key, data)

@abstractmethod
async def delete(self, key: str) -> None:
"""Remove a key from the store
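The two helpers above centralize JSON handling in the store layer, with `json_default` covering the values the standard encoder rejects. A minimal sketch of the hook in isolation; the `Order` enum and the sample values are illustrative, not from this diff, and the `"<i4"` output assumes a little-endian machine:

```python
import json
from enum import Enum

import numpy as np

from zarr.abc.store import json_default  # the hook defined above


class Order(Enum):  # illustrative enum, not part of this PR
    C = "C"
    F = "F"


doc = {
    "dtype": np.dtype("int32"),              # plain dtype -> o.str, "<i4" on little-endian
    "structured": np.dtype([("x", "<f8")]),  # structured dtype -> o.descr field list
    "fill_value": np.int64(0),               # numpy scalar -> int via numbers.Integral
    "order": Order.C,                        # Enum member -> its name
}

print(json.dumps(doc, default=json_default))
# {"dtype": "<i4", "structured": [["x", "<f8"]], "fill_value": 0, "order": "C"}
```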
19 changes: 9 additions & 10 deletions src/zarr/array.py
@@ -12,7 +12,6 @@

from dataclasses import dataclass, replace

import json
from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union

import numpy as np
@@ -146,11 +145,11 @@ async def open(
runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
) -> AsyncArray:
store_path = make_store_path(store)
zarr_json_bytes = await (store_path / ZARR_JSON).get()
assert zarr_json_bytes is not None
zarr_json = await (store_path / ZARR_JSON).get_metadata()
assert zarr_json is not None
return cls.from_dict(
store_path,
json.loads(zarr_json_bytes),
zarr_json,
runtime_configuration=runtime_configuration,
)

@@ -161,11 +160,11 @@ async def open_auto(
runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2]
store_path = make_store_path(store)
v3_metadata_bytes = await (store_path / ZARR_JSON).get()
if v3_metadata_bytes is not None:
v3_metadata = await (store_path / ZARR_JSON).get_metadata()
if v3_metadata is not None:
return cls.from_dict(
store_path,
json.loads(v3_metadata_bytes),
v3_metadata,
runtime_configuration=runtime_configuration or RuntimeConfiguration(),
)
else:
@@ -223,7 +222,7 @@ async def getitem(self, selection: Selection) -> np.ndarray:
return out[()]

async def _save_metadata(self) -> None:
await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes())
await (self.store_path / ZARR_JSON).set_metadata(self.metadata.to_dict())

async def _read_chunk(
self,
@@ -392,14 +391,14 @@ async def _delete_key(key: str) -> None:
)

# Write new metadata
await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes())
await (self.store_path / ZARR_JSON).set_metadata(new_metadata.to_dict())
return replace(self, metadata=new_metadata)

async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray:
new_metadata = replace(self.metadata, attributes=new_attributes)

# Write new metadata
await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes())
await (self.store_path / ZARR_JSON).set_metadata(new_metadata.to_dict())
return replace(self, metadata=new_metadata)

def __repr__(self):
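The array.py changes are mechanical substitutions: each `get()` plus `json.loads` pair becomes `get_metadata()`, and each `set(metadata.to_bytes())` becomes `set_metadata(metadata.to_dict())`. The call sites invoke these helpers on `StorePath` objects, so `StorePath` presumably forwards them to its store; a hypothetical sketch of that forwarding, which this diff implies but does not show:

```python
from typing import Any, Optional

from zarr.abc.store import Store


class StorePath:
    """Hypothetical forwarding layer; the real class is not part of this diff."""

    def __init__(self, store: Store, path: str) -> None:
        self.store = store
        self.path = path

    async def get_metadata(self) -> Optional[dict[str, Any]]:
        # Delegates to the new Store.get_metadata with this path as the key.
        return await self.store.get_metadata(self.path)

    async def set_metadata(self, metadata: dict[str, Any]) -> None:
        # Delegates to Store.set_metadata, which JSON-encodes via json_default.
        await self.store.set_metadata(self.path, metadata)
```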
20 changes: 11 additions & 9 deletions src/zarr/array_v2.py
@@ -189,11 +189,11 @@ def from_dict(
async def _save_metadata(self) -> None:
self._validate_metadata()

await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes())
meta = self.metadata.to_dict()
assert isinstance(meta, dict) # for mypy
await (self.store_path / ZARRAY_JSON).set_metadata(meta)
if self.attributes is not None and len(self.attributes) > 0:
await (self.store_path / ZATTRS_JSON).set(
json.dumps(self.attributes).encode(),
)
await (self.store_path / ZATTRS_JSON).set_metadata(self.attributes)
else:
await (self.store_path / ZATTRS_JSON).delete()

@@ -432,7 +432,9 @@ async def _delete_key(key: str) -> None:
)

# Write new metadata
await (self.store_path / ZARRAY_JSON).set(new_metadata.to_bytes())
meta = new_metadata.to_dict()
assert isinstance(meta, dict) # for mypy
await (self.store_path / ZARRAY_JSON).set_metadata(meta)
return replace(self, metadata=new_metadata)

def resize(self, new_shape: ChunkCoords) -> ArrayV2:
@@ -505,17 +507,17 @@ async def convert_to_v3_async(self) -> Array:
dimension_names=None,
)

new_metadata_bytes = new_metadata.to_bytes()
await (self.store_path / ZARR_JSON).set(new_metadata_bytes)
_new_metadata = new_metadata.to_dict()
await (self.store_path / ZARR_JSON).set_metadata(_new_metadata)

return Array.from_dict(
store_path=self.store_path,
data=json.loads(new_metadata_bytes),
data=_new_metadata,
runtime_configuration=self.runtime_configuration,
)

async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2:
await (self.store_path / ZATTRS_JSON).set(json.dumps(new_attributes).encode())
await (self.store_path / ZATTRS_JSON).set_metadata(new_attributes)
return replace(self, attributes=new_attributes)

def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2:
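The `assert isinstance(meta, dict)  # for mypy` lines exist because `to_dict()` evidently carries a broader JSON-like return annotation, while `set_metadata` takes `dict[str, Any]`; the assert narrows the union at the call site. A self-contained sketch of the pattern, using a hypothetical `JSON` alias as a stand-in for the project's actual annotation:

```python
from typing import Any, Union

# Hypothetical alias; the real return annotation of to_dict is not shown in this diff.
JSON = Union[dict[str, Any], list[Any], str, int, float, bool, None]


def to_dict() -> JSON:  # stand-in for ArrayV2Metadata.to_dict
    return {"zarr_format": 2}


def set_metadata(metadata: dict[str, Any]) -> None:  # same parameter shape as the new helper
    print(sorted(metadata))


meta = to_dict()
assert isinstance(meta, dict)  # passes at runtime; narrows JSON to dict for mypy
set_metadata(meta)             # without the assert, mypy rejects this call
```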
85 changes: 39 additions & 46 deletions src/zarr/group.py
@@ -3,7 +3,6 @@
from dataclasses import asdict, dataclass, field, replace

import asyncio
import json
import logging

if TYPE_CHECKING:
@@ -48,14 +47,13 @@ class GroupMetadata(Metadata):
zarr_format: Literal[2, 3] = 3
node_type: Literal["group"] = field(default="group", init=False)

# todo: rename this, since it doesn't return bytes
def to_bytes(self) -> dict[str, bytes]:
def to_meta_dict(self) -> dict[str, Any]:
if self.zarr_format == 3:
return {ZARR_JSON: json.dumps(self.to_dict()).encode()}
return {ZARR_JSON: self.to_dict()}
else:
return {
ZGROUP_JSON: json.dumps({"zarr_format": 2}).encode(),
ZATTRS_JSON: json.dumps(self.attributes).encode(),
ZGROUP_JSON: {"zarr_format": 2},
ZATTRS_JSON: self.attributes,
}

def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3):
@@ -114,29 +112,29 @@ async def open(
store_path = make_store_path(store)

if zarr_format == 2:
zgroup_bytes, zattrs_bytes = await asyncio.gather(
(store_path / ZGROUP_JSON).get(), (store_path / ZATTRS_JSON).get()
zgroup, zattrs = await asyncio.gather(
(store_path / ZGROUP_JSON).get_metadata(), (store_path / ZATTRS_JSON).get_metadata()
)
if zgroup_bytes is None:
if zgroup is None:
raise KeyError(store_path) # filenotfounderror?
elif zarr_format == 3:
zarr_json_bytes = await (store_path / ZARR_JSON).get()
if zarr_json_bytes is None:
zarr_json = await (store_path / ZARR_JSON).get_metadata()
if zarr_json is None:
raise KeyError(store_path) # filenotfounderror?
elif zarr_format is None:
zarr_json_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
(store_path / ZARR_JSON).get(),
(store_path / ZGROUP_JSON).get(),
(store_path / ZATTRS_JSON).get(),
zarr_json, zgroup, zattrs = await asyncio.gather(
(store_path / ZARR_JSON).get_metadata(),
(store_path / ZGROUP_JSON).get_metadata(),
(store_path / ZATTRS_JSON).get_metadata(),
)
if zarr_json_bytes is not None and zgroup_bytes is not None:
if zarr_json is not None and zgroup is not None:
# TODO: revisit this exception type
# alternatively, we could warn and favor v3
raise ValueError("Both zarr.json and .zgroup objects exist")
if zarr_json_bytes is None and zgroup_bytes is None:
if zarr_json is None and zgroup is None:
raise KeyError(store_path) # filenotfounderror?
# set zarr_format based on which keys were found
if zarr_json_bytes is not None:
if zarr_json is not None:
zarr_format = 3
else:
zarr_format = 2
@@ -145,14 +143,14 @@

if zarr_format == 2:
# V2 groups are comprised of a .zgroup and .zattrs objects
assert zgroup_bytes is not None
zgroup = json.loads(zgroup_bytes)
zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {}
assert zgroup is not None
if zattrs is None:
zattrs = {}
group_metadata = {**zgroup, "attributes": zattrs}
else:
# V3 groups are comprised of a zarr.json object
assert zarr_json_bytes is not None
group_metadata = json.loads(zarr_json_bytes)
assert zarr_json is not None
group_metadata = zarr_json

return cls.from_dict(store_path, group_metadata, runtime_configuration)

@@ -187,11 +185,10 @@ async def getitem(
raise KeyError(key)

if self.metadata.zarr_format == 3:
zarr_json_bytes = await (store_path / ZARR_JSON).get()
if zarr_json_bytes is None:
zarr_json = await (store_path / ZARR_JSON).get_metadata()
if zarr_json is None:
raise KeyError(key)
else:
zarr_json = json.loads(zarr_json_bytes)

if zarr_json["node_type"] == "group":
return type(self).from_dict(store_path, zarr_json, self.runtime_configuration)
elif zarr_json["node_type"] == "array":
@@ -203,19 +200,18 @@
elif self.metadata.zarr_format == 2:
# Q: how do we like optimistically fetching .zgroup, .zarray, and .zattrs?
# This guarantees that we will always make at least one extra request to the store
zgroup_bytes, zarray_bytes, zattrs_bytes = await asyncio.gather(
(store_path / ZGROUP_JSON).get(),
(store_path / ZARRAY_JSON).get(),
(store_path / ZATTRS_JSON).get(),
zgroup, zarray, zattrs = await asyncio.gather(
(store_path / ZGROUP_JSON).get_metadata(),
(store_path / ZARRAY_JSON).get_metadata(),
(store_path / ZATTRS_JSON).get_metadata(),
)

if zgroup_bytes is None and zarray_bytes is None:
if zgroup is None and zarray is None:
raise KeyError(key)

# unpack the zarray, if this is None then we must be opening a group
zarray = json.loads(zarray_bytes) if zarray_bytes else None
# unpack the zattrs, this can be None if no attrs were written
zattrs = json.loads(zattrs_bytes) if zattrs_bytes is not None else {}
if zattrs is None:
zattrs = {}

if zarray is not None:
# TODO: update this once the V2 array support is part of the primary array class
@@ -224,11 +220,8 @@
store_path, zarray, runtime_configuration=self.runtime_configuration
)
else:
zgroup = (
json.loads(zgroup_bytes)
if zgroup_bytes is not None
else {"zarr_format": self.metadata.zarr_format}
)
if zgroup is None:
zgroup = {}
zarr_json = {**zgroup, "attributes": zattrs}
return type(self).from_dict(store_path, zarr_json, self.runtime_configuration)
else:
@@ -247,8 +240,8 @@
raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}")

async def _save_metadata(self) -> None:
to_save = self.metadata.to_bytes()
awaitables = [(self.store_path / key).set(value) for key, value in to_save.items()]
to_save = self.metadata.to_meta_dict()
awaitables = [(self.store_path / key).set_metadata(value) for key, value in to_save.items()]
await asyncio.gather(*awaitables)

@property
@@ -281,12 +274,12 @@ async def update_attributes(self, new_attributes: dict[str, Any]):
self.metadata.attributes.update(new_attributes)

# Write new metadata
to_save = self.metadata.to_bytes()
to_save = self.metadata.to_meta_dict()
if self.metadata.zarr_format == 2:
# only save the .zattrs object
await (self.store_path / ZATTRS_JSON).set(to_save[ZATTRS_JSON])
await (self.store_path / ZATTRS_JSON).set_metadata(to_save[ZATTRS_JSON])
else:
await (self.store_path / ZARR_JSON).set(to_save[ZARR_JSON])
await (self.store_path / ZARR_JSON).set_metadata(to_save[ZARR_JSON])

self.metadata.attributes.clear()
self.metadata.attributes.update(new_attributes)
@@ -460,7 +453,7 @@ async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group
new_metadata = replace(self.metadata, attributes=new_attributes)

# Write new metadata
to_save = new_metadata.to_bytes()
to_save = new_metadata.to_meta_dict()
awaitables = [(self.store_path / key).set_metadata(value) for key, value in to_save.items()]
await asyncio.gather(*awaitables)

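With `to_meta_dict()` returning plain dicts keyed by document name, `_save_metadata` fans out one `set_metadata` call per document and awaits them together. A runnable sketch of that shape; the constants and both functions are simplified stand-ins for the classes above:

```python
import asyncio

ZARR_JSON, ZGROUP_JSON, ZATTRS_JSON = "zarr.json", ".zgroup", ".zattrs"


def to_meta_dict(zarr_format: int, attributes: dict) -> dict[str, dict]:
    # Same shape as GroupMetadata.to_meta_dict: document key -> metadata dict.
    if zarr_format == 3:
        return {ZARR_JSON: {"zarr_format": 3, "node_type": "group", "attributes": attributes}}
    return {ZGROUP_JSON: {"zarr_format": 2}, ZATTRS_JSON: attributes}


async def set_metadata(key: str, metadata: dict) -> None:
    # Stand-in for (store_path / key).set_metadata(metadata).
    print(f"{key} <- {metadata}")


async def save(zarr_format: int) -> None:
    # Mirrors _save_metadata: one concurrent write per metadata document.
    to_save = to_meta_dict(zarr_format, {"title": "demo"})
    await asyncio.gather(*(set_metadata(k, v) for k, v in to_save.items()))


asyncio.run(save(2))  # v2 writes .zgroup and .zattrs
asyncio.run(save(3))  # v3 writes a single zarr.json
```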