Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
73d53d7
Fixed MemoryStore.list_dir
TomAugspurger Aug 25, 2024
90940a0
fixup s3
TomAugspurger Aug 25, 2024
8ee89f4
recursive Group.members
TomAugspurger Aug 25, 2024
65a8bd4
Zarr-v3 Consolidated Metadata
TomAugspurger Aug 23, 2024
2515ca3
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 3, 2024
cdaf81f
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 6, 2024
5a86789
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 6, 2024
a839f16
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 10, 2024
5a31390
fixup
TomAugspurger Sep 10, 2024
79bf235
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 11, 2024
fc901eb
read zarr-v2 consolidated metadata
TomAugspurger Sep 11, 2024
3a3eb9d
check writablem
TomAugspurger Sep 12, 2024
78af362
Handle non-root paths
TomAugspurger Sep 12, 2024
750668c
Some error handling
TomAugspurger Sep 12, 2024
63697ab
cleanup
TomAugspurger Sep 12, 2024
5d79274
refactor open
TomAugspurger Sep 12, 2024
0c67972
remove dupe file
TomAugspurger Sep 12, 2024
657ad1e
v2 getitem
TomAugspurger Sep 12, 2024
511ff76
fixup
TomAugspurger Sep 12, 2024
b360eb4
Optimzied members
TomAugspurger Sep 12, 2024
abcdbe6
Impl flatten
TomAugspurger Sep 12, 2024
b9bcfe8
Fixups
TomAugspurger Sep 13, 2024
3575cda
doc
TomAugspurger Sep 13, 2024
7b6bd17
nest the tests
TomAugspurger Sep 13, 2024
500a91e
fixup
TomAugspurger Sep 13, 2024
22d501e
Fixups
TomAugspurger Sep 13, 2024
762cf96
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 13, 2024
d6c6cc7
fixup
TomAugspurger Sep 13, 2024
6755fbc
fixup
TomAugspurger Sep 13, 2024
e406f86
fixup
TomAugspurger Sep 13, 2024
07248ea
fixup
TomAugspurger Sep 13, 2024
bdf15ad
fixup
TomAugspurger Sep 13, 2024
18eb172
consistent open_consolidated handling
TomAugspurger Sep 16, 2024
c11f1ad
fixup
TomAugspurger Sep 16, 2024
f6397f4
make clear that flat_to_nested mutates
TomAugspurger Sep 16, 2024
f55aa37
fixujp
TomAugspurger Sep 16, 2024
123dc60
fixup
TomAugspurger Sep 16, 2024
34c7720
fixup
TomAugspurger Sep 17, 2024
4db042b
Fixup
TomAugspurger Sep 17, 2024
8febba3
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 19, 2024
d730350
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 20, 2024
a1f1ebb
fixup
TomAugspurger Sep 20, 2024
35a3832
fixup
TomAugspurger Sep 20, 2024
c1837fd
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 20, 2024
d03f4bd
fixup
TomAugspurger Sep 20, 2024
cddd01f
fixup
TomAugspurger Sep 20, 2024
9303cd0
added docs
TomAugspurger Sep 20, 2024
87b65f1
fixup
TomAugspurger Sep 20, 2024
ee5d130
Ensure empty dict
TomAugspurger Sep 23, 2024
af9788f
fixed name
TomAugspurger Sep 23, 2024
5a08466
fixup nested
TomAugspurger Sep 23, 2024
d236e53
removed dupe tests
TomAugspurger Sep 23, 2024
2824de6
fixup
TomAugspurger Sep 23, 2024
08a7682
doc fix
TomAugspurger Sep 23, 2024
b8b5f51
fixups
TomAugspurger Sep 24, 2024
ba4fb47
fixup
TomAugspurger Sep 24, 2024
10d062f
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 24, 2024
e6142d8
fixup
TomAugspurger Sep 24, 2024
8ad3738
v2 writer
TomAugspurger Sep 24, 2024
fc94933
fixup
TomAugspurger Sep 24, 2024
79246dd
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 25, 2024
a62240b
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 28, 2024
4bfad1b
fixup
TomAugspurger Sep 28, 2024
ae02bb5
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Sep 30, 2024
3265abd
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 1, 2024
8728440
path fix
TomAugspurger Oct 1, 2024
20c97a4
Fixed v2 use_consolidated=False
TomAugspurger Oct 1, 2024
f7e5b3f
fixupg
TomAugspurger Oct 1, 2024
c31f8a1
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 7, 2024
483681b
Special case object dtype
TomAugspurger Oct 9, 2024
7e76e9e
fixup
TomAugspurger Oct 9, 2024
19b9271
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 9, 2024
418bc6b
Merge branch 'tom/fix/dtype-str-special-case' into user/tom/feature/c…
TomAugspurger Oct 9, 2024
97fa2a0
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 9, 2024
6fab362
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 10, 2024
cbffcbb
docs
TomAugspurger Oct 10, 2024
56d2704
pr review
TomAugspurger Oct 10, 2024
8ade87d
must_understand
TomAugspurger Oct 10, 2024
b5fb721
Updated from_dict checking
TomAugspurger Oct 10, 2024
d17f955
cleanup
TomAugspurger Oct 10, 2024
1d17140
cleanup
TomAugspurger Oct 10, 2024
2b2e3da
Fixed fill_value
TomAugspurger Oct 10, 2024
96b274c
Merge remote-tracking branch 'upstream/v3' into user/tom/feature/cons…
TomAugspurger Oct 10, 2024
c9229d1
fixup
TomAugspurger Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import dataclasses
import warnings
from collections.abc import Iterable
from typing import Any, Literal, Union, cast
Expand All @@ -12,8 +13,14 @@
from zarr.core.array import Array, AsyncArray
from zarr.core.buffer import NDArrayLike
from zarr.core.chunk_key_encodings import ChunkKeyEncoding
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
from zarr.core.group import AsyncGroup
from zarr.core.common import (
JSON,
AccessModeLiteral,
ChunkCoords,
MemoryOrder,
ZarrFormat,
)
from zarr.core.group import AsyncGroup, ConsolidatedMetadata
from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata
from zarr.store import (
StoreLike,
Expand Down Expand Up @@ -126,8 +133,38 @@ def _default_zarr_version() -> ZarrFormat:
return 3


async def consolidate_metadata(store: StoreLike) -> AsyncGroup:
    """
    Consolidate the metadata of all nodes in a hierarchy.

    Upon completion, the metadata of the root node in the Zarr hierarchy will be
    updated to include all the metadata of child nodes.

    Parameters
    ----------
    store: StoreLike
        The store-like object whose metadata you wish to consolidate.

    Returns
    -------
    group: AsyncGroup
        The group, with the ``consolidated_metadata`` field set to include
        the metadata of each child node.
    """
    group = await AsyncGroup.open(store)
    # Walk the hierarchy once, collecting each node's metadata keyed by its
    # path relative to the root.  (Previously this built an intermediate
    # members dict and a dead empty dict, then re-walked the items.)
    members_metadata = {key: node.metadata async for key, node in group.members(recursive=True)}

    consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata)
    # The metadata objects are frozen dataclasses, so build updated copies
    # rather than mutating in place.
    metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata)
    group = dataclasses.replace(group, metadata=metadata)
    await group._save_metadata()
    return group


