# Source code for zarr_vectors.validate.metadata

"""Level 2 metadata validation — verify metadata is well-formed."""

from __future__ import annotations

from pathlib import Path

from zarr_vectors.constants import (
    CROSS_CHUNK_DEDUP, CROSS_CHUNK_BOTH, CROSS_CHUNK_EXPLICIT,
    LINKS_EXPLICIT, LINKS_IMPLICIT_BRANCHES, LINKS_IMPLICIT_SEQUENTIAL,
    OBJIDX_IDENTITY, OBJIDX_STANDARD,
)
from zarr_vectors.core.store import (
    open_store, read_root_metadata, get_resolution_level, list_resolution_levels,
)
from zarr_vectors.validate.structure import ValidationResult

# Accepted values for the root-level convention fields.  validate_metadata()
# reports any non-empty value that is not a member of the matching set.
VALID_LINKS = {
    LINKS_EXPLICIT,
    LINKS_IMPLICIT_SEQUENTIAL,
    LINKS_IMPLICIT_BRANCHES,
}
VALID_OBJIDX = {
    OBJIDX_STANDARD,
    OBJIDX_IDENTITY,
}
VALID_CROSS = {
    CROSS_CHUNK_EXPLICIT,
    CROSS_CHUNK_DEDUP,
    CROSS_CHUNK_BOTH,
}


def _bin_axis_problem(chunk_extent, bin_extent):
    """Classify why one bin extent fails to tile one chunk extent.

    Returns ``None`` when ``bin_extent`` is positive and ``chunk_extent`` is
    an integer multiple of it (within an absolute tolerance of 1e-9).
    Otherwise returns one of:

    * ``"nonpositive"`` -- ``bin_extent <= 0``;
    * ``"indivisible"`` -- ``chunk_extent / bin_extent`` is not an integer;
    * ``"invalid"`` -- the comparison or division itself failed, e.g. for a
      non-numeric value or NaN.
    """
    try:
        if bin_extent <= 0:
            return "nonpositive"
        ratio = chunk_extent / bin_extent
        if abs(ratio - round(ratio)) > 1e-9:
            return "indivisible"
    except (TypeError, ValueError, ArithmeticError):
        # TypeError: operands are not numbers; ValueError / OverflowError:
        # round() rejects NaN / infinity.
        return "invalid"
    return None


def _sized_len(value):
    """Return ``len(value)``, or ``None`` when *value* has no length."""
    try:
        return len(value)
    except TypeError:
        return None


def _check_root_metadata(result, meta):
    """Record findings for the parsed root metadata object on *result*."""
    sid_ndim = meta.sid_ndim
    if sid_ndim < 1:
        result.add_error(f"SID dimensionality is {sid_ndim}, must be >= 1")
    else:
        result.add_pass(f"SID dimensionality: {sid_ndim}")

    if len(meta.chunk_shape) != sid_ndim:
        result.add_error(
            f"chunk_shape has {len(meta.chunk_shape)} dims, expected {sid_ndim}"
        )
    else:
        result.add_pass("chunk_shape dimensionality matches SID")
    for i, cs in enumerate(meta.chunk_shape):
        if cs <= 0:
            result.add_error(f"chunk_shape[{i}] = {cs}, must be > 0")

    if meta.bounds:
        bmin, bmax = meta.bounds
        if len(bmin) != sid_ndim or len(bmax) != sid_ndim:
            result.add_error(
                f"Bounds dim mismatch: min={len(bmin)}, max={len(bmax)}, "
                f"expected {sid_ndim}"
            )
        else:
            result.add_pass("Bounds dimensionality matches SID")

    # An empty / unset convention is accepted; only a non-empty value outside
    # the allowed set is an error.
    for field, allowed in (
        ("links_convention", VALID_LINKS),
        ("object_index_convention", VALID_OBJIDX),
        ("cross_chunk_strategy", VALID_CROSS),
    ):
        value = getattr(meta, field)
        if value and value not in allowed:
            result.add_error(f"Unknown {field}: '{value}'")
        else:
            result.add_pass(f"{field}: '{value}'")

    if not meta.geometry_types:
        result.add_warning("No geometry_types specified")
    else:
        result.add_pass(f"geometry_types: {meta.geometry_types}")

    # Bin shape / chunk divisibility validation
    base = meta.base_bin_shape
    if base is None:
        return
    if len(base) != sid_ndim:
        result.add_error(
            f"base_bin_shape has {len(base)} dims, expected {sid_ndim}"
        )
        return
    result.add_pass(f"base_bin_shape: {base}")
    for i, (cs, bs) in enumerate(zip(meta.chunk_shape, base)):
        problem = _bin_axis_problem(cs, bs)
        if problem == "nonpositive":
            result.add_error(f"base_bin_shape[{i}]={bs}, must be > 0")
        elif problem == "indivisible":
            result.add_error(
                f"chunk_shape[{i}]={cs} not integer multiple "
                f"of base_bin_shape[{i}]={bs}"
            )
        elif problem == "invalid":
            result.add_error(
                f"base_bin_shape[{i}]={bs!r} cannot be checked against "
                f"chunk_shape[{i}]={cs!r}"
            )
    result.add_pass(f"bins_per_chunk: {meta.bins_per_chunk}")


def _check_level_attrs(result, li, attrs, chunk_shape, sid_ndim):
    """Record findings for one resolution level's raw attributes.

    *attrs* holds untyped values, so every check tolerates a value of the
    wrong type and reports it as a specific error instead of raising; one
    malformed attribute must not hide problems in the others.
    """
    prefix = f"resolution_{li}"

    vc = attrs.get("vertex_count")
    if vc is not None:
        # bool is a subclass of int, so exclude it explicitly: True/False is
        # not a vertex count.
        if not isinstance(vc, int) or isinstance(vc, bool) or vc < 0:
            result.add_error(f"{prefix}: vertex_count={vc} invalid")
        else:
            result.add_pass(f"{prefix}: vertex_count={vc}")

    # Validate per-level bin_shape divides chunk_shape
    bin_shape = attrs.get("bin_shape")
    if bin_shape is None:
        bin_shape = attrs.get("bin_size")  # legacy fallback
    if bin_shape is not None:
        ndim = _sized_len(bin_shape)
        if ndim is None:
            result.add_error(
                f"{prefix}: bin_shape={bin_shape!r} is not a sequence"
            )
        elif ndim != sid_ndim:
            result.add_error(f"{prefix}: bin_shape has {ndim} dims")
        else:
            for i, (cs, bs) in enumerate(zip(chunk_shape, bin_shape)):
                problem = _bin_axis_problem(cs, bs)
                if problem == "nonpositive":
                    result.add_error(
                        f"{prefix}: bin_shape[{i}]={bs} not > 0"
                    )
                elif problem == "indivisible":
                    result.add_error(
                        f"{prefix}: bin_shape[{i}]={bs} "
                        f"does not divide chunk_shape[{i}]={cs}"
                    )
                elif problem == "invalid":
                    result.add_error(
                        f"{prefix}: bin_shape[{i}]={bs!r} cannot be checked "
                        f"against chunk_shape[{i}]={cs!r}"
                    )

    # Validate bin_ratio values
    bin_ratio = attrs.get("bin_ratio")
    if bin_ratio is not None:
        ndim = _sized_len(bin_ratio)
        if ndim is None:
            result.add_error(
                f"{prefix}: bin_ratio={bin_ratio!r} is not a sequence"
            )
        elif ndim != sid_ndim:
            result.add_error(f"{prefix}: bin_ratio has {ndim} dims")
        else:
            for i, r in enumerate(bin_ratio):
                try:
                    too_small = r < 1
                except TypeError:
                    result.add_error(
                        f"{prefix}: bin_ratio[{i}]={r!r} is not a number"
                    )
                    continue
                if too_small:
                    result.add_error(f"{prefix}: bin_ratio[{i}]={r} < 1")

    # Validate object_sparsity in (0, 1]
    sparsity = attrs.get("object_sparsity", 1.0)
    try:
        in_range = 0.0 < sparsity <= 1.0
    except TypeError:
        # A non-numeric value (including an explicit null) is out of range.
        in_range = False
    if not in_range:
        result.add_error(
            f"{prefix}: object_sparsity={sparsity} not in (0, 1]"
        )


def validate_metadata(store_path: str | Path) -> ValidationResult:
    """Level 2: verify all metadata is well-formed.

    Opens the store, parses the root metadata, and checks it for internal
    consistency (dimensionalities, positive chunk extents, known convention
    names, bin/chunk divisibility).  Then checks the attributes of every
    resolution level (``vertex_count``, ``bin_shape`` / legacy ``bin_size``,
    ``bin_ratio``, ``object_sparsity``).

    Args:
        store_path: Location of the store; converted with ``str()`` before
            being handed to ``open_store``.

    Returns:
        A ``ValidationResult`` created with ``level=2`` that carries every
        pass, warning and error recorded.  A failure to open the store or to
        read the root metadata is recorded as a single error and ends
        validation early; neither is raised to the caller.
    """
    result = ValidationResult(level=2)

    try:
        root = open_store(str(store_path))
    except Exception as e:
        result.add_error(f"Cannot open store: {e}")
        return result

    try:
        meta = read_root_metadata(root)
    except Exception as e:
        result.add_error(f"Cannot read root metadata: {e}")
        return result
    result.add_pass("Root metadata parsed")

    _check_root_metadata(result, meta)

    for li in list_resolution_levels(root):
        try:
            attrs = get_resolution_level(root, li).attrs
            result.add_pass(f"resolution_{li} metadata parsed")
            _check_level_attrs(
                result, li, attrs, meta.chunk_shape, meta.sid_ndim
            )
        except Exception as e:
            # Boundary catch kept around the whole level: reading attributes
            # may itself touch the store (NOTE(review): confirm for the
            # backends in use), and a failure on one level must not stop the
            # remaining levels from being validated.
            result.add_error(f"resolution_{li}: cannot read metadata: {e}")
    return result