# Source code for zarr_vectors.validate.consistency

"""Level 3 consistency validation — verify data arrays are internally consistent."""

from __future__ import annotations

from pathlib import Path

import numpy as np

from zarr_vectors.core.arrays import (
    list_chunk_keys, read_all_object_manifests, read_chunk_vertices, read_cross_chunk_links,
)
from zarr_vectors.core.store import (
    get_resolution_level, list_resolution_levels, open_store, read_root_metadata,
)
from zarr_vectors.validate.structure import ValidationResult


def _level_bin_shape(lg, li, meta, level_chunk_shape):
    """Return the effective bin shape for resolution level *li*.

    Preference order: the level group's ``bin_shape`` / ``bin_size``
    attribute, then (level 0 only) ``meta.effective_bin_shape``, then the
    level's chunk shape. Any failure while reading the attributes falls
    back to the chunk shape, which yields one bin per chunk.
    """
    try:
        attrs = lg.attrs
        declared = attrs.get("bin_shape") or attrs.get("bin_size")
        if declared is not None:
            return tuple(float(x) for x in declared)
        if li == 0:
            return meta.effective_bin_shape
        return level_chunk_shape  # unknown — skip bin checks
    except Exception:
        return level_chunk_shape


def _spot_check_bin_bounds(
    result, prefix, ck, groups, ndim, bins_per_chunk, bin_shape,
):
    """Warn about vertices of chunk *ck* that lie outside their fragment's bin.

    Empty fragments are skipped, as are fragments whose bin coordinates
    cannot be derived. Fragments that are not ``(n, ndim)`` are skipped as
    well: the caller has already reported them, and comparing them against
    the bin bounds would raise instead of producing a finding.
    """
    from zarr_vectors.spatial.chunking import fragment_index_to_bin

    bin_extent = np.array(bin_shape, dtype=np.float64)
    tol = 1e-4  # small tolerance for float rounding
    for vi, fragment in enumerate(groups):
        if len(fragment) == 0:
            continue
        if fragment.ndim != 2 or fragment.shape[1] != ndim:
            continue
        try:
            bin_coords = fragment_index_to_bin(vi, ck, bins_per_chunk)
        except Exception:
            continue
        bin_lo = np.array(
            [bc * bs for bc, bs in zip(bin_coords, bin_shape)],
            dtype=np.float64,
        )
        bin_hi = bin_lo + bin_extent
        out_of_bin = np.any(
            (fragment < bin_lo - tol) | (fragment >= bin_hi + tol),
            axis=1,
        )
        n_out = int(np.sum(out_of_bin))
        if n_out > 0:
            result.add_warning(
                f"{prefix}: chunk {ck} fragment[{vi}] has {n_out} points "
                f"outside bin {bin_coords} bounds"
            )


def _check_object_index(
    result, prefix, lg, chunk_fragment_counts, has_object_index,
):
    """Check that every object manifest entry points at an existing fragment.

    Reading the manifests is best-effort. A failure is silent when the level
    has no ``object_index`` entry; when the entry is present, the failure is
    surfaced as a warning rather than dropped.
    """
    try:
        manifests = read_all_object_manifests(lg)
        for oid, mf in enumerate(manifests):
            for cc, fragment_index in mf:
                if cc not in chunk_fragment_counts:
                    result.add_error(
                        f"{prefix}: obj {oid} refs non-existent chunk {cc}"
                    )
                elif fragment_index >= chunk_fragment_counts[cc]:
                    result.add_error(
                        f"{prefix}: obj {oid} refs fragment_idx={fragment_index} >= {chunk_fragment_counts[cc]}"
                    )
        result.add_pass(
            f"{prefix}: object_index validated ({len(manifests)} objects)"
        )
    except Exception as e:
        if has_object_index:
            result.add_warning(
                f"{prefix}: object_index present but could not be read: {e}"
            )


