Overview of a Collection#
This section walks through the main features of a v3
Collection: building a schema, creating a partitioned
collection on a store, inserting data, querying with filters, and updating
variables in place.
Run with:
python examples/ex_collection.py
from pathlib import Path
import pprint
import shutil
import tempfile
import numpy
import zcollection as zc
Initialization#
v3 stores are URL-driven. LocalStore (POSIX), MemoryStore
(in-process, useful for tests), and IcechunkStore (transactional) are
built in. Pass a string URL to create_collection() and
the right backend is chosen for you.
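For orientation, a sketch of how the URL scheme selects a backend. Only the file:// form is exercised in this example; the other two schemes are assumptions inferred from the store names, not confirmed by this page.
# Scheme -> backend (hedged; only "file://" appears in this example):
#   file:///path/to/collection   LocalStore (POSIX)
#   memory://name                MemoryStore (in-process, tests)   [assumed]
#   icechunk://...               IcechunkStore (transactional)     [assumed]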
target = Path(tempfile.gettempdir()) / "zc-ex-collection"
if target.exists():
shutil.rmtree(target)
Build a schema#
A v3 collection is created from an explicit
DatasetSchema. The fluent
SchemaBuilder (aliased as zc.Schema()) declares
dimensions and variables up front, including their codecs.
def build_schema() -> zc.DatasetSchema:
"""Build the dataset schema for the example."""
return (
zc.Schema()
.with_dimension("time", chunks=4096)
.with_dimension("x_ac", size=240, chunks=240)
.with_variable("time", dtype="int64", dimensions=("time",))
.with_variable("partition", dtype="int64", dimensions=("time",))
.with_variable("var1", dtype="float32", dimensions=("time", "x_ac"))
.with_variable(
"var2",
dtype="float32",
dimensions=("time", "x_ac"),
fill_value=numpy.float32("nan"),
)
.build()
)
schema = build_schema()
Build a sample dataset#
A Dataset is the in-memory pairing of a schema
with concrete numpy (or dask) arrays. There is only one Variable class
in v3 — the Array / DelayedArray split from v2 is gone.
def build_dataset(
schema: zc.DatasetSchema, n_partitions: int = 3
) -> zc.Dataset:
"""Build a synthetic dataset matching the given schema."""
rng = numpy.random.default_rng(42)
rows_per_part = 10_000
n = n_partitions * rows_per_part
time = numpy.arange(n, dtype="int64")
partition = numpy.repeat(
numpy.arange(n_partitions, dtype="int64"), rows_per_part
)
var1 = rng.standard_normal(size=(n, 240), dtype="float32")
var2 = numpy.full((n, 240), numpy.float32("nan"))
return zc.Dataset(
schema=schema,
variables={
"time": zc.Variable(schema.variables["time"], time),
"partition": zc.Variable(schema.variables["partition"], partition),
"var1": zc.Variable(schema.variables["var1"], var1),
"var2": zc.Variable(schema.variables["var2"], var2),
},
)
zds = build_dataset(schema)
print(zds)
<zcollection.data.dataset.Dataset '/'> Size: 55.39 MB
Dimensions: (time: 30000, x_ac: 240)
Data variables:
time (time) int64 234.38 kB numpy.ndarray<size=234.38 kB>
partition (time) int64 234.38 kB numpy.ndarray<size=234.38 kB>
var1 (time, x_ac) float32 27.47 MB numpy.ndarray<size=27.47 MB>
var2 (time, x_ac) float32 27.47 MB numpy.ndarray<size=27.47 MB>
Create the collection#
Every argument is keyword-only; the positional (axis, ds, partitioner,
path, fs) signature from v2 is gone. The collection takes its schema from
this call; that schema is then frozen on disk and enforced on every later insert.
Partitioning strategies are pure-numpy splitters that map a dataset’s rows
to partition keys. Here we use Sequence over a synthetic
bucket variable. For real
time series, prefer Date; a sketch follows the next block.
collection = zc.create_collection(
f"file://{target}",
schema=schema,
axis="time",
partitioning=zc.partitioning.Sequence(("partition",), dimension="time"),
)
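For comparison, a sketch of the Date variant mentioned above. It is commented out because this example’s time axis is int64, and the resolution keyword is an assumption, not confirmed by this page.
# Hedged: assumes Date partitions a datetime64 variable by calendar unit.
# partitioning=zc.partitioning.Date(("time",), resolution="D")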
Insert data#
Insertion is partitioned automatically. The default merge strategy is
replace (last write wins for a given partition); other strategies live
under zcollection.merge.
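The call that produced the output below was elided from this excerpt; a minimal reconstruction, using only calls shown elsewhere on this page:
collection.insert(zds)
keys = list(collection.partitions())
print(f"wrote {len(keys)} partitions: {keys}")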
wrote 3 partitions: ['partition=0', 'partition=1', 'partition=2']
Note
Use zcollection.merge.time_series() to incrementally append
along a sorted time axis without overwriting prior data:
from zcollection import merge
collection.insert(zds, merge=merge.time_series)
Reopen and list partitions#
collection = zc.open_collection(f"file://{target}", mode="rw")
pprint.pprint(list(collection.partitions()))
['partition=0', 'partition=1', 'partition=2']
Query#
query() returns a
Dataset (or None if no partition matches).
Filters use a typed expression language over the partition keys; no
eval is involved.
loaded = collection.query()
assert loaded is not None
print(f"full query: {loaded}")
filtered = collection.query(filters="partition == 1")
assert filtered is not None
assert filtered["partition"].to_numpy().tolist() == [1] * 10_000
print("filter pushdown: OK")
full query: <zcollection.data.dataset.Dataset '/'> Size: 55.39 MB
Dimensions: (time: 30000, x_ac: 240)
Data variables:
time (time) int64 234.38 kB numpy.ndarray<size=234.38 kB>
partition (time) int64 234.38 kB numpy.ndarray<size=234.38 kB>
var1 (time, x_ac) float32 27.47 MB numpy.ndarray<size=27.47 MB>
var2 (time, x_ac) float32 27.47 MB numpy.ndarray<size=27.47 MB>
Attributes:
_zc_partition_key : [['partition', 0]]
filter pushdown: OK
Subset variables#
subset = collection.query(variables=("time", "var1"))
assert subset is not None
print(f"variable subset: {tuple(subset.variables)}")
variable subset: ('time', 'var1')
Update variables in place#
update() rewrites the partitions matching
filters after applying fn to each one. fn receives a Dataset
and must return a new Dataset with the same dimensions.
def square_var1(ds: zc.Dataset) -> zc.Dataset:
"""Return a dataset with ``var1`` squared."""
arr = ds["var1"].to_numpy() ** 2
return zc.Dataset(
schema=ds.schema,
variables={
**{name: ds[name] for name in ds},
"var1": zc.Variable(ds.schema.variables["var1"], arr),
},
)
collection.update(square_var1, filters="partition == 0")
after = collection.query(filters="partition == 0")
assert after is not None
print(f"updated max(var1)= {float(after['var1'].to_numpy().max()):.3f}")
updated max(var1)= 28.058
Map a function over partitions#
map() applies fn to every partition
(no write-back) and returns {partition_path: result}. Use it for
reductions, statistics, or building secondary indices.
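The code for this step is missing from the excerpt; a minimal sketch, assuming map() takes a callable over each partition’s Dataset and that the returned keys print as partition=N:
means = collection.map(lambda ds: float(ds["var1"].to_numpy().mean()))
for key, mean in sorted(means.items()):
    print(f"* {key}: mean(var1) = {mean:+.4f}")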
* partition=0: mean(var1) = +0.9993
* partition=1: mean(var1) = -0.0007
* partition=2: mean(var1) = +0.0009
Drop partitions#
Surgical deletes use the same filter language. Reserved layout files
(zarr.json, _immutable, _catalog) are never touched.
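The deleting call is also missing here; a hedged sketch, assuming a drop_partitions() method that accepts the same filters keyword as query() (the method name is an assumption, not confirmed by this page):
# Assumption: drop_partitions(filters=...) removes matching partitions
# and returns the keys it deleted.
dropped = collection.drop_partitions(filters="partition == 2")
print(f"dropped: {dropped}")
print(f"remaining: {list(collection.partitions())}")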
dropped: ['partition=2']
remaining: ['partition=0', 'partition=1']