Source code for zcollection.partitioning.base

# Copyright (c) 2022-2026 CNES.
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""Partitioning Protocol — pure-numpy partition key extraction."""

from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from collections.abc import Iterable, Iterator

import numpy


if TYPE_CHECKING:
    from ..data import Dataset

PartitionKey = tuple[tuple[str, int], ...]


[docs] @runtime_checkable class Partitioning(Protocol): """Maps dataset rows along a partitioning axis to partition keys. Implementations operate on plain numpy — Dask is layered higher up. """ #: A human-readable name for this partitioning strategy, e.g. "sequence". name: str @property def axis(self) -> tuple[str, ...]: """Variables (along the partitioning dimension) used to derive the key.""" ... @property def dimension(self) -> str: """The dataset dimension this partitioning splits.""" ...
[docs] def split(self, dataset: Dataset) -> Iterator[tuple[PartitionKey, slice]]: """Yield (partition_key, slice) for each contiguous run.""" ...
[docs] def encode(self, key: PartitionKey) -> str: """Encode a key as a relative storage path.""" ...
[docs] def decode(self, path: str) -> PartitionKey: """Decode a relative storage path into a key.""" ...
[docs] def to_json(self) -> dict[str, Any]: """Return a JSON-serializable description of the partitioning.""" ...
def keys_from_columns( columns: dict[str, numpy.ndarray], ) -> tuple[numpy.ndarray, numpy.ndarray]: """Return (unique_rows, inverse) for the stacked columns. ``unique_rows`` has shape (n_unique, n_cols); ``inverse`` maps each input row to its unique index. Sort order is lexicographic and stable. Args: columns: Mapping of column name to 1D array of partition values. Returns: A tuple of (unique_rows, inverse) where: - unique_rows: A 2D array of shape (n_unique, n_cols) containing the unique combinations of partition values. - inverse: A 1D array mapping each input row to its corresponding unique index in unique_rows. Raises: ValueError: If no columns are provided or if columns have mismatched lengths. """ if not columns: raise ValueError("at least one column is required") arrays = [numpy.asarray(c) for c in columns.values()] n = arrays[0].shape[0] if any(a.shape[0] != n for a in arrays): raise ValueError("all partition columns must share length") stacked = numpy.column_stack(arrays) # numpy.unique with axis=0 gives sorted unique rows + inverse. unique, inverse = numpy.unique(stacked, axis=0, return_inverse=True) return unique, inverse def runs_from_inverse(inverse: numpy.ndarray) -> Iterable[tuple[int, slice]]: """Yield (group_id, slice) for each contiguous run of equal labels. A row that re-appears later produces a separate run — partitions can be fragmented. Callers concatenate fragments belonging to the same group. Args: inverse: A 1D array mapping each input row to its corresponding unique index in the unique rows. Yields: Tuples of (group_id, slice) where: - group_id: An integer representing the unique index of the partition key. - slice: A slice object indicating the start and end indices of the contiguous run of rows corresponding to the group_id. """ if inverse.size == 0: return starts = numpy.flatnonzero(numpy.diff(inverse, prepend=inverse[0] - 1)) starts = numpy.concatenate([starts, [inverse.size]]) for i in range(len(starts) - 1): s, e = int(starts[i]), int(starts[i + 1]) yield int(inverse[s]), slice(s, e)