Source code for zarr_vectors.multiresolution.coarsen

"""Multi-resolution pyramid construction orchestrator.

Two entry points (use one):

* ``build_pyramid(store, factors=[(cf_1, sf_1), ...])`` builds every
  coarser level in sequence, optionally emitting cross-level link
  arrays (``cross_level_storage="implicit"`` or ``"explicit"``).
* ``coarsen_level(store, source, target, coarsen_factor=..., sparsity_factor=...)``
  writes a single coarser level for callers that want manual control.

Both use the per-object pyramid: each surviving object's vertices are
aggregated into bin centroids (metavertices) that may be shared
between objects, and per-object OIDs are preserved across levels.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import numpy as np
import numpy.typing as npt

from zarr_vectors.constants import (
    CAP_MULTISCALE_LINKS,
    CAP_PRESERVED_OBJECT_IDS,
    CAP_SHARED_FRAGMENTS,
    COARSEN_PER_OBJECT,
    DEFAULT_CROSS_LEVEL_DEPTH,
    DEFAULT_CROSS_LEVEL_STORAGE,
    LINKS,
    OBJECT_ATTRIBUTES,
    VERTICES,
    XLEVEL_EXPLICIT,
    XLEVEL_NONE,
    VALID_XLEVEL_STORAGE,
)
from zarr_vectors.core.arrays import (
    create_cross_chunk_links_array,
    create_links_array,
    create_object_attributes_array,
    create_object_index_array,
    create_vertices_array,
    list_chunk_keys,
    read_all_object_manifests,
    read_chunk_links,
    read_chunk_vertices,
    read_cross_chunk_links,
    read_object_attributes,
    write_chunk_links,
    write_chunk_vertices,
    write_cross_chunk_links,
    write_object_attributes,
    write_object_index,
)
from zarr_vectors.core.metadata import (
    LevelMetadata,
    get_level_chunk_shape,
)
from zarr_vectors.core.store import (
    create_resolution_level,
    get_resolution_level,
    list_resolution_levels,
    open_store,
    read_level_metadata,
    read_root_metadata,
)
from zarr_vectors.exceptions import ArrayError, CoarseningError
from zarr_vectors.multiresolution.object_selection import apply_sparsity
from zarr_vectors.spatial.boundary import (
    build_vertex_chunk_mapping,
    partition_cross_level_edges,
)
from zarr_vectors.spatial.chunking import assign_chunks
from zarr_vectors.typing import ChunkCoords


# ===================================================================
# Single-level coarsening
# ===================================================================

[docs] def coarsen_level( store_path: str | Path, source_level: int, target_level: int, *, coarsen_factor: float = 1.0, sparsity_factor: float = 1.0, chunk_scale_factor: int | tuple[int, ...] = 1, sparsity_strategy: str = "random", sparsity_seed: int | None = None, cross_level_storage: str = XLEVEL_NONE, ) -> dict[str, Any]: """Coarsen a single level and write it to the store. Per-object vertex aggregation with stable OIDs across levels. A metavertex's source vertices may come from multiple source objects; the resulting metavertex appears in each of those objects' manifests at the coarser level. Args: store_path: Path to the zarr vectors store. source_level: Level to read from. target_level: Level to write to (must not exist). coarsen_factor: Per-object vertex aggregation factor (≥ 1). ``1.0`` is the identity (no aggregation). sparsity_factor: Object-dropping factor (≥ 1). Survivors keep their OIDs; dropped objects leave empty manifest slots. ``1.0`` is the identity (no drop). chunk_scale_factor: Per-axis multiplier applied to the source level's ``chunk_shape`` to derive the target level's ``chunk_shape``. ``1`` (default) keeps the chunk grid unchanged. Scalar values apply uniformly to every axis; tuples set per-axis multipliers. Each multiplier must be a positive integer (nested chunk grids). When the resulting target chunk_shape differs from the root chunk_shape it is stamped on the target level's ``LevelMetadata.chunk_shape``; otherwise the target inherits from root. sparsity_strategy: Object selection strategy. sparsity_seed: Random seed. cross_level_storage: When called via ``build_pyramid`` this is threaded through to enable inline ``±1`` cross-level link emission. Standalone callers should leave it at the ``"none"`` default. Returns: Summary dict. Always includes ``method``, ``preserves_object_ids``, ``vertex_count``. """ return _per_object_coarsen( store_path=store_path, source_level=source_level, target_level=target_level, coarsen_factor=coarsen_factor, sparsity_factor=sparsity_factor, chunk_scale_factor=chunk_scale_factor, sparsity_strategy=sparsity_strategy, sparsity_seed=sparsity_seed, cross_level_storage=cross_level_storage, )
def _per_object_coarsen( *, store_path: str | Path, source_level: int, target_level: int, coarsen_factor: float, sparsity_factor: float, chunk_scale_factor: int | tuple[int, ...] = 1, sparsity_strategy: str, sparsity_seed: int | None, cross_level_storage: str = XLEVEL_NONE, ) -> dict[str, Any]: """Per-object pyramid: aggregate within-bin source vertices into shared metavertices, preserving each surviving object's OID and its trajectory through the new metavertices. See the 12-step implementation sketch in the plan file ``Provenance-preserving pyramid: shared metavertices + ID-stable objects`` (`schema/zarr_vectors.linkml.yaml` schema captures the persistent metadata side). """ root = open_store(str(store_path), mode="r+") root_meta = read_root_metadata(root) ndim = root_meta.sid_ndim base_bin = root_meta.effective_bin_shape # Source level's chunk_shape — may itself be a per-level override. try: src_level_meta = read_level_metadata(root, source_level) except Exception: src_level_meta = None source_chunk_shape = get_level_chunk_shape(root_meta, src_level_meta) # Target level's chunk_shape = source × chunk_scale_factor (per-axis). if isinstance(chunk_scale_factor, (tuple, list)): if len(chunk_scale_factor) != ndim: raise CoarseningError( f"chunk_scale_factor rank {len(chunk_scale_factor)} " f"!= sid_ndim {ndim}" ) chunk_scale = tuple(int(r) for r in chunk_scale_factor) else: chunk_scale = tuple(int(chunk_scale_factor) for _ in range(ndim)) if any(r < 1 for r in chunk_scale): raise CoarseningError( f"chunk_scale_factor must be positive integers per axis, " f"got {chunk_scale}" ) target_chunk_shape = tuple( float(s) * int(r) for s, r in zip(source_chunk_shape, chunk_scale) ) # The on-disk per-level chunk_shape field is omitted when the # target equals root (the implicit default). Compare via float. target_chunk_shape_override: tuple[float, ...] | None if all( abs(t - r) < 1e-9 for t, r in zip(target_chunk_shape, root_meta.chunk_shape) ): target_chunk_shape_override = None else: target_chunk_shape_override = target_chunk_shape chunk_shape = target_chunk_shape # used for assign_chunks below src_group = get_resolution_level(root, source_level) # --- Step 0: read source manifests + vertex positions ---------------- # Read source vertex positions, indexed by (chunk_coords, vg_idx). src_vg_positions: dict[tuple[ChunkCoords, int], npt.NDArray] = {} for cc in list_chunk_keys(src_group, VERTICES): try: vgs = read_chunk_vertices(src_group, cc, dtype=np.float32, ndim=ndim) except ArrayError: continue for vg_idx, vg in enumerate(vgs): src_vg_positions[(cc, vg_idx)] = vg src_has_objects = "object_index" in src_group if src_has_objects: src_manifests = read_all_object_manifests(src_group) else: # No object_index — treat the level as one implicit object whose # manifest enumerates every vg in chunk-major order. implicit: list[tuple[ChunkCoords, int]] = [] for cc in list_chunk_keys(src_group, VERTICES): vg_idx = 0 while (cc, vg_idx) in src_vg_positions: implicit.append((cc, vg_idx)) vg_idx += 1 src_manifests = [implicit] if implicit else [] n_src_objects = len(src_manifests) if n_src_objects == 0: return { "vertex_count": 0, "object_count": 0, "objects_kept": 0, "method": COARSEN_PER_OBJECT, "preserves_object_ids": True, } # --- Step 1: drop a fraction of source objects ---------------------- keep_oids: list[int] if sparsity_factor > 1.0 and n_src_objects > 1: keep_frac = 1.0 / sparsity_factor kept = apply_sparsity( n_src_objects, keep_frac, sparsity_strategy, seed=sparsity_seed, representative_points=None, bin_shape=base_bin, ) keep_oids = sorted(int(o) for o in kept) else: keep_oids = list(range(n_src_objects)) keep_set = set(keep_oids) # --- Step 2-3: build (source vertex → bin → metavertex) map --------- # Per-object ordered source-vertex positions (with their global index # in the flat source-vertex array). per_object_positions: dict[int, np.ndarray] = {} flat_positions: list[np.ndarray] = [] flat_oid_of_v: list[int] = [] next_global = 0 for oid in keep_oids: manifest = src_manifests[oid] parts: list[np.ndarray] = [] for cc, vg_idx in manifest: vg = src_vg_positions.get((cc, vg_idx)) if vg is None or len(vg) == 0: continue parts.append(np.asarray(vg, dtype=np.float32)) if not parts: per_object_positions[oid] = np.zeros((0, ndim), dtype=np.float32) continue obj_positions = np.concatenate(parts, axis=0) per_object_positions[oid] = obj_positions flat_positions.append(obj_positions) flat_oid_of_v.extend([oid] * obj_positions.shape[0]) next_global += obj_positions.shape[0] if not flat_positions: # Surviving objects had no vertices. Write an empty level. _write_empty_preserve_level( root, source_level, target_level, base_bin=base_bin, coarsen_factor=coarsen_factor, sparsity_factor=sparsity_factor, inherited_num_objects=n_src_objects, ) return { "vertex_count": 0, "object_count": 0, "objects_kept": len(keep_oids), "method": COARSEN_PER_OBJECT, "preserves_object_ids": True, "shared_fragments": True, } all_pos = np.concatenate(flat_positions, axis=0) # Target bin shape: source bin_shape × coarsen_factor. target_bin_shape = tuple(float(b) * float(coarsen_factor) for b in base_bin) # Compute per-vertex bin coords: (N, ndim) int64. bin_shape_arr = np.asarray(target_bin_shape, dtype=np.float64) bin_coords = np.floor(all_pos / bin_shape_arr).astype(np.int64) # Combine each bin coord tuple into a single sort-key for np.unique. bin_keys = np.ascontiguousarray(bin_coords).view( np.dtype((np.void, bin_coords.dtype.itemsize * bin_coords.shape[1])) ).ravel() _, inverse = np.unique(bin_keys, return_inverse=True) inverse = inverse.astype(np.int64, copy=False) n_metavertices = int(inverse.max()) + 1 if inverse.size > 0 else 0 # --- Step 3 (continued): centroid per bin -------------------------- meta_positions = np.zeros((n_metavertices, ndim), dtype=np.float32) bin_counts = np.zeros(n_metavertices, dtype=np.int64) np.add.at(meta_positions, inverse, all_pos) np.add.at(bin_counts, inverse, 1) meta_positions /= bin_counts[:, None] # --- Step 4: chunk-assign metavertices ------------------------------ chunk_assignments = assign_chunks(meta_positions, chunk_shape) # --- Step 5: per-chunk vg layout (one vg per metavertex) ------------ metavertex_to_ref: dict[int, tuple[ChunkCoords, int]] = {} per_chunk_groups: dict[ChunkCoords, list[np.ndarray]] = {} for cc, indices in sorted(chunk_assignments.items()): # ``indices`` are metavertex indices that fell in this chunk. for vg_idx, mv_idx in enumerate(indices.tolist()): metavertex_to_ref[int(mv_idx)] = (cc, vg_idx) per_chunk_groups.setdefault(cc, []).append( meta_positions[mv_idx:mv_idx + 1] ) # --- Step 6: write per-chunk vertex groups -------------------------- arrays_present = [VERTICES, "object_index"] if src_has_objects else [VERTICES] level_meta_initial = LevelMetadata( level=target_level, vertex_count=int(n_metavertices), arrays_present=arrays_present, bin_shape=target_bin_shape, bin_ratio=tuple(max(1, int(round(coarsen_factor))) for _ in range(ndim)), chunk_shape=target_chunk_shape_override, object_sparsity=(1.0 / sparsity_factor), coarsening_method=COARSEN_PER_OBJECT, parent_level=source_level, preserves_object_ids=src_has_objects, inherited_num_objects=n_src_objects if src_has_objects else 0, shared_fragments=True, ) level_group = create_resolution_level(root, target_level, level_meta_initial) create_vertices_array(level_group, dtype="float32") if src_has_objects: create_object_index_array(level_group) for cc, groups in sorted(per_chunk_groups.items()): write_chunk_vertices(level_group, cc, groups, dtype=np.float32) # --- Step 7: emit per-object manifests ------------------------------ # We need to map each source vertex back to its metavertex_index. # Walk per-object slices of the flat ``inverse`` array. cursor = 0 new_manifests: dict[int, list[tuple[ChunkCoords, int]]] = {} for oid in keep_oids: n = per_object_positions[oid].shape[0] if n == 0: cursor += 0 new_manifests[oid] = [] continue mv_seq = inverse[cursor:cursor + n].tolist() cursor += n # Deduplicate consecutive duplicates while preserving order. manifest: list[tuple[ChunkCoords, int]] = [] prev = -1 for mv_idx in mv_seq: if mv_idx == prev: continue prev = mv_idx manifest.append(metavertex_to_ref[int(mv_idx)]) new_manifests[oid] = manifest # --- Step 9: emit object_index (gap-fill for dropped OIDs) ---------- if src_has_objects: write_object_index( level_group, new_manifests, sid_ndim=ndim, total_objects=n_src_objects, ) # --- Step 10: per-object attributes with present_mask --------------- src_obj_attr_group_name = f"{OBJECT_ATTRIBUTES}" if src_obj_attr_group_name in src_group: src_obj_attr_group = src_group[src_obj_attr_group_name] attr_names = [n for n in src_obj_attr_group] else: attr_names = [] for attr_name in attr_names: try: src_data = read_object_attributes(src_group, attr_name) except ArrayError: continue # Dense (O, C) or (O,) padded to the inherited OID space, with # rows for survivors copied over. Layout matches the source's # OID space (which already equals n_src_objects). out_data = np.zeros_like(src_data) for oid in keep_oids: if oid < len(src_data): out_data[oid] = src_data[oid] mask = np.zeros(n_src_objects, dtype=np.uint8) for oid in keep_oids: mask[oid] = 1 create_object_attributes_array(level_group, attr_name) write_object_attributes(level_group, attr_name, out_data, present_mask=mask) # --- Step 12: stamp root capability tokens -------------------------- if src_has_objects: _stamp_root_capability(root, CAP_PRESERVED_OBJECT_IDS) _stamp_root_capability(root, CAP_SHARED_FRAGMENTS) # --- Step 13: emit inline ±1 cross-level link arrays ---------------- if cross_level_storage != XLEVEL_NONE and n_metavertices > 0: _emit_inline_cross_level_links( root, src_group=src_group, level_group=level_group, source_level=source_level, ndim=ndim, bin_shape_arr=bin_shape_arr, bin_keys=bin_keys, coarse_chunk_assignments_mv=chunk_assignments, storage=cross_level_storage, ) return { "vertex_count": int(n_metavertices), "object_count": len(keep_oids), "objects_kept": len(keep_oids), "source_objects": n_src_objects, "method": COARSEN_PER_OBJECT, "preserves_object_ids": True, "shared_fragments": True, } def _emit_inline_cross_level_links( root, *, src_group, level_group, source_level: int, ndim: int, bin_shape_arr: npt.NDArray[np.float64], bin_keys: npt.NDArray, coarse_chunk_assignments_mv: dict[ChunkCoords, npt.NDArray[np.int64]], storage: str, ) -> None: """Emit ``±1`` link/cross_chunk_link arrays for one coarsen step. Re-walks the source level in chunk-major order, re-bins each vertex against ``bin_shape_arr``, and looks up the matching metavertex via the ``bin_key`` ↔ ``mv_idx`` map implicit in ``np.unique(bin_keys, return_inverse=inverse)``. Translates metavertex IDs to chunk-major-flat coarse indices via the just-written coarse-level chunks, then dispatches to :func:`_write_cross_level_edges`. """ # bin_key_bytes → mv_idx (bin-key-ordered, matches np.unique output). unique_keys = np.unique(bin_keys) bin_key_to_mv: dict[bytes, int] = { bytes(k): i for i, k in enumerate(unique_keys) } # mv_idx → chunk-major-flat coarse index. coarse_chunk_assignments, n_coarse = _reconstruct_chunk_assignments( level_group, ndim, ) mv_to_coarse_global: dict[int, int] = {} for cc, mv_indices_for_chunk in sorted(coarse_chunk_assignments_mv.items()): for local_vg, mv_idx in enumerate(mv_indices_for_chunk.tolist()): mv_to_coarse_global[int(mv_idx)] = int( coarse_chunk_assignments[cc][local_vg] ) # Build fine→coarse parent[] by re-walking source in chunk-major order. fine_chunk_assignments, n_fine = _reconstruct_chunk_assignments( src_group, ndim, ) parent = np.full(n_fine, -1, dtype=np.int64) cursor = 0 key_dtype = np.dtype(( np.void, int(bin_shape_arr.shape[0]) * np.dtype(np.int64).itemsize, )) for cc in list_chunk_keys(src_group, VERTICES): try: vgs = read_chunk_vertices( src_group, cc, dtype=np.float32, ndim=ndim, ) except ArrayError: continue for vg in vgs: n_local = int(vg.shape[0]) if n_local == 0: continue local_bins = np.floor( np.asarray(vg, dtype=np.float32) / bin_shape_arr, ).astype(np.int64) local_keys = np.ascontiguousarray(local_bins).view(key_dtype).ravel() for j in range(n_local): mv = bin_key_to_mv.get(bytes(local_keys[j])) if mv is not None: parent[cursor + j] = mv_to_coarse_global[int(mv)] cursor += n_local _write_cross_level_edges( root, fine_level=source_level, delta=1, fine_chunk_assignments=fine_chunk_assignments, coarse_chunk_assignments=coarse_chunk_assignments, n_fine=n_fine, n_coarse=n_coarse, parent=parent, sid_ndim=ndim, storage=storage, ) def _write_empty_preserve_level( root, source_level: int, target_level: int, *, base_bin: tuple[float, ...], coarsen_factor: float, sparsity_factor: float, inherited_num_objects: int, ) -> None: """Write an empty ID-preserving level when no surviving object has vertices.""" ndim = len(base_bin) target_bin_shape = tuple(float(b) * float(coarsen_factor) for b in base_bin) level_meta = LevelMetadata( level=target_level, vertex_count=0, arrays_present=[VERTICES, "object_index"], bin_shape=target_bin_shape, bin_ratio=tuple(max(1, int(round(coarsen_factor))) for _ in range(ndim)), object_sparsity=(1.0 / sparsity_factor), coarsening_method=COARSEN_PER_OBJECT, parent_level=source_level, preserves_object_ids=True, inherited_num_objects=inherited_num_objects, shared_fragments=True, ) level_group = create_resolution_level(root, target_level, level_meta) create_vertices_array(level_group, dtype="float32") create_object_index_array(level_group) # Empty object_index with the inherited size — all manifests are []. write_object_index( level_group, {}, sid_ndim=ndim, total_objects=inherited_num_objects, ) _stamp_root_capability(root, CAP_PRESERVED_OBJECT_IDS) def _stamp_root_capability(root_group, cap: str) -> None: """Add ``cap`` to root metadata's ``format_capabilities`` (idempotent).""" attrs = root_group.attrs.to_dict() zv = attrs.get("zarr_vectors", {}) caps = list(zv.get("format_capabilities", [])) if cap not in caps: caps.append(cap) zv["format_capabilities"] = caps root_group.attrs.update({"zarr_vectors": zv}) def _stamp_root_cross_level( root_group, *, depth: int, storage: str, ) -> None: """Persist cross_level_depth/cross_level_storage on root metadata.""" attrs = root_group.attrs.to_dict() zv = attrs.get("zarr_vectors", {}) zv["cross_level_depth"] = int(depth) zv["cross_level_storage"] = storage root_group.attrs.update({"zarr_vectors": zv}) def _reconstruct_chunk_assignments( level_group, ndim: int, ) -> tuple[dict[ChunkCoords, npt.NDArray[np.int64]], int]: """Rebuild ``{chunk_coords: vertex_indices}`` from on-disk vertex chunks. The "vertex index" assigned to each vertex is the position it would occupy in a flat enumeration that walks chunks in ``list_chunk_keys`` order and concatenates each chunk's vertex groups in order. This matches the convention used by ``build_vertex_chunk_mapping`` for in-memory edge partitioning. Returns the assignments dict and the total vertex count. """ chunk_keys = list_chunk_keys(level_group, VERTICES) assignments: dict[ChunkCoords, npt.NDArray[np.int64]] = {} cursor = 0 for cc in chunk_keys: try: vgs = read_chunk_vertices(level_group, cc, dtype=np.float32, ndim=ndim) except ArrayError: continue n = sum(int(vg.shape[0]) for vg in vgs) if n == 0: continue assignments[cc] = np.arange(cursor, cursor + n, dtype=np.int64) cursor += n return assignments, cursor def _decode_parent_from_plus_one( fine_lg, *, fine_assn: dict[ChunkCoords, npt.NDArray[np.int64]], coarse_assn: dict[ChunkCoords, npt.NDArray[np.int64]], n_fine: int, ) -> npt.NDArray[np.int64] | None: """Decode a fine→coarse ``parent`` array from already-written ``+1`` arrays. Reads ``links/<+1>/<chunk_key>`` (intra-chunk edges) and ``cross_chunk_links/<+1>/`` (cross-chunk edges) at the fine level and converts each ``(chunk, local_idx)`` pair to global flat indices via the supplied chunk-assignment dicts. Returns ``None`` when neither array exists. """ parent = np.full(n_fine, -1, dtype=np.int64) found_any = False # Aligned (intra-chunk) edges: read each chunk in links/+1/. try: chunk_keys = list_chunk_keys(fine_lg, f"{LINKS}/+1") except (ArrayError, KeyError): chunk_keys = [] for cc in chunk_keys: try: link_groups = read_chunk_links(fine_lg, cc, delta=1) except ArrayError: continue for rows in link_groups: if rows is None or len(rows) == 0: continue local_src = rows[:, 0].astype(np.int64) local_tgt = rows[:, 1].astype(np.int64) fine_global = fine_assn[cc][local_src] coarse_global = coarse_assn[cc][local_tgt] parent[fine_global] = coarse_global found_any = True # Cross-chunk edges. try: records = read_cross_chunk_links(fine_lg, delta=1) except (ArrayError, KeyError): records = [] for (cc_s, vi_s), (cc_t, vi_t) in records: parent[int(fine_assn[cc_s][vi_s])] = int(coarse_assn[cc_t][vi_t]) found_any = True return parent if found_any else None def _finalize_cross_level_for_store( store_path: str | Path, *, cross_level_depth: int, cross_level_storage: str, ) -> None: """Persist root cross-level metadata and emit ``±N`` (N ≥ 2) link arrays. Adjacent ``±1`` arrays are emitted inline during coarsening (see :func:`_emit_inline_cross_level_links`). This finalize pass walks every adjacent (fine, coarse) level pair, decodes the on-disk ``+1`` parent map back into a flat fine→coarse array, then composes step-by-step to produce ``+N``/``-N`` link arrays for N ≥ 2 up to ``cross_level_depth``. ``cross_level_depth=-1`` means "walk all available level pairs". """ root = open_store(str(store_path), mode="r+") _stamp_root_cross_level( root, depth=cross_level_depth, storage=cross_level_storage, ) if cross_level_storage == XLEVEL_NONE or cross_level_depth == 0: return meta = read_root_metadata(root) ndim = meta.sid_ndim levels = sorted(list_resolution_levels(root)) if len(levels) < 2: return _stamp_root_capability(root, CAP_MULTISCALE_LINKS) # Build per-level chunk_assignments + total counts once. per_level: dict[int, tuple[dict[ChunkCoords, npt.NDArray[np.int64]], int]] = {} for lvl in levels: lg = get_resolution_level(root, lvl) per_level[lvl] = _reconstruct_chunk_assignments(lg, ndim) max_delta = ( max(levels) - min(levels) if cross_level_depth == -1 else int(cross_level_depth) ) if max_delta < 2: return # +1/-1 was already emitted inline # Cache each adjacent (fine_level, fine_level+1) parent array. adjacent_parent: dict[int, npt.NDArray[np.int64]] = {} for fine_level in levels[:-1]: coarse_level = fine_level + 1 if coarse_level not in per_level: continue fine_assn, n_fine = per_level[fine_level] coarse_assn, _ = per_level[coarse_level] if n_fine == 0: continue fine_lg = get_resolution_level(root, fine_level) parent = _decode_parent_from_plus_one( fine_lg, fine_assn=fine_assn, coarse_assn=coarse_assn, n_fine=n_fine, ) if parent is not None: adjacent_parent[fine_level] = parent # Compose deeper-delta parents and emit. for fine_level in levels[:-1]: if fine_level not in adjacent_parent: continue fine_assn, n_fine = per_level[fine_level] parent = adjacent_parent[fine_level].copy() for step in range(2, max_delta + 1): coarse_level = fine_level + step if coarse_level not in per_level: break inter_level = coarse_level - 1 if inter_level not in adjacent_parent: break inter_parent = adjacent_parent[inter_level] coarse_assn, n_coarse = per_level[coarse_level] if n_coarse == 0: break composed = np.full(n_fine, -1, dtype=np.int64) valid = parent >= 0 composed[valid] = inter_parent[parent[valid]] parent = composed if not np.any(parent >= 0): break _write_cross_level_edges( root, fine_level=fine_level, delta=step, fine_chunk_assignments=fine_assn, coarse_chunk_assignments=coarse_assn, n_fine=n_fine, n_coarse=n_coarse, parent=parent, sid_ndim=ndim, storage=cross_level_storage, ) def _write_cross_level_edges( root_group, *, fine_level: int, delta: int, fine_chunk_assignments: dict[ChunkCoords, npt.NDArray[np.int64]], coarse_chunk_assignments: dict[ChunkCoords, npt.NDArray[np.int64]], n_fine: int, n_coarse: int, parent: npt.NDArray[np.int64], sid_ndim: int, storage: str, ) -> None: """Materialize ``delta``-step cross-level edges between two adjacent levels. ``parent[i]`` is the metanode index in the coarser level that fine vertex ``i`` belongs to. The cross-level edges are trivially ``(i, parent[i])`` for each fine vertex. Writes the ``+delta`` arrays under the fine level. When ``storage='explicit'`` also writes the matching ``-delta`` arrays under the coarse level by swapping endpoint roles. """ if storage == XLEVEL_NONE or delta == 0: return coarse_level = fine_level + delta # Drop orphaned fine vertices (parent < 0) before building edges. valid_mask = parent >= 0 if not np.any(valid_mask): return fine_global = np.flatnonzero(valid_mask).astype(np.int64) parent_valid = parent[valid_mask].astype(np.int64) # Build chunk-mapping tables for both levels. fine_chunk_list = sorted(fine_chunk_assignments.keys()) fine_vchunks, fine_vlocal, fine_chunk_list = build_vertex_chunk_mapping( fine_chunk_assignments, n_fine, fine_chunk_list, ) coarse_chunk_list = sorted(coarse_chunk_assignments.keys()) coarse_vchunks, coarse_vlocal, coarse_chunk_list = build_vertex_chunk_mapping( coarse_chunk_assignments, n_coarse, coarse_chunk_list, ) # Trivial fine→parent edge list. edges = np.stack([fine_global, parent_valid], axis=1) aligned, cross = partition_cross_level_edges( edges, fine_vchunks, fine_vlocal, fine_chunk_list, coarse_vchunks, coarse_vlocal, coarse_chunk_list, ) fine_lg = get_resolution_level(root_group, fine_level) if aligned: create_links_array(fine_lg, link_width=2, delta=delta) for cc, rows in aligned.items(): write_chunk_links(fine_lg, cc, [rows], delta=delta) if cross: create_cross_chunk_links_array(fine_lg, delta=delta) write_cross_chunk_links(fine_lg, cross, sid_ndim=sid_ndim, delta=delta) if storage == XLEVEL_EXPLICIT: # Mirror at the coarse level under -delta: swap endpoint roles. coarse_lg = get_resolution_level(root_group, coarse_level) # Re-partition from the coarse side so chunk-alignment is # evaluated against the coarse chunk grid (intra/cross split # may differ from the fine-side view when grids don't align). rev_edges = np.stack([parent_valid, fine_global], axis=1) rev_aligned, rev_cross = partition_cross_level_edges( rev_edges, coarse_vchunks, coarse_vlocal, coarse_chunk_list, fine_vchunks, fine_vlocal, fine_chunk_list, ) if rev_aligned: create_links_array(coarse_lg, link_width=2, delta=-delta) for cc, rows in rev_aligned.items(): write_chunk_links(coarse_lg, cc, [rows], delta=-delta) if rev_cross: create_cross_chunk_links_array(coarse_lg, delta=-delta) write_cross_chunk_links( coarse_lg, rev_cross, sid_ndim=sid_ndim, delta=-delta, ) # =================================================================== # Full pyramid builder # ===================================================================
[docs] def build_pyramid( store_path: str | Path, *, factors: list[tuple[float, float]], chunk_scale_factors: list[int | tuple[int, ...]] | None = None, sparsity_strategy: str = "random", sparsity_seed: int | None = None, cross_level_depth: int = DEFAULT_CROSS_LEVEL_DEPTH, cross_level_storage: str = DEFAULT_CROSS_LEVEL_STORAGE, ) -> dict[str, Any]: """Build a multi-resolution pyramid for an existing store. Pass ``factors=[(coarsen_2, sparsity_3), ...]`` where ``factors[i]`` is applied to produce level ``i+1`` from level ``i``. Either factor at ``1.0`` opts out of that axis. Uses the per-object pyramid: each surviving object's vertices are aggregated into bin centroids (metavertices); metavertices may be shared between objects and OIDs are preserved across levels. Args: store_path: Path to the store with level 0. factors: List of ``(coarsen_factor, sparsity_factor)`` tuples, one per coarser level. chunk_scale_factors: Optional per-level multipliers applied to the source level's ``chunk_shape`` to derive each target level's ``chunk_shape``. Aligned with ``factors`` (same length). Each entry is either a scalar int (uniform per axis) or a per-axis tuple. ``None`` (default) means all-ones: every level inherits root ``chunk_shape``. sparsity_strategy: Object selection strategy. sparsity_seed: Random seed. cross_level_depth: Maximum absolute level delta for materialized cross-pyramid-level link arrays. ``0`` = none, ``N`` = up to ``±N`` per pair (or ``+N`` only when ``cross_level_storage='implicit'``), ``-1`` = walk all available level pairs. Default ``1``. cross_level_storage: ``"none"`` / ``"implicit"`` / ``"explicit"``. ``"explicit"`` materializes both ``+N`` (at the finer level) and ``-N`` (at the coarser level); ``"implicit"`` writes only ``+N``. Default ``"explicit"``. Returns: Summary dict. """ if cross_level_storage not in VALID_XLEVEL_STORAGE: raise ValueError( f"cross_level_storage={cross_level_storage!r} not in " f"{sorted(VALID_XLEVEL_STORAGE)}" ) if cross_level_depth < -1: raise ValueError( f"cross_level_depth must be ≥ -1 (got {cross_level_depth})" ) if chunk_scale_factors is not None and len(chunk_scale_factors) != len(factors): raise ValueError( f"chunk_scale_factors length {len(chunk_scale_factors)} != " f"factors length {len(factors)}", ) summaries: list[dict[str, Any]] = [] for i, fac in enumerate(factors): if isinstance(fac, (tuple, list)) and len(fac) == 2: cf, sf = float(fac[0]), float(fac[1]) else: raise ValueError( f"factors[{i}] must be a (coarsen_factor, sparsity_factor) " f"tuple; got {fac!r}" ) chunk_scale = ( chunk_scale_factors[i] if chunk_scale_factors is not None else 1 ) summaries.append(coarsen_level( store_path, source_level=i, target_level=i + 1, coarsen_factor=cf, sparsity_factor=sf, chunk_scale_factor=chunk_scale, sparsity_strategy=sparsity_strategy, sparsity_seed=sparsity_seed, cross_level_storage=cross_level_storage, )) # Compose deeper-delta cross-level links from the inline-emitted +1 # arrays. Also stamps root cross-level metadata + the multiscale # links capability. _finalize_cross_level_for_store( store_path, cross_level_depth=cross_level_depth, cross_level_storage=cross_level_storage, ) return { "levels_created": len(summaries), "level_specs": summaries, "method": COARSEN_PER_OBJECT, "cross_level_depth": cross_level_depth, "cross_level_storage": cross_level_storage, }