# Copyright (c) 2022-2026 CNES.
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""Codec profiles and the :class:`CodecStack` data class.
A profile is a (named) recipe that produces a :class:`CodecStack` for a
variable. Three profiles ship by default:
- ``local-fast``: no sharding, Zstd L3.
- ``cloud-balanced``: sharded (~128 MiB shards), Zstd L3. *Default*.
- ``cloud-cold``: sharded (~512 MiB shards), Zstd L9.
Codec descriptors are kept as simple dicts so the schema serializes cleanly
without depending on Zarr v3 codec object stability. The ``io`` layer
translates these into ``zarr.codecs`` objects at write time via
:func:`resolve_codec`.
"""
import copy
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from typing import Any

import numpy
import zarr.codecs as zcodecs
#: Type alias for a single codec descriptor: a plain dict of the form
#: ``{"name": str, "configuration": dict}``, matching the Zarr v3 spec for
#: codec metadata. Keeping descriptors as plain dicts makes round-tripping
#: through ``_zcollection.json`` trivial.
CodecDescriptor = dict[str, Any]
[docs]
@dataclass(frozen=True, slots=True)
class CodecStack:
"""The persisted codec pipeline for one variable.
Each codec is held as a JSON-clean ``{"name": ..., "configuration": ...}``
dict so the schema round-trips through ``_zcollection.json`` without
depending on Zarr v3 codec object identity. Materialisation into
concrete ``zarr.codecs`` instances happens lazily in
:func:`resolve_codec` at write time.
"""
#: Array-to-array codecs (Zarr v3 *filters*), applied in order to the
#: chunk's numpy array before serialisation.
array_to_array: tuple[CodecDescriptor, ...] = ()
#: The single array-to-bytes codec (Zarr v3 *serializer*) that turns
#: array elements into a byte string for one chunk. The Zarr v3 spec
#: requires exactly one codec in this slot.
array_to_bytes: CodecDescriptor | None = None
#: Bytes-to-bytes codecs (Zarr v3 *compressors*), applied in order to
#: the per-chunk byte string. With sharding on, they compress each
#: inner chunk inside a shard; with sharding off, they compress each
#: chunk directly.
bytes_to_bytes: tuple[CodecDescriptor, ...] = ()
#: When ``True``, the variable's chunks are bundled into shards via
#: :class:`zarr.codecs.ShardingCodec`. The codecs in
#: :attr:`bytes_to_bytes` then compress each *inner chunk* inside a
#: shard. When ``False``, each chunk is compressed directly and shards
#: are not used.
sharded: bool = False
#: Target byte budget for each shard when :attr:`sharded` is ``True``;
#: ``None`` otherwise. The actual shard shape is picked by
#: :func:`zcollection.codecs.sharding.shard_decision`, which honours
#: this as a hint, not a hard cap.
shard_target_bytes: int | None = None
[docs]
def to_json(self) -> dict[str, Any]:
"""Return the codec stack as a JSON-serialisable dictionary."""
return {
"array_to_array": list(self.array_to_array),
"array_to_bytes": self.array_to_bytes,
"bytes_to_bytes": list(self.bytes_to_bytes),
"sharded": self.sharded,
"shard_target_bytes": self.shard_target_bytes,
}
[docs]
@classmethod
def from_json(cls, payload: dict[str, Any]) -> CodecStack:
"""Build a codec stack from its JSON representation."""
return cls(
array_to_array=tuple(payload.get("array_to_array", [])),
array_to_bytes=payload.get("array_to_bytes"),
bytes_to_bytes=tuple(payload.get("bytes_to_bytes", [])),
sharded=bool(payload.get("sharded", False)),
shard_target_bytes=payload.get("shard_target_bytes"),
)
def _bytes_codec() -> CodecDescriptor:
    """Build the descriptor of the standard array-to-bytes serializer.

    Returns:
        A new ``bytes`` codec descriptor configured for little-endian
        output. A fresh dict is created on every call.
    """
    configuration = {"endian": "little"}
    return {"name": "bytes", "configuration": configuration}
def _zstd(level: int = 3) -> CodecDescriptor:
    """Build a Zstd bytes-to-bytes (compressor) descriptor.

    Args:
        level: Zstd compression level stored in the descriptor.

    Returns:
        A new ``zstd`` codec descriptor with the requested level and the
        checksum option turned off.
    """
    settings = {"level": level, "checksum": False}
    return {"name": "zstd", "configuration": settings}
@dataclass(frozen=True, slots=True)
class _Profile:
    """A codec profile specification.

    A profile is the recipe from which :meth:`codecs` builds a
    :class:`CodecStack`.
    """

    #: Profile name.
    name: str
    #: Whether the profile produces sharded stacks.
    sharded: bool
    #: Target byte budget for each shard, consumed by
    #: :class:`~zarr.codecs.ShardingCodec` (the array-to-bytes serializer
    #: in a sharded stack). ``None`` when :attr:`sharded` is ``False``.
    target_shard_bytes: int | None
    #: The single bytes-to-bytes codec (compressor) inserted into the
    #: stack — profiles do not currently compose multiple compressors.
    compressor: CodecDescriptor

    def codecs(self) -> CodecStack:
        """Return the :class:`CodecStack` materialised from this profile.

        Returns:
            A stack with no array-to-array codecs, the standard ``bytes``
            serializer and this profile's compressor. The shard byte budget
            is forwarded only when the profile is sharded; otherwise it is
            ``None``. Every descriptor in the returned stack is independent
            of the profile, so editing it cannot alter the profile.
        """
        return CodecStack(
            array_to_array=(),
            array_to_bytes=_bytes_codec(),
            # Hand out a private copy: descriptors are mutable dicts, and
            # sharing this one would let a caller that edits its stack
            # silently change the profile for every later caller.
            bytes_to_bytes=(copy.deepcopy(self.compressor),),
            sharded=self.sharded,
            shard_target_bytes=(
                self.target_shard_bytes if self.sharded else None
            ),
        )
#: Registry of the built-in codec profiles. Each entry is keyed by the
#: profile's own ``name``, in declaration order.
PROFILES: dict[str, _Profile] = {
    spec.name: spec
    for spec in (
        # Unsharded, Zstd level 3.
        _Profile(
            name="local-fast",
            sharded=False,
            target_shard_bytes=None,
            compressor=_zstd(3),
        ),
        # 128 MiB shard budget, Zstd level 3.
        _Profile(
            name="cloud-balanced",
            sharded=True,
            target_shard_bytes=128 * 2**20,
            compressor=_zstd(3),
        ),
        # 512 MiB shard budget, Zstd level 9.
        _Profile(
            name="cloud-cold",
            sharded=True,
            target_shard_bytes=512 * 2**20,
            compressor=_zstd(9),
        ),
    )
}
#: Name of the profile used when a caller does not specify one. It is one
#: of the keys of :data:`PROFILES`.
DEFAULT_PROFILE: str = "cloud-balanced"
[docs]
def profile_names() -> tuple[str, ...]:
    """List the registered codec profile names.

    Returns:
        The keys of :data:`PROFILES`, in registration order.
    """
    return (*PROFILES,)
[docs]
def profile(
    name: str | None = None,
    *,
    filters: Iterable[CodecDescriptor] | None = None,
    compressor: CodecDescriptor | None = None,
) -> CodecStack:
    """Build a :class:`CodecStack` from a named profile, with overrides.

    Args:
        name: Profile name. ``None`` uses :data:`DEFAULT_PROFILE`.
        filters: Optional array-to-array codec descriptors that replace
            the profile's filter list. ``None`` keeps the profile's
            filters; any iterable, including an empty one, is used as
            given.
        compressor: Optional bytes-to-bytes codec descriptor that
            replaces the profile's entire compressor pipeline with this
            single codec. ``None`` keeps the profile's compressors.

    Returns:
        A :class:`CodecStack` materialised from the profile, with the
        overrides applied. The serializer and the sharding settings always
        come from the profile.

    Raises:
        KeyError: If ``name`` is not a registered profile.
    """
    if name is None:
        name = DEFAULT_PROFILE
    if name not in PROFILES:
        raise KeyError(
            f"unknown codec profile {name!r}; available: {profile_names()!r}"
        )
    base = PROFILES[name].codecs()
    return CodecStack(
        # Both overrides are tested against ``None`` rather than by
        # truthiness, so an explicit empty override is honoured and
        # iterables without a usable truth value are accepted.
        array_to_array=(
            tuple(filters) if filters is not None else base.array_to_array
        ),
        array_to_bytes=base.array_to_bytes,
        bytes_to_bytes=(
            (compressor,) if compressor is not None else base.bytes_to_bytes
        ),
        sharded=base.sharded,
        shard_target_bytes=base.shard_target_bytes,
    )
[docs]
def auto_codecs(
    dtype: numpy.dtype,
    profile_name: str | None = None,
) -> CodecStack:
    """Select the :class:`CodecStack` of the named profile for a variable.

    The choice does not depend on ``dtype`` today. The parameter exists so
    that future profiles can specialise per dtype (e.g. byte-grain filters
    for booleans, transposes for high-rank arrays) without an API break.

    Args:
        dtype: The variable dtype. Reserved for forward compatibility;
            it has no effect on the returned stack.
        profile_name: Profile name. A falsy value (``None`` included)
            selects :data:`DEFAULT_PROFILE`.

    Returns:
        A :class:`CodecStack` materialised from the profile.

    Raises:
        KeyError: If the selected name is not a registered profile.
    """
    del dtype  # reserved; see docstring.
    name = profile_name if profile_name else DEFAULT_PROFILE
    spec = PROFILES.get(name)
    if spec is None:
        raise KeyError(f"unknown codec profile {name!r}")
    return spec.codecs()
def shard_target_bytes(profile_name: str | None = None) -> int | None:
    """Look up the target shard byte budget of a profile.

    Args:
        profile_name: Profile name. A falsy value (``None`` included)
            selects :data:`DEFAULT_PROFILE`.

    Returns:
        The profile's target shard byte budget, or ``None`` when the
        profile does not shard.

    Raises:
        KeyError: If the selected name is not a registered profile.
    """
    name = profile_name or DEFAULT_PROFILE
    try:
        spec = PROFILES[name]
    except KeyError:
        spec = None
    if spec is None:
        raise KeyError(f"unknown codec profile {name!r}")
    if not spec.sharded:
        return None
    return spec.target_shard_bytes
def _make_bytes_codec(cfg: dict[str, Any]) -> Any:
    """Build a ``BytesCodec``; ``endian`` defaults to ``"little"``."""
    return zcodecs.BytesCodec(endian=cfg.get("endian", "little"))


def _make_zstd_codec(cfg: dict[str, Any]) -> Any:
    """Build a ``ZstdCodec``; defaults are level 3 and no checksum."""
    return zcodecs.ZstdCodec(
        level=cfg.get("level", 3),
        checksum=cfg.get("checksum", False),
    )


def _make_blosc_codec(cfg: dict[str, Any]) -> Any:
    """Build a ``BloscCodec``, passing the configuration as keywords."""
    return zcodecs.BloscCodec(**cfg)


def _make_gzip_codec(cfg: dict[str, Any]) -> Any:
    """Build a ``GzipCodec``, passing the configuration as keywords."""
    return zcodecs.GzipCodec(**cfg)


def _make_crc32c_codec(_cfg: dict[str, Any]) -> Any:
    """Build a ``Crc32cCodec``; the configuration is ignored."""
    return zcodecs.Crc32cCodec()


def _make_transpose_codec(cfg: dict[str, Any]) -> Any:
    """Build a ``TransposeCodec``; the ``order`` key is mandatory."""
    return zcodecs.TransposeCodec(order=cfg["order"])


def _make_vlen_utf8_codec(_cfg: dict[str, Any]) -> Any:
    """Build a ``VLenUTF8Codec``; the configuration is ignored."""
    return zcodecs.VLenUTF8Codec()


def _make_vlen_bytes_codec(_cfg: dict[str, Any]) -> Any:
    """Build a ``VLenBytesCodec``; the configuration is ignored."""
    return zcodecs.VLenBytesCodec()


#: Registry that maps each supported Zarr v3 codec name to a builder. A
#: builder takes the descriptor's configuration dict and returns the
#: matching Zarr v3 codec instance.
_CODEC_BUILDERS: dict[str, Callable[[dict[str, Any]], Any]] = {
    "bytes": _make_bytes_codec,
    "zstd": _make_zstd_codec,
    "blosc": _make_blosc_codec,
    "gzip": _make_gzip_codec,
    "crc32c": _make_crc32c_codec,
    "transpose": _make_transpose_codec,
    "vlen-utf8": _make_vlen_utf8_codec,
    "vlen-bytes": _make_vlen_bytes_codec,
}
def resolve_codec(descriptor: CodecDescriptor) -> Any:
    """Materialise a JSON codec descriptor as a Zarr v3 codec instance.

    The I/O layer calls this at write time to turn persisted descriptors
    into concrete ``zarr.codecs`` objects.

    Args:
        descriptor: A codec descriptor dict with keys ``"name"`` and
            ``"configuration"``. A missing or ``None`` configuration is
            treated as an empty dict.

    Returns:
        An instance of the corresponding Zarr v3 codec.

    Raises:
        KeyError: If ``descriptor["name"]`` is not in the codec
            registry. The set of supported names is the keys of
            :data:`_CODEC_BUILDERS` (``bytes``, ``zstd``, ``blosc``,
            ``gzip``, ``crc32c``, ``transpose``, ``vlen-utf8``,
            ``vlen-bytes``).
    """
    name = descriptor["name"]
    # Missing key and explicit ``None`` both collapse to an empty config.
    cfg = descriptor.get("configuration") or {}
    if name not in _CODEC_BUILDERS:
        raise KeyError(f"unsupported codec descriptor: {descriptor!r}")
    return _CODEC_BUILDERS[name](cfg)