async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]:
Expand Down Expand Up @@ -229,7 +266,7 @@ async def open(


async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
    """
    Open a group, for compatibility with the zarr-v2 consolidated-metadata API.

    NOTE(review): this currently just delegates to ``open_group``; any
    consolidated-metadata handling happens there — confirm intended behavior.
    """
    return await open_group(*args, **kwargs)


async def save(
Expand Down Expand Up @@ -703,7 +740,9 @@ async def create(
)
else:
warnings.warn(
"dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2
"dimension_separator is not yet implemented",
RuntimeWarning,
stacklevel=2,
)
if write_empty_chunks:
warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2)
Expand Down
7 changes: 4 additions & 3 deletions src/zarr/codecs/crc32c_.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import numpy as np
import typing_extensions
from crc32c import crc32c

from zarr.abc.codec import BytesBytesCodec
Expand Down Expand Up @@ -37,7 +38,7 @@ async def _decode_single(
crc32_bytes = data[-4:]
inner_bytes = data[:-4]

computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes()
computed_checksum = np.uint32(crc32c(cast(typing_extensions.Buffer, inner_bytes))).tobytes()
stored_checksum = bytes(crc32_bytes)
if computed_checksum != stored_checksum:
raise ValueError(
Expand All @@ -52,7 +53,7 @@ async def _encode_single(
) -> Buffer | None:
data = chunk_bytes.as_numpy_array()
# Calculate the checksum and "cast" it to a numpy array
checksum = np.array([crc32c(data)], dtype=np.uint32)
checksum = np.array([crc32c(cast(typing_extensions.Buffer, data))], dtype=np.uint32)
# Append the checksum (as bytes) to the data
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("b")))

Expand Down
8 changes: 7 additions & 1 deletion src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,15 @@ async def open(
else:
# V3 arrays are comprised of a zarr.json object
assert zarr_json_bytes is not None
zarr_metadata = json.loads(zarr_json_bytes.to_bytes())
if zarr_metadata.get("node_type") != "array":
# This KeyError is load bearing for `open`. That currently tries
# to open the node as an `array` and then falls back to opening
# as a group.
raise KeyError
return cls(
store_path=store_path,
metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes.to_bytes())),
metadata=ArrayV3Metadata.from_dict(zarr_metadata),
)

