# Source code for zarr_vectors.core.group

"""Backend-agnostic group abstraction wrapping a :class:`zarr.Group`.

This is the format seam: all ZV array I/O routes through this class.
The underlying Zarr store can be any :class:`zarr.abc.store.Store` —
``LocalStore``, ``MemoryStore``, ``FsspecStore``, ``ObjectStore``,
``IcechunkStore``.

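Per-chunk byte blobs are stored in one of two layouts. The legacy layout
stores them as tiny single-chunk 1D ``uint8`` Zarr arrays under a
per-array Zarr group (Option G in the design doc):

    level/vertices/0.1.2          → Zarr 1D uint8 array, shape=(N,), chunks=(N,)
    level/vertex_fragments/0.1.2  → likewise
    level/object_index/data       → likewise (one blob per slot)

The native-sharded layout instead stores each per-chunk array as a single
multidimensional vlen-bytes Zarr array whose shape is the level's chunk
grid, optionally packed by the Zarr v3 ``sharding_indexed`` codec (see
``Group.create_sharded_chunk_array``). The chunk I/O methods dispatch on
whichever layout they find.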

Per-array metadata (the ``zv_array`` discriminator and friends) lives on
the *group* node (``vertices/.attrs`` in v3 maps to ``zarr.json``
``attributes``).

Public surface mirrors the legacy :class:`FsGroup` for back-compat:

* ``attrs`` — dict-like access to this group's attributes
* ``create_group`` / ``require_group`` / ``__getitem__`` / ``__contains__``
  / ``__iter__`` — hierarchy navigation (sub-groups only)
* ``write_bytes`` / ``read_bytes`` / ``chunk_exists`` / ``list_chunks`` —
  per-chunk byte I/O
* ``write_array_meta`` / ``read_array_meta`` / ``array_exists`` —
  per-array metadata
* ``path`` — :class:`pathlib.Path` when the backing store is a Zarr
  ``LocalStore``, raises otherwise (use ``url`` instead)
"""

from __future__ import annotations

import warnings
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, Sequence

import numpy as np
import zarr
from zarr.codecs import VLenBytesCodec
from zarr.errors import UnstableSpecificationWarning
from zarr.storage import LocalStore

from zarr_vectors.exceptions import StoreError


class Group:
    """A ZV group wrapping an underlying :class:`zarr.Group`.

    Per-chunk payloads may live in either the legacy "Option G" layout (a
    Zarr group of single-chunk ``uint8`` arrays) or the native-sharded
    layout (one multidim vlen-bytes Zarr array per logical array). The
    chunk I/O methods dispatch on the node type found at ``array_name``.
    """

    # Class-level defaults so callers that build a Group via ``__new__``
    # (see ``create_store`` / ``open_store``) start with batching off
    # without needing to remember to set the attributes.
    _pending_writes: list[tuple[str, str, bytes]] | None = None
    _pending_array_metas: dict[str, dict[str, Any]] | None = None
    _prefetch_cache: dict[tuple[str, str], bytes] | None = None
    # Active codec spec for chunk-array writes, set by
    # :meth:`batched_writes(compressor=...)`. ``None`` means callers fall
    # back to zarr v3's default codec pipeline (``bytes`` + ``zstd``).
    # Consumed by :meth:`write_bytes` and the batched flush in
    # :mod:`zarr_vectors.core._batch_writer`.
    _active_codecs: list[dict[str, Any]] | None = None
    # When set, per-chunk-array creations route through the native
    # ``sharding_indexed`` codec instead of the legacy Option-G layout.
    # Set by :meth:`native_sharded_arrays`; the dict holds ``shard_shape``
    # and ``grid_shape``. Used by ``arrays._ensure_array_dir`` and the
    # ``write_bytes`` / ``write_array_meta`` dispatch on this class.
    _native_sharded_config: dict[str, tuple[int, ...]] | None = None

    def __init__(self, zarr_group: zarr.Group) -> None:
        """Wrap ``zarr_group`` with batching, prefetching and native-sharded
        creation all switched off."""
        self._zarr = zarr_group
        # Deferred-write queues activated by :meth:`batched_writes`.
        # When set, :meth:`write_bytes` appends to ``_pending_writes``
        # and :meth:`write_array_meta` appends to
        # ``_pending_array_metas``; both flush in one ``asyncio.gather``
        # against the underlying Store on context exit.
        self._pending_writes = None
        self._pending_array_metas = None
        # Prefetch cache activated by :meth:`batched_reads`. When set,
        # :meth:`read_bytes` looks here first before hitting the store.
        self._prefetch_cache = None
        self._active_codecs = None
        self._native_sharded_config = None

    @classmethod
    def _from_zarr(cls, zarr_group: zarr.Group) -> Group:
        """Wrap ``zarr_group`` without running ``__init__``.

        Every per-instance mode flag is reset explicitly, including the
        native-sharded config, so the result matches a freshly
        ``__init__``-ed handle.
        """
        instance = cls.__new__(cls)
        instance._zarr = zarr_group
        instance._pending_writes = None
        instance._pending_array_metas = None
        instance._prefetch_cache = None
        instance._active_codecs = None
        instance._native_sharded_config = None
        return instance

    @classmethod
    def _from_backend(cls, store_or_shim: Any, prefix: str = "") -> Group:
        """Build a Group from a Zarr store (or a legacy ``_BackendShim``).

        Kept for back-compat with callers in :mod:`zarr_vectors.lazy`
        that resurrect a root-level Group from a stored backend handle.

        Always returns a write-capable handle — a read-only store
        reference (left over from an ``open_store(mode='r')`` flow) is
        unwrapped via ``store.with_read_only(False)``.
        """
        store = (
            store_or_shim._store
            if isinstance(store_or_shim, _BackendShim)
            else store_or_shim
        )
        if getattr(store, "read_only", False) and hasattr(store, "with_read_only"):
            store = store.with_read_only(False)
        path = "/" + prefix.strip("/") if prefix else "/"
        zg = zarr.open_group(store, path=path, mode="r+")
        return cls._from_zarr(zg)

    # ---------------- attributes ----------------

    @property
    def attrs(self) -> _Attrs:
        """Dict-like view of this group's Zarr attributes."""
        return _Attrs(self._zarr.attrs)

    # ---------------- sub-groups ----------------

    def create_group(self, name: str, **_kwargs: Any) -> Group:
        """Return sub-group ``name``, creating it if absent.

        Extra keyword arguments are accepted for back-compat and ignored.
        """
        zg = self._zarr.require_group(name)
        return type(self)._from_zarr(zg)

    def require_group(self, name: str) -> Group:
        """Return sub-group ``name``, creating it if absent."""
        zg = self._zarr.require_group(name)
        return type(self)._from_zarr(zg)

    def __getitem__(self, key: str) -> Group:
        """Return the existing sub-group ``key``.

        Raises:
            StoreError: If ``key`` is missing or names a non-group node.
        """
        try:
            node = self._zarr[key]
        except KeyError:
            raise StoreError(
                f"Group {key!r} not found under {self._zarr.path or '<root>'}"
            ) from None
        if not isinstance(node, zarr.Group):
            raise StoreError(
                f"{key!r} under {self._zarr.path or '<root>'} is a "
                f"{type(node).__name__}, not a Group"
            )
        return type(self)._from_zarr(node)

    def __contains__(self, key: str) -> bool:
        """Delegate membership to the underlying :class:`zarr.Group`."""
        return key in self._zarr

    def __iter__(self) -> Iterator[str]:
        """Yield sub-group names in sorted order (arrays are skipped)."""
        yield from sorted(self._zarr.group_keys())

    def children(self) -> list[str]:
        """Return every immediate child name — both arrays and groups.

        Use this when iterating a parent path that may contain a mix of
        Option-G chunk-group children (vertex / link attributes) and
        flat-array children (object / group attributes after the 0.8.1
        migration). ``__iter__`` deliberately yields groups only to
        preserve back-compat for callers that predate the migration.
        """
        return sorted(
            set(self._zarr.group_keys()) | set(self._zarr.array_keys())
        )

    # ---------------- chunk I/O ------------------------------------------
    #
    # Two physical layouts share this surface:
    #
    # * **Legacy "Option G"** — ``<array_name>`` is a Zarr Group whose
    #   children are single-chunk ``uint8`` arrays, one per chunk key.
    # * **Native sharded** — ``<array_name>`` is a single multidim Zarr
    #   array (vlen-bytes) whose shape is the level's chunk grid; the
    #   Zarr v3 ``sharding_indexed`` codec packs many cells into one
    #   storage object. Created via :meth:`create_sharded_chunk_array`
    #   or by the migration tool in :mod:`zarr_vectors.sharding.io`.
    #
    # ``read_bytes`` / ``write_bytes`` / ``chunk_exists`` / ``list_chunks``
    # dispatch on the node type at runtime so callers don't care which
    # layout they're talking to.

    def write_bytes(self, array_name: str, chunk_key: str, data: bytes) -> None:
        """Store ``data`` as the payload of chunk ``chunk_key`` of ``array_name``.

        Dispatch order:

        1. Native-sharded layout (a Zarr array sits at ``array_name``):
           write the grid cell addressed by ``chunk_key`` and update the
           ``nonempty_chunks`` manifest.
        2. Inside :meth:`batched_writes`: queue the write for the flush.
        3. Legacy Option-G layout: replace the single-chunk ``uint8``
           array at ``<array_name>/<chunk_key>``.

        Raises:
            StoreError: On the native-sharded path, if ``chunk_key`` is not
                a dot-separated integer tuple or lies outside the grid.
        """
        # Native-sharded fast path: a single Zarr Array at ``array_name``
        # means we're talking to the standard ``sharding_indexed`` layout.
        # The chunk's bytes live in one cell of that multidim vlen-bytes
        # array; the sharding codec packs many such cells per shard file.
        # Checked BEFORE the batched-write queue so that writers running
        # under :meth:`native_sharded_arrays` always reach the array
        # cells, even if an outer :meth:`batched_writes` is open.
        sharded_arr = self._sharded_chunk_array(array_name)
        if sharded_arr is not None:
            coords = _parse_chunk_coords(chunk_key)
            if coords is None:
                raise StoreError(
                    f"Cannot write to native-sharded array {array_name!r}: "
                    f"chunk_key {chunk_key!r} is not a coord tuple"
                )
            _check_coords_in_bounds(coords, sharded_arr.shape, array_name)
            _vlen_set_cell(sharded_arr, coords, bytes(data))
            _record_nonempty_chunk(sharded_arr, chunk_key, present=bool(data))
            return

        # Batched-write mode (see :meth:`batched_writes`): defer until
        # the context manager flushes all queued PUTs concurrently.
        # Batched writes target the legacy Option-G layout only.
        if self._pending_writes is not None:
            self._pending_writes.append((array_name, chunk_key, bytes(data)))
            return

        arr_group = self._zarr.require_group(array_name)
        if chunk_key in arr_group:
            del arr_group[chunk_key]

        # When a session compressor is active we pass an explicit codec
        # list to ``create_array``; otherwise zarr 3.x's default applies
        # (which is ``bytes`` + ``zstd``). See
        # :func:`zarr_vectors.encoding.compression.resolve_compressor`.
        from zarr_vectors.encoding.compression import codecs_for_create_array

        extra_kwargs: dict[str, Any] = {}
        if self._active_codecs is not None:
            extra_kwargs["compressors"] = codecs_for_create_array(
                self._active_codecs
            )

        n = len(data)
        if n == 0:
            # Empty payload: a zero-length array with chunk length 1.
            arr_group.create_array(
                chunk_key,
                shape=(0,),
                chunks=(1,),
                dtype="uint8",
                **extra_kwargs,
            )
            return
        a = arr_group.create_array(
            chunk_key,
            shape=(n,),
            chunks=(n,),
            dtype="uint8",
            **extra_kwargs,
        )
        a[:] = np.frombuffer(data, dtype="uint8")

    @contextmanager
    def batched_reads(
        self,
        plan: list[tuple[str, list[str]]],
    ) -> Iterator[None]:
        """Prefetch every chunk in ``plan`` via one :func:`asyncio.gather`
        and serve subsequent :meth:`read_bytes` calls from the resulting
        in-memory cache.

        ``plan`` is a list of ``(array_name, [chunk_keys, ...])`` pairs —
        typically ``(VERTICES, list_chunk_keys(group, VERTICES))`` plus the
        parallel ``vertex_fragments`` and per-attribute arrays. On entry
        every (array_name, chunk_key) pair is fetched in a single async
        gather; on exit the cache is dropped. Reads for a key NOT in the
        plan fall through to the sync :meth:`read_bytes` path, so
        under-specifying the plan degrades performance gracefully (still
        correct).

        Use for chunk-heavy read loops against high-latency object stores
        (GCS / S3 / Azure). Each per-chunk GET becomes one async task
        instead of one serial sync call, so the total wall time
        approaches one round-trip rather than ``N`` round-trips.

        Nesting is not supported and raises :class:`StoreError`. Writes
        inside the block are unaffected.

        Example::

            chunk_keys = list_chunk_keys(level_group, VERTICES)
            with level_group.batched_reads([
                (VERTICES, chunk_keys),
                (VERTEX_FRAGMENTS, chunk_keys),
                *((f"{VERTEX_ATTRIBUTES}/{a}", chunk_keys) for a in attrs),
            ]):
                for cc in chunk_keys:
                    fragments = read_chunk_vertices(level_group, cc, ...)
        """
        if self._prefetch_cache is not None:
            raise StoreError("batched_reads() does not support nesting")
        from zarr_vectors.core._batch_reader import flush_prefetch

        self._prefetch_cache = flush_prefetch(self._zarr, plan)
        try:
            yield
        finally:
            self._prefetch_cache = None

    @contextmanager
    def batched_writes(self, compressor: Any = None) -> Iterator[None]:
        """Defer every :meth:`write_bytes` and :meth:`write_array_meta`
        call inside the block and flush them in a single
        :func:`asyncio.gather` on exit.

        Use for chunk-heavy write loops against high-latency object
        stores (GCS / S3 / Azure). Each per-chunk PUT and each per-array
        ``zarr.json`` PUT becomes one async task instead of one serial
        sync call, so the total wall time approaches one round-trip
        rather than ``N`` round-trips.

        Args:
            compressor: Codec selection applied to every chunk array
                written inside the block. See
                :func:`zarr_vectors.encoding.compression.resolve_compressor`
                for accepted values; the default ``None`` resolves to
                zarr v3's default (``bytes`` + ``zstd``).

        TODO: per-array-type codec dict (vertices vs fragments vs links) —
        future work; today every chunk gets the same codec.

        Nesting is not supported and raises :class:`StoreError`. Reads
        inside the block are unaffected and execute synchronously. If the
        block raises, the queued writes are discarded.

        Example::

            with level_group.batched_writes():
                create_vertices_array(level_group, dtype="float32")
                create_attribute_array(level_group, "intensity")
                for cc in chunk_coords:
                    write_chunk_vertices(level_group, cc, ...)
                    write_chunk_attributes(level_group, "intensity", cc, ...)
            # exit point: every PUT scheduled above flushes in parallel
        """
        if self._pending_writes is not None:
            raise StoreError("batched_writes() does not support nesting")
        from zarr_vectors.encoding.compression import resolve_compressor

        codecs = resolve_compressor(compressor)
        self._pending_writes = []
        self._pending_array_metas = {}
        self._active_codecs = codecs
        try:
            yield
            pending_writes = self._pending_writes
            pending_metas = self._pending_array_metas
            self._pending_writes = None
            self._pending_array_metas = None
            if pending_writes or pending_metas:
                # Lazy import to avoid pulling the asyncio/zarr-sync
                # machinery into the import path of every Group caller.
                from zarr_vectors.core._batch_writer import flush_batch

                flush_batch(
                    self._zarr,
                    pending_writes,
                    array_metas=pending_metas,
                    codecs=codecs,
                )
        finally:
            # On normal exit the queues are already None. On an
            # exception, drop them so the Group stays usable.
            self._pending_writes = None
            self._pending_array_metas = None
            self._active_codecs = None

    @contextmanager
    def native_sharded_arrays(
        self,
        shard_shape: tuple[int, ...],
        grid_shape: tuple[int, ...],
    ) -> Iterator[None]:
        """Make subsequent per-chunk-array creations native-sharded.

        Inside this block, every
        :func:`zarr_vectors.core.arrays._ensure_array_dir` call for a
        per-spatial-chunk array (vertices, vertex_fragments,
        links/<delta>, link_fragments, vertex_attributes/<name>,
        fragment_attributes/<name>) allocates a single multidim vlen-bytes
        Zarr array at that path using Zarr v3's ``sharding_indexed`` codec
        — instead of the legacy Option-G group-of-tiny-arrays.

        Subsequent :meth:`write_bytes` calls go through the grid-coord
        write dispatch on this class, so callers don't need to change.

        Args:
            shard_shape: Outer-chunk shape in *inner-chunk* units, e.g.
                ``(8, 8, 8)``. Length must match ``grid_shape``.
            grid_shape: Number of spatial chunks along each axis at this
                level. Compute via
                :func:`zarr_vectors.spatial.chunking.compute_grid_shape`.

        Raises:
            StoreError: On nesting, or when the two shapes differ in rank.

        The legacy ``batched_writes`` context can be active concurrently —
        writes still route through the sharded path (the batched queue's
        flush is a no-op when nothing is queued).
        """
        if self._native_sharded_config is not None:
            raise StoreError(
                "native_sharded_arrays() does not support nesting"
            )
        if len(shard_shape) != len(grid_shape):
            raise StoreError(
                f"shard_shape rank {len(shard_shape)} != grid_shape "
                f"rank {len(grid_shape)}"
            )
        self._native_sharded_config = {
            "shard_shape": tuple(int(s) for s in shard_shape),
            "grid_shape": tuple(int(g) for g in grid_shape),
        }
        try:
            yield
        finally:
            self._native_sharded_config = None

    def read_bytes(self, array_name: str, chunk_key: str) -> bytes:
        """Return the payload of chunk ``chunk_key`` of ``array_name``.

        Checks the :meth:`batched_reads` prefetch cache first, then the
        native-sharded layout, then the legacy Option-G layout.

        Raises:
            StoreError: If the chunk is missing (or out of grid on the
                native-sharded path), or the Option-G node is not an Array.
        """
        # Batched-read mode (see :meth:`batched_reads`): serve from the
        # prefetch cache when possible. Cache misses fall through to
        # the sync path below — useful when a caller under-specifies
        # the plan or hits an array the prefetch skipped.
        if self._prefetch_cache is not None:
            cached = self._prefetch_cache.get((array_name, chunk_key))
            if cached is not None:
                return cached

        sharded_arr = self._sharded_chunk_array(array_name)
        if sharded_arr is not None:
            coords = _parse_chunk_coords(chunk_key)
            if coords is None or not _coords_in_bounds(coords, sharded_arr.shape):
                raise StoreError(
                    f"Chunk {array_name!r}/{chunk_key!r} not found in "
                    f"{self._zarr.path or '<root>'}"
                )
            return _vlen_get_cell(sharded_arr, coords)

        path = f"{array_name}/{chunk_key}"
        try:
            arr = self._zarr[path]
        except KeyError:
            raise StoreError(
                f"Chunk {array_name!r}/{chunk_key!r} not found in "
                f"{self._zarr.path or '<root>'}"
            ) from None
        if not isinstance(arr, zarr.Array):
            raise StoreError(
                f"{path!r} is a {type(arr).__name__}, not an Array"
            )
        if arr.shape[0] == 0:
            return b""
        # ``ndarray.tobytes`` already returns ``bytes``.
        return np.asarray(arr[:]).tobytes()

    def chunk_exists(self, array_name: str, chunk_key: str) -> bool:
        """Return ``True`` when chunk ``chunk_key`` of ``array_name`` holds data.

        On the native-sharded layout the ``nonempty_chunks`` manifest is
        authoritative; without it, an in-grid cell counts as present when
        its payload is non-empty.
        """
        sharded_arr = self._sharded_chunk_array(array_name)
        if sharded_arr is not None:
            present = sharded_arr.attrs.get(_NONEMPTY_CHUNKS_ATTR)
            if present is not None:
                return chunk_key in present
            # Fall back to inspecting the cell — slow path used when the
            # presence manifest is missing (e.g. mid-migration).
            coords = _parse_chunk_coords(chunk_key)
            if coords is None or not _coords_in_bounds(coords, sharded_arr.shape):
                return False
            return _vlen_get_cell(sharded_arr, coords) != b""
        return f"{array_name}/{chunk_key}" in self._zarr

    def list_chunks(self, array_name: str) -> list[str]:
        """Return the sorted chunk keys stored under ``array_name``.

        Returns ``[]`` when ``array_name`` is missing or is neither a
        group nor an array.
        """
        if array_name not in self._zarr:
            return []
        try:
            node = self._zarr[array_name]
        except KeyError:
            return []
        if isinstance(node, zarr.Array):
            # Native-sharded layout. Trust the per-array presence
            # manifest written by ``write_bytes``; without it we'd have
            # to fetch every shard index to find non-empty cells.
            present = node.attrs.get(_NONEMPTY_CHUNKS_ATTR)
            return sorted(present) if present else []
        if not isinstance(node, zarr.Group):
            return []
        return sorted(node.array_keys())

    # ---------------- array metadata ----------------

    def write_array_meta(self, array_name: str, meta: dict[str, Any]) -> None:
        """Merge ``meta`` (JSON-coerced) into the attributes of ``array_name``.

        Native-sharded arrays receive it on the array node; under
        :meth:`batched_writes` it is queued; otherwise it lands on the
        Option-G group (created if absent).
        """
        # Native-sharded fast path: write directly to the multidim
        # array's ``attrs`` (the Zarr-spec location for per-array user
        # metadata). Checked BEFORE the batched queue so that writers
        # under :meth:`native_sharded_arrays` always land on the array.
        sharded_arr = self._sharded_chunk_array(array_name)
        if sharded_arr is not None:
            sharded_arr.attrs.update(_json_safe(meta))
            return
        # Batched-write mode (see :meth:`batched_writes`): queue the
        # full parent-group ``zarr.json`` content so it flushes in the
        # gather instead of paying a per-array sync ``require_group +
        # attrs.update`` (which costs 2-3 round-trips each on cloud).
        # Merge with anything already queued for the same name so
        # successive ``write_array_meta`` calls within the block
        # compose, matching the existing ``attrs.update`` semantics.
        if self._pending_array_metas is not None:
            existing = self._pending_array_metas.get(array_name, {})
            merged = {**existing, **_json_safe(meta)}
            self._pending_array_metas[array_name] = merged
            return
        arr_group = self._zarr.require_group(array_name)
        arr_group.attrs.update(_json_safe(meta))

    def read_array_meta(self, array_name: str) -> dict[str, Any]:
        """Return a copy of the attributes of ``array_name`` (``{}`` if absent)."""
        if array_name not in self._zarr:
            return {}
        try:
            node = self._zarr[array_name]
        except KeyError:
            return {}
        if not isinstance(node, (zarr.Group, zarr.Array)):
            return {}
        return dict(node.attrs)

    def array_exists(self, array_name: str) -> bool:
        """``True`` when ``array_name`` is a group (Option G) or an array."""
        if array_name not in self._zarr:
            return False
        try:
            node = self._zarr[array_name]
        except KeyError:
            return False
        return isinstance(node, (zarr.Group, zarr.Array))

    # ---------------- standard Zarr v3 arrays (single array per path) ----------------
    #
    # The methods above (``write_bytes`` / ``read_bytes``) implement the
    # legacy "Option G" layout where every logical array is a Zarr group
    # holding single-chunk ``uint8`` arrays. The methods below write a
    # single standard Zarr v3 array at the given path — the target of
    # the v0.7 format migration (see plan
    # ``can-you-do-an-compressed-wilkes.md``). Both layouts coexist
    # during the transition; new sites should prefer these.
    #
    # Batched-writes / batched-reads queues only cover the Option-G
    # path; these methods always execute synchronously.

    def write_array(
        self,
        path: str,
        data: Any,
        *,
        chunks: tuple[int, ...] | None = None,
        fill_value: Any = None,
        attributes: dict[str, Any] | None = None,
        compressors: Any = None,
    ) -> None:
        """Write a chunked Zarr v3 array at ``path``.

        Any node already at ``path`` is deleted first.

        Args:
            path: Logical path of the array within this group, e.g.
                ``"object_attributes/intensity"``. Intermediate path
                segments become Zarr groups if absent.
            data: Numpy-coercible array. ``dtype`` and ``shape`` set the
                array's dtype and shape. 0-d (scalar) data is supported.
            chunks: Chunk shape. Defaults to a single chunk covering
                ``data.shape``; zero-length axes get chunk length ``1``
                and 0-d data gets the empty chunk shape ``()``.
            fill_value: Value Zarr returns for unwritten chunk positions.
                Special floats are JSON-string-encoded per spec
                (``"NaN"``, ``"Infinity"``, ``"-Infinity"``).
            attributes: Dict applied to the array's ``attributes`` block
                in its own ``zarr.json`` — the spec-blessed location for
                per-array user metadata.
            compressors: Override the codec pipeline. ``None`` uses the
                active session codec (from :meth:`batched_writes`) or
                zarr v3's default (``bytes`` + ``zstd``).
        """
        arr_data = np.asarray(data)
        if chunks is None:
            # One chunk per axis, matching the array's rank. A chunk
            # length of 0 is avoided (as ``write_bytes`` does for empty
            # blobs), and 0-d data yields ``()`` rather than ``(1,)``,
            # which would mismatch the array rank.
            chunks = tuple(max(int(s), 1) for s in arr_data.shape)

        parent_path, _, leaf = path.rpartition("/")
        parent = self._zarr.require_group(parent_path) if parent_path else self._zarr
        if leaf in parent:
            del parent[leaf]

        create_kwargs: dict[str, Any] = {
            "name": leaf,
            "shape": arr_data.shape,
            "chunks": chunks,
            "dtype": arr_data.dtype,
        }
        if fill_value is not None:
            create_kwargs["fill_value"] = fill_value
        resolved = self._resolve_codecs(compressors)
        if resolved is not None:
            from zarr_vectors.encoding.compression import codecs_for_create_array

            create_kwargs["compressors"] = codecs_for_create_array(resolved)

        arr = parent.create_array(**create_kwargs)
        # ``[...]`` selects the whole array for every rank; ``[:]`` raises
        # on 0-d arrays.
        arr[...] = arr_data
        if attributes:
            arr.attrs.update(_json_safe(attributes))

    def read_array(self, path: str) -> np.ndarray:
        """Read a chunked Zarr array at ``path`` as a numpy array.

        Raises:
            StoreError: If ``path`` is missing or points at a group.
        """
        node = self._require_array_node(path)
        # ``[...]`` also works for 0-d arrays, unlike ``[:]``.
        return np.asarray(node[...])

    def write_vlen_array(
        self,
        path: str,
        blobs: Sequence[bytes],
        *,
        chunks: int | None = None,
        attributes: dict[str, Any] | None = None,
    ) -> None:
        """Write a 1D variable-length-bytes Zarr v3 array at ``path``.

        Each element of ``blobs`` becomes one row of the resulting array.
        Mirrors the layout used by ``object_index/manifests``.

        Args:
            path: Logical path of the array.
            blobs: Iterable of bytes blobs. Empty input writes no array
                (caller's responsibility to check).
            chunks: Elements per chunk. Defaults to ``len(blobs)``
                (single chunk).
            attributes: Dict applied to the array's ``attributes`` block.
        """
        blob_list = list(blobs)
        n = len(blob_list)
        if n == 0:
            # NOTE(review): this returns before the delete below, so an
            # array already at ``path`` keeps its previous contents —
            # confirm callers expect that.
            return

        parent_path, _, leaf = path.rpartition("/")
        parent = self._zarr.require_group(parent_path) if parent_path else self._zarr
        if leaf in parent:
            del parent[leaf]

        chunk_size = chunks if chunks is not None else n
        with warnings.catch_warnings():
            # vlen-bytes lacks a finalised V3 spec — see the matching
            # suppression in `_write_object_index_manifests`.
            warnings.simplefilter("ignore", UnstableSpecificationWarning)
            arr = parent.create_array(
                leaf,
                shape=(n,),
                chunks=(chunk_size,),
                dtype="bytes",
                serializer=VLenBytesCodec(),
            )
            obj = np.empty(n, dtype=object)
            for i, blob in enumerate(blob_list):
                obj[i] = blob
            arr[:] = obj
        if attributes:
            arr.attrs.update(_json_safe(attributes))

    def read_vlen_array(self, path: str) -> list[bytes]:
        """Read a vlen-bytes Zarr array at ``path`` as a list of bytes."""
        node = self._require_array_node(path)
        return [bytes(b) for b in node[:]]

    # ---------------- native-sharded chunk array (sharding_indexed) -------

    def create_sharded_chunk_array(
        self,
        array_name: str,
        grid_shape: tuple[int, ...],
        *,
        shard_shape: tuple[int, ...] | None = None,
        attributes: dict[str, Any] | None = None,
    ) -> None:
        """Allocate a sharded vlen-bytes Zarr array for per-chunk blobs.

        Replaces the legacy "Option G" group-of-tiny-arrays layout with a
        single Zarr v3 array whose shape equals the level's chunk grid.
        Each cell holds one ZVF spatial chunk's payload. When
        ``shard_shape`` is provided the ``sharding_indexed`` codec packs
        many cells into a single storage object — the standard cloud
        layout described in :doc:`/spec/chunking/sharding`.

        Args:
            array_name: Logical path of the array, e.g. ``"vertices"`` or
                ``"links/0"``. Intermediate path segments become Zarr
                sub-groups if absent.
            grid_shape: Number of chunks along each spatial axis at this
                level. Computed via
                :func:`zarr_vectors.spatial.chunking.compute_grid_shape`.
            shard_shape: Outer-chunk shape in *inner-chunk* units.
                ``None`` (default) creates an unsharded array — one
                storage object per ZVF chunk, same I/O cost as the legacy
                layout but already in the new structural form. A typical
                cloud workload uses ``(8, 8, 8)``: 512 ZVF chunks per
                storage object.
            attributes: Per-array metadata merged into the array's
                ``zarr.json`` ``attributes`` block (the standard Zarr v3
                location for user metadata).

        Raises:
            StoreError: If ``shard_shape`` and ``grid_shape`` differ in rank.
        """
        ndim = len(grid_shape)
        if shard_shape is not None and len(shard_shape) != ndim:
            raise StoreError(
                f"shard_shape rank {len(shard_shape)} != grid_shape "
                f"rank {ndim} for array {array_name!r}"
            )

        parent_path, _, leaf = array_name.rpartition("/")
        parent = (
            self._zarr.require_group(parent_path) if parent_path else self._zarr
        )
        # Replace whatever sat at this path (legacy group, prior array)
        # before creating the new sharded array.
        if leaf in parent:
            del parent[leaf]

        create_kwargs: dict[str, Any] = {
            "shape": grid_shape,
            "chunks": (1,) * ndim,
            "dtype": "bytes",
            "serializer": VLenBytesCodec(),
        }
        if shard_shape is not None:
            # Zarr 3.2 high-level ``shards=`` kwarg: wraps the inner
            # ``chunks=`` codec in ``sharding_indexed`` automatically.
            create_kwargs["shards"] = tuple(shard_shape)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UnstableSpecificationWarning)
            arr = parent.create_array(leaf, **create_kwargs)
        # Start with an empty presence manifest; ``write_bytes`` maintains it.
        arr.attrs[_NONEMPTY_CHUNKS_ATTR] = []
        if attributes:
            arr.attrs.update(_json_safe(attributes))

    def _sharded_chunk_array(self, array_name: str) -> zarr.Array | None:
        """Return the multidim Zarr array at ``array_name`` when the
        native-sharded layout is in use, else ``None``.

        The legacy "Option G" layout has a Zarr Group at the same path
        with single-chunk uint8 children — this returns ``None`` for that
        case so callers fall back to the per-chunk logic.
        """
        if array_name not in self._zarr:
            return None
        try:
            node = self._zarr[array_name]
        except KeyError:
            return None
        return node if isinstance(node, zarr.Array) else None

    def read_array_attrs(self, path: str) -> dict[str, Any]:
        """Read the ``attributes`` block of a Zarr array at ``path``.

        Returns ``{}`` for missing paths or paths that point at a group
        rather than an array.
        """
        if path not in self._zarr:
            return {}
        try:
            node = self._zarr[path]
        except KeyError:
            return {}
        if not isinstance(node, zarr.Array):
            return {}
        return dict(node.attrs)

    def read_array_fill_value(self, path: str) -> Any:
        """Return the ``fill_value`` of the standard Zarr array at ``path``.

        ``fill_value`` lives at the top level of the array's ``zarr.json``
        (not in the user ``attributes`` block), so this helper exists
        alongside :meth:`read_array_attrs` rather than being folded into
        it. Raises :class:`StoreError` if ``path`` is missing or points
        at a group.
        """
        return self._require_array_node(path).fill_value

    def standalone_array_exists(self, path: str) -> bool:
        """``True`` when ``path`` points at a Zarr array (not a group).

        Counterpart to :meth:`array_exists`, which reports ``True`` for
        the legacy Option-G layout (a group containing chunk arrays).
        """
        if path not in self._zarr:
            return False
        try:
            node = self._zarr[path]
        except KeyError:
            return False
        return isinstance(node, zarr.Array)

    def _require_array_node(self, path: str) -> zarr.Array:
        """Return the Zarr array at ``path``.

        Raises:
            StoreError: If ``path`` is missing or is not an Array.
        """
        if path not in self._zarr:
            raise StoreError(
                f"Array {path!r} not found in {self._zarr.path or '<root>'}"
            )
        node = self._zarr[path]
        if not isinstance(node, zarr.Array):
            raise StoreError(
                f"{path!r} is a {type(node).__name__}, not an Array"
            )
        return node

    def _resolve_codecs(
        self,
        compressors: Any,
    ) -> list[dict[str, Any]] | None:
        """Pick effective codecs: explicit override > session codec > None."""
        if compressors is not None:
            from zarr_vectors.encoding.compression import resolve_compressor

            return resolve_compressor(compressors)
        return self._active_codecs

    # ---------------- delete ----------------

    def delete_subtree(self, name: str) -> None:
        """Delete child node ``name`` if it exists (no-op otherwise)."""
        if name in self._zarr:
            del self._zarr[name]

    # ---------------- path / url ----------------

    @property
    def path(self) -> Path:
        """Filesystem path of this group.

        Raises:
            StoreError: If the backing store is not a ``LocalStore``.
        """
        store = self._zarr.store
        if not isinstance(store, LocalStore):
            raise StoreError(
                f"Group.path is only available for LocalStore; got "
                f"{type(store).__name__}. Use Group.url instead."
            )
        root = _local_root(store)
        if self._zarr.path:
            return root.joinpath(*self._zarr.path.strip("/").split("/"))
        return root

    @property
    def url(self) -> str:
        """``file://`` URI for local stores, else ``repr(store)``, plus the group path."""
        store = self._zarr.store
        if isinstance(store, LocalStore):
            base = _local_root(store).absolute().as_uri()
        else:
            base = repr(store)
        if self._zarr.path:
            return f"{base.rstrip('/')}/{self._zarr.path.strip('/')}"
        return base

    @property
    def prefix(self) -> str:
        """The underlying Zarr group's path within its store."""
        return self._zarr.path

    # ---------------- back-compat shims (used by lazy/) ----------------

    @property
    def backend(self) -> _BackendShim:
        """Legacy handle exposing the backing store's ``url``."""
        return _BackendShim(self._zarr.store)

    @property
    def _backend(self) -> _BackendShim:  # noqa: D401 (legacy callers)
        return _BackendShim(self._zarr.store)

    @property
    def zarr_group(self) -> zarr.Group:
        """The underlying :class:`zarr.Group`."""
        return self._zarr

    # ---------------- repr ----------------

    def __repr__(self) -> str:
        return f"Group({self.url!r})"
# ===================================================================
# .attrs dict-like wrapper
# ===================================================================


class _Attrs:
    """Dict-like wrapper around :attr:`zarr.Group.attrs`.

    The wrapper exists for API parity with the legacy on-disk-JSON
    ``_Attrs`` — callers use ``attrs.to_dict()``, ``attrs.update(d)``,
    ``attrs[k]``, ``attrs.get(k, default)``, ``k in attrs``. Writes go
    through :func:`_json_safe_value` / :func:`_json_safe` so numpy
    scalars and arrays are stored as JSON-native values.
    """

    def __init__(self, zarr_attrs: Any) -> None:
        self._attrs = zarr_attrs

    def __getitem__(self, key: str) -> Any:
        return self._attrs[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._attrs[key] = _json_safe_value(value)

    def __contains__(self, key: str) -> bool:
        return key in self._attrs

    def get(self, key: str, default: Any = None) -> Any:
        """Return ``self[key]``, or ``default`` when the key is absent."""
        try:
            return self._attrs[key]
        except KeyError:
            return default

    def update(self, other: dict[str, Any]) -> None:
        """Merge ``other`` into the attributes after JSON coercion."""
        self._attrs.update(_json_safe(other))

    def to_dict(self) -> dict[str, Any]:
        """Return a shallow ``dict`` copy of the current attributes."""
        return dict(self._attrs)

    def __repr__(self) -> str:
        return f"_Attrs({dict(self._attrs)!r})"


# ===================================================================
# Back-compat shim for `group._backend.url` / `Group._from_backend`
# ===================================================================


class _BackendShim:
    """Minimal compat shim for callers that reach for ``group._backend``.

    Provides the ``url`` accessor and identity needed by
    :class:`Group._from_backend`. Anything else raises ``AttributeError``
    so we notice if some caller depends on the deeper legacy surface.
    """

    def __init__(self, store: Any) -> None:
        self._store = store

    @property
    def url(self) -> str:
        """``file://`` URI of a ``LocalStore`` root, else ``repr(store)``."""
        if isinstance(self._store, LocalStore):
            return _local_root(self._store).absolute().as_uri()
        return repr(self._store)

    def close(self) -> None:
        """Call the wrapped store's ``close()`` if it has one (best effort)."""
        try:
            close = getattr(self._store, "close", None)
            if close is not None:
                close()
        except Exception:  # pragma: no cover (defensive)
            # Best-effort by design: errors raised by the store's close()
            # are swallowed.
            pass


# ===================================================================
# Helpers
# ===================================================================


def _local_root(store: LocalStore) -> Path:
    """Return a ``LocalStore``'s root as a :class:`pathlib.Path`."""
    raw = store.root
    return raw if isinstance(raw, Path) else Path(raw)


# Per-array attribute name listing which chunk keys hold non-empty
# payload in a native-sharded array. Vlen-bytes fill_value is ``b""``
# (indistinguishable from an explicitly-empty write), so we keep a
# tiny sidecar so ``list_chunks`` / ``chunk_exists`` stay O(1) without
# scanning every shard tail.
_NONEMPTY_CHUNKS_ATTR = "nonempty_chunks"


def _parse_chunk_coords(key: str) -> tuple[int, ...] | None:
    """Parse a dot-separated chunk key into integer coords.

    ``"0.1.2"`` → ``(0, 1, 2)``. Returns ``None`` if any segment is not
    an integer.
    """
    parts = key.split(".")
    try:
        return tuple(int(p) for p in parts)
    except ValueError:
        return None


def _format_chunk_key(coords: tuple[int, ...]) -> str:
    """Inverse of :func:`_parse_chunk_coords`: ``(0, 1, 2)`` → ``"0.1.2"``."""
    return ".".join(str(int(c)) for c in coords)


def _coords_in_bounds(coords: tuple[int, ...], shape: tuple[int, ...]) -> bool:
    """``True`` when ``coords`` has ``shape``'s rank and ``0 <= c < s`` per axis."""
    if len(coords) != len(shape):
        return False
    return all(0 <= c < s for c, s in zip(coords, shape))


def _check_coords_in_bounds(
    coords: tuple[int, ...],
    shape: tuple[int, ...],
    array_name: str,
) -> None:
    """Raise :class:`StoreError` unless ``coords`` lies inside ``shape``."""
    if not _coords_in_bounds(coords, shape):
        raise StoreError(
            f"Chunk coords {coords} out of grid {shape} for "
            f"array {array_name!r}"
        )


def _vlen_get_cell(
    arr: zarr.Array,
    coords: tuple[int, ...],
) -> bytes:
    """Read one vlen-bytes cell as ``bytes``.

    Zarr 3.x vlen-bytes scalar indexing returns a 0-d object array; we
    slice-then-extract so the result is always a ``bytes`` instance.
    Empty regions and ``None`` cells read as ``b""``.
    """
    slices = tuple(slice(c, c + 1) for c in coords)
    region = np.asarray(arr[slices])
    if region.size == 0:
        return b""
    val = region.flat[0]
    if val is None:
        return b""
    return bytes(val)


def _vlen_set_cell(
    arr: zarr.Array,
    coords: tuple[int, ...],
    data: bytes,
) -> None:
    """Write ``data`` into the single vlen-bytes cell at ``coords``."""
    obj = np.empty((1,) * len(coords), dtype=object)
    obj.flat[0] = bytes(data)
    slices = tuple(slice(c, c + 1) for c in coords)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UnstableSpecificationWarning)
        arr[slices] = obj


def _record_nonempty_chunk(
    arr: zarr.Array,
    chunk_key: str,
    *,
    present: bool,
) -> None:
    """Update the per-array ``nonempty_chunks`` manifest.

    The manifest is a sorted list of dot-separated chunk keys — consulted
    by :meth:`Group.list_chunks` and :meth:`chunk_exists` so those calls
    don't have to scan every shard tail just to enumerate non-empty cells.
    ``present=True`` adds ``chunk_key``; ``present=False`` removes it.
    """
    current = arr.attrs.get(_NONEMPTY_CHUNKS_ATTR)
    keys = set(current) if current else set()
    if present:
        keys.add(chunk_key)
    else:
        keys.discard(chunk_key)
    arr.attrs[_NONEMPTY_CHUNKS_ATTR] = sorted(keys)


def _json_safe(d: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``d`` with keys and values coerced for JSON.

    Values go through :func:`_json_safe_value`. Numpy scalar keys (e.g.
    ``np.int64`` taken from an index array) are unwrapped with
    ``.item()``: the ``json`` encoder only accepts ``str`` / ``int`` /
    ``float`` / ``bool`` / ``None`` keys and rejects e.g. ``np.int64``.
    """
    return {
        (k.item() if isinstance(k, np.generic) else k): _json_safe_value(v)
        for k, v in d.items()
    }


def _json_safe_value(v: Any) -> Any:
    """Coerce numpy scalars / arrays to JSON-native types for zarr attrs.

    Tuples become lists, and lists / dicts are coerced recursively.
    Anything else is returned unchanged.
    """
    if isinstance(v, np.generic):
        return v.item()
    if isinstance(v, np.ndarray):
        return v.tolist()
    if isinstance(v, (list, tuple)):
        return [_json_safe_value(x) for x in v]
    if isinstance(v, dict):
        return _json_safe(v)
    return v