def _check_cross_chunk_links(result, prefix, lg, chunk_fragment_counts):
    """Walk every ``cross_chunk_links/<delta>/`` family of one level.

    Two passes per family:

    (a) Structural pass — each cell key is parsed with ``parse_cell_key``
        (a ``ValueError`` is reported as malformed) and the parsed chunks
        must be non-decreasing (canonical-sort invariant).
    (b) Endpoint-presence pass — records are read via
        ``read_cross_chunk_links``; endpoint 0 must be a chunk decoded at
        this level. For ``delta == 0`` every other endpoint must be too;
        for ``delta != 0`` only endpoint 0 is constrained here.

    Families whose metadata cannot be read, or whose ``sid_ndim`` is 0, are
    skipped. A family whose records cannot be read gets a warning and no
    "validated" pass message.
    """
    from zarr_vectors.core.arrays import list_cross_link_deltas
    from zarr_vectors.core.paths import cross_chunk_links_path, parse_cell_key

    for d in list_cross_link_deltas(lg):
        family = cross_chunk_links_path(d)
        try:
            family_meta = lg.read_array_meta(family) or {}
            link_width = int(family_meta.get("link_width", 2))
            ccl_sid_ndim = int(family_meta.get("sid_ndim", 0))
        except Exception:
            continue
        if ccl_sid_ndim == 0:
            continue

        # (a) Structural pass over the cell keys.
        cell_keys = sorted(lg.list_chunks(family))
        for ckey in cell_keys:
            try:
                canonical_chunks = parse_cell_key(
                    ckey, sid_ndim=ccl_sid_ndim, link_width=link_width,
                )
            except ValueError as exc:
                result.add_error(
                    f"{prefix}: ccl[delta={d}] cell {ckey!r} "
                    f"malformed: {exc}"
                )
                continue
            if any(
                canonical_chunks[i] < canonical_chunks[i - 1]
                for i in range(1, link_width)
            ):
                result.add_error(
                    f"{prefix}: ccl[delta={d}] cell {ckey!r} "
                    f"violates canonical-sort invariant"
                )

        # (b) Endpoint-presence pass over the records.
        try:
            ccl = read_cross_chunk_links(lg, delta=d)
        except Exception as e:
            # Do not claim the family was validated when it was not read.
            result.add_warning(
                f"{prefix}: ccl[delta={d}] links could not be read: {e}"
            )
            continue

        for record in ccl:
            # record is a tuple of (chunk_coords, vi) endpoints in input
            # order; endpoint 0 is the owning-level source side.
            src_chunk = record[0][0]
            if src_chunk not in chunk_fragment_counts:
                result.add_error(
                    f"{prefix}: ccl[delta={d}] refs non-existent "
                    f"source chunk {src_chunk}"
                )
            if d == 0:
                for ep_chunk, _ in record[1:]:
                    if ep_chunk not in chunk_fragment_counts:
                        result.add_error(
                            f"{prefix}: ccl[delta=0] refs "
                            f"non-existent chunk {ep_chunk}"
                        )
        result.add_pass(
            f"{prefix}: ccl[delta={d}] validated "
            f"({len(ccl)} links across {len(cell_keys)} cells)"
        )


def validate_consistency(store_path: str | Path) -> ValidationResult:
    """Level 3: verify internal data consistency.

    For every resolution level of the store this decodes each vertex chunk
    and checks fragment shapes and finiteness, compares the decoded vertex
    total with the level's ``vertex_count`` attribute, checks object
    manifests against the decoded chunks, and checks the cross-chunk link
    families. For point-cloud-only levels without an ``object_index`` and
    with more than one bin per chunk it also checks the per-chunk fragment
    count and spot-checks bin bounds on the first three decoded chunks.

    Args:
        store_path: Location of the store; passed to ``open_store`` as a
            string.

    Returns:
        A ``ValidationResult`` with ``level=3`` holding the errors, warnings
        and passes that were recorded. A store that cannot be opened yields
        a single error.
    """
    result = ValidationResult(level=3)
    try:
        root = open_store(str(store_path))
        meta = read_root_metadata(root)
    except Exception as e:
        result.add_error(f"Cannot open store: {e}")
        return result

    # Kept function-local as in the original, but imported once instead of
    # once per level.
    from zarr_vectors.core.metadata import get_level_chunk_shape
    from zarr_vectors.core.store import read_level_metadata

    ndim = meta.sid_ndim

    # Per-bin fragment layout only applies to undifferentiated point-cloud
    # stores. Polylines / lines / graphs / meshes use fragments to represent
    # segments, endpoints, or per-object partitions — not bins.
    geom_types = meta.geometry_types or []
    is_point_cloud_only = "point_cloud" in geom_types and not any(
        gt in geom_types
        for gt in ("polyline", "streamline", "line", "graph", "skeleton", "mesh")
    )

    for li in list_resolution_levels(root):
        prefix = f"resolution_{li}"
        try:
            lg = get_resolution_level(root, li)
        except Exception as e:
            result.add_error(f"{prefix}: cannot open: {e}")
            continue

        chunk_keys = list_chunk_keys(lg)
        if not chunk_keys:
            result.add_warning(f"{prefix}: no chunk data")
            continue

        # Resolve per-level chunk_shape (v0.7 may override root).
        try:
            level_meta_obj = read_level_metadata(root, li)
        except Exception:
            level_meta_obj = None
        level_chunk_shape = get_level_chunk_shape(meta, level_meta_obj)
        level_bin_shape = _level_bin_shape(lg, li, meta, level_chunk_shape)

        # A zero, non-numeric or non-finite bin shape must produce a finding
        # rather than abort the whole validation run.
        try:
            level_bins_per_chunk = tuple(
                int(round(cs / bs))
                for cs, bs in zip(level_chunk_shape, level_bin_shape)
            )
        except (ZeroDivisionError, TypeError, ValueError, OverflowError) as e:
            result.add_error(
                f"{prefix}: cannot derive bins_per_chunk from bin shape "
                f"{level_bin_shape!r}: {e}"
            )
            level_bins_per_chunk = ()  # disables the bin checks below
        max_fragments = 1
        for b in level_bins_per_chunk:
            max_fragments *= b

        # Also skip bin checks if the level has object_index (fragments are
        # per-object, not per-bin).
        try:
            has_object_index = "object_index" in lg
        except Exception:
            has_object_index = False
        check_bins = (
            is_point_cloud_only
            and not has_object_index
            and any(b > 1 for b in level_bins_per_chunk)
        )

        total_verts = 0
        chunk_fragment_counts: dict[tuple, int] = {}
        chunks_checked_for_bin_bounds = 0

        for ck in chunk_keys:
            try:
                groups = read_chunk_vertices(lg, ck, dtype=np.float32, ndim=ndim)
            except Exception as e:
                result.add_error(f"{prefix}: chunk {ck} decode failed: {e}")
                continue
            chunk_fragment_counts[ck] = len(groups)

            # Fragment count must not exceed the number of bins in a chunk.
            if check_bins and len(groups) > max_fragments:
                result.add_error(
                    f"{prefix}: chunk {ck} has {len(groups)} fragments, "
                    f"exceeds bins_per_chunk product {max_fragments}"
                )

            for vi, fragment in enumerate(groups):
                if fragment.ndim != 2 or fragment.shape[1] != ndim:
                    result.add_error(
                        f"{prefix}: chunk {ck} fragment[{vi}] shape {fragment.shape}"
                    )
                if len(fragment) > 0 and np.any(~np.isfinite(fragment)):
                    result.add_warning(
                        f"{prefix}: chunk {ck} fragment[{vi}] NaN/Inf"
                    )
                total_verts += len(fragment)

            # Spot-check bin bounds on the first three decoded chunks only.
            if check_bins and chunks_checked_for_bin_bounds < 3:
                chunks_checked_for_bin_bounds += 1
                _spot_check_bin_bounds(
                    result, prefix, ck, groups, ndim,
                    level_bins_per_chunk, level_bin_shape,
                )

        # Count only chunks that actually decoded; failures were reported
        # above as errors.
        result.add_pass(
            f"{prefix}: {len(chunk_fragment_counts)} chunks decoded, "
            f"{total_verts} vertices"
        )

        # Best-effort comparison against the optional vertex_count attribute.
        try:
            evc = lg.attrs.get("vertex_count")
            if evc is not None:
                if total_verts != evc:
                    result.add_error(
                        f"{prefix}: metadata vertex_count={evc}, actual={total_verts}"
                    )
                else:
                    result.add_pass(f"{prefix}: vertex_count matches")
        except Exception:
            pass

        _check_object_index(
            result, prefix, lg, chunk_fragment_counts, has_object_index,
        )
        _check_cross_chunk_links(result, prefix, lg, chunk_fragment_counts)

    return result