@property
Expand Down
23 changes: 23 additions & 0 deletions src/zarr/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
overload,
)

import numcodecs

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterator

Expand Down Expand Up @@ -167,3 +169,24 @@ def parse_order(data: Any) -> Literal["C", "F"]:
if data in ("C", "F"):
return cast(Literal["C", "F"], data)
raise ValueError(f"Expected one of ('C', 'F'), got {data} instead.")


def _json_convert(o: Any) -> Any:
if isinstance(o, np.dtype):
return str(o)
if np.isscalar(o):
# convert numpy scalar to python type, and pass
# python types through
out = getattr(o, "item", lambda: o)()
if isinstance(out, complex):
# python complex types are not JSON serializable, so we use the
# serialization defined in the zarr v3 spec
return [out.real, out.imag]
return out
if isinstance(o, Enum):
return o.name
# this serializes numcodecs compressors
# todo: implement to_dict for codecs
elif isinstance(o, numcodecs.abc.Codec):
config: dict[str, Any] = o.get_config()
return config
124 changes: 110 additions & 14 deletions src/zarr/core/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
ZGROUP_JSON,
ChunkCoords,
ZarrFormat,
_json_convert,
)
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV3Metadata
from zarr.core.sync import SyncMixin, sync
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import ensure_no_existing_node
Expand Down Expand Up @@ -77,18 +79,65 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group:
raise TypeError(f"Unknown node type, got {type(node)}")


@dataclass(frozen=True)
class ConsolidatedMetadata:
    """
    Consolidated metadata for every node in a hierarchy, stored inline on the
    root group's metadata under the ``consolidated_metadata`` key.
    """

    # Mapping from a node's path (relative to the root) to its metadata.
    metadata: dict[str, ArrayMetadata | GroupMetadata]
    kind: Literal["inline"] = "inline"
    must_understand: Literal[False] = False

    def to_dict(self) -> dict[str, JSON]:
        """Return the JSON-serializable dict form of this object."""
        return {
            "kind": self.kind,
            "must_understand": self.must_understand,
            "metadata": {k: v.to_dict() for k, v in self.metadata.items()},
        }

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata:
        """
        Build a ConsolidatedMetadata from its dict representation.

        Raises
        ------
        TypeError
            If ``metadata`` is missing or not a dict, or if one of its
            entries is not a dict.
        ValueError
            If ``metadata`` is empty, an entry has an unknown ``node_type``,
            ``kind`` is not ``"inline"``, or ``must_understand`` is not False.
        """
        data = dict(data)
        raw_metadata = data.get("metadata")
        if not isinstance(raw_metadata, dict):
            raise TypeError("Unexpected type for 'metadata'")
        if not raw_metadata:
            raise ValueError("Must specify metadata")

        metadata: dict[str, ArrayMetadata | GroupMetadata] = {}
        for k, v in raw_metadata.items():
            if not isinstance(v, dict):
                raise TypeError(f"Invalid value for metadata items. key={k}, type={type(v)}")

            # ``node_type`` discriminates groups from arrays in zarr v3.
            node_type = v.get("node_type", None)
            if node_type == "group":
                metadata[k] = GroupMetadata.from_dict(v)
            elif node_type == "array":
                metadata[k] = ArrayV3Metadata.from_dict(v)
            else:
                raise ValueError(f"Invalid node_type: '{node_type}'")

        if data["kind"] != "inline":
            raise ValueError(f"Consolidated metadata kind must be 'inline', got {data['kind']!r}")

        if data["must_understand"] is not False:
            raise ValueError(f"must_understand must be False, got {data['must_understand']!r}")
        return cls(metadata=metadata)


@dataclass(frozen=True)
class GroupMetadata(Metadata):
attributes: dict[str, Any] = field(default_factory=dict)
zarr_format: ZarrFormat = 3
consolidated_metadata: ConsolidatedMetadata | None = None
node_type: Literal["group"] = field(default="group", init=False)

def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
json_indent = config.get("json_indent")
if self.zarr_format == 3:
return {
ZARR_JSON: prototype.buffer.from_bytes(
json.dumps(self.to_dict(), indent=json_indent).encode()
json.dumps(self.to_dict(), default=_json_convert, indent=json_indent).encode()
)
}
else:
Expand All @@ -101,20 +150,33 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]:
),
}

def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3):
def __init__(
    self,
    attributes: dict[str, Any] | None = None,
    zarr_format: ZarrFormat = 3,
    consolidated_metadata: ConsolidatedMetadata | None = None,
):
    """Validate the inputs and initialize this frozen dataclass."""
    parsed_attrs = parse_attributes(attributes)
    parsed_format = parse_zarr_format(zarr_format)

    # The dataclass is frozen, so fields must be set via object.__setattr__.
    object.__setattr__(self, "attributes", parsed_attrs)
    object.__setattr__(self, "zarr_format", parsed_format)
    object.__setattr__(self, "consolidated_metadata", consolidated_metadata)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> GroupMetadata:
    """
    Build a GroupMetadata from its dict representation.

    ``node_type`` (if present) must be ``"group"``; a nested
    ``consolidated_metadata`` dict is parsed into a ConsolidatedMetadata.
    """
    data = dict(data)
    # Pop ``node_type`` *before* asserting on it.  Previously the pop was
    # the argument of the assert, so under ``python -O`` it was skipped
    # entirely and ``cls(**data)`` received an unexpected ``node_type``
    # keyword argument.
    node_type = data.pop("node_type", None)
    assert node_type in ("group", None)
    consolidated_metadata = data.pop("consolidated_metadata", None)
    if consolidated_metadata:
        data["consolidated_metadata"] = ConsolidatedMetadata.from_dict(consolidated_metadata)
    return cls(**data)

def to_dict(self) -> dict[str, Any]:
    """Return the dict representation of this metadata."""
    # Strip consolidated_metadata before asdict, then re-attach it via its
    # own to_dict() so it is emitted in its serialized form.
    stripped = replace(self, consolidated_metadata=None)
    out = asdict(stripped)
    cm = self.consolidated_metadata
    if cm:
        out["consolidated_metadata"] = cm.to_dict()
    return out


@dataclass(frozen=True)
Expand Down Expand Up @@ -424,20 +486,43 @@ async def update_attributes(self, new_attributes: dict[str, Any]) -> AsyncGroup:
def __repr__(self) -> str:
return f"<AsyncGroup {self.store_path}>"

async def nmembers(self) -> int:
async def nmembers(self, recursive: bool = False) -> int:
"""
Count the number of members in this group.

Parameters
----------
recursive : bool, default False
Whether to recursively count arrays and groups in child groups of
this Group. By default, just immediate child array and group members
are counted.

Returns
-------
count : int
"""
# TODO: consider using aioitertools.builtins.sum for this
# return await aioitertools.builtins.sum((1 async for _ in self.members()), start=0)
n = 0
async for _ in self.members():
async for _ in self.members(recursive=recursive):
n += 1
return n

async def members(self) -> AsyncGenerator[tuple[str, AsyncArray | AsyncGroup], None]:
async def members(
self, recursive: bool = False
) -> AsyncGenerator[tuple[str, AsyncArray | AsyncGroup], None]:
"""
Returns an AsyncGenerator over the arrays and groups contained in this group.
This method requires that `store_path.store` supports directory listing.

The results are not guaranteed to be ordered.

Parameters
----------
recursive : bool, default False
Whether to recursively include arrays and groups in child groups of
this Group. By default, just immediate child array and group members
are included.
"""
if not self.store_path.store.supports_listing:
msg = (
Expand All @@ -456,13 +541,26 @@ async def members(self) -> AsyncGenerator[tuple[str, AsyncArray | AsyncGroup], N
if key in _skip_keys:
continue
try:
yield (key, await self.getitem(key))
obj = await self.getitem(key)
yield (key, obj)

if (
recursive
and hasattr(obj.metadata, "node_type")
and obj.metadata.node_type == "group"
):
# the assert is just for mypy to know that `obj.metadata.node_type`
# implies an AsyncGroup, not an AsyncArray
assert isinstance(obj, AsyncGroup)
async for child_key, val in obj.members(recursive=recursive):
yield "/".join([key, child_key]), val
except KeyError:
# keyerror is raised when `key` names an object (in the object storage sense),
# as opposed to a prefix, in the store under the prefix associated with this group
# in which case `key` cannot be the name of a sub-array or sub-group.
logger.warning(
"Object at %s is not recognized as a component of a Zarr hierarchy.", key
"Object at %s is not recognized as a component of a Zarr hierarchy.",
key,
)

async def contains(self, member: str) -> bool:
Expand Down Expand Up @@ -628,17 +726,15 @@ def update_attributes(self, new_attributes: dict[str, Any]) -> Group:
self._sync(self._async_group.update_attributes(new_attributes))
return self

def nmembers(self, recursive: bool = False) -> int:
    """
    Count the number of members in this group.

    Blocking wrapper around the async group's ``nmembers``.

    Parameters
    ----------
    recursive : bool, default False
        Whether to recursively count arrays and groups in child groups of
        this Group.

    Returns
    -------
    count : int
    """
    return self._sync(self._async_group.nmembers(recursive=recursive))

def members(self, recursive: bool = False) -> tuple[tuple[str, Array | Group], ...]:
    """
    Return the sub-arrays and sub-groups of this group as a tuple of
    (name, array | group) pairs.

    Parameters
    ----------
    recursive : bool, default False
        Whether to recursively include arrays and groups in child groups of
        this Group.
    """
    _members = self._sync_iter(self._async_group.members(recursive=recursive))
    # Convert each async node to its synchronous counterpart.
    return tuple((name, _parse_async_node(node)) for name, node in _members)
Expand Down
Loading