Building an implementation#
This chapter details the concepts supporting the generic model
FilesDatabase. The ultimate
purpose is to build a custom implementation supporting any file set.
File (and folder) name convention#
The central piece of the model is the definition of a file name convention. File names usually contain a great deal of information that can be used for selection: periods for time series, geographical extent for tiles, and so on.
A file name convention has a dual purpose: interpreting a file name to deduce a
record of information, and conversely using a record of information to generate
a file name. Interpretation relies on a regex, whereas generation uses an
f-string. Both functionalities also need FileNameField
definitions to specify the type of the data extracted from the file name, how to
encode and decode it, and which filters can be applied to it.
Let’s take the SWOT altimetry mission Low Rate product as an example:
SWOT_L2_LR_SSH_Expert_001_010_20240101T000000_20240101T030000_PGC0_01.nc
This file name contains the following information:
- L2_LR_SSH is the product (Level 2 Low Rate Sea Surface Height)
- Expert is the product’s subset (other subsets are Basic, WindWave and Unsmoothed)
- 001 is the cycle number
- 010 is the pass number
- 20240101T000000 is the timestamp of the first measurement
- 20240101T030000 is the timestamp of the last measurement
- PGC0_01 is the version of the product
To represent this convention, we define a regex with explicit named groups representing the fields. The group names are important as they will be used to automatically build an API (see the following sections).
import re

pattern = re.compile(
    r'SWOT_(?P<level>.*)_LR_SSH_(?P<subset>.*)_(?P<cycle_number>\d{3})_(?P<pass_number>\d{3})_'
    r'(?P<time>\d{8}T\d{6}_\d{8}T\d{6})_(?P<version>P[IG][A-Z]\d_\d{2})\.nc'
)
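As a plain-Python illustration of the convention’s dual purpose (stdlib only, independent of fcollections), the regex interprets a name into a record, and a format string mirroring the regex regenerates the name from that record:
# Interpret: regex -> record of named groups
name = 'SWOT_L2_LR_SSH_Expert_001_010_20240101T000000_20240101T030000_PGC0_01.nc'
record = pattern.match(name).groupdict()
# {'level': 'L2', 'subset': 'Expert', 'cycle_number': '001', ...}
# Generate: record -> file name
template = ('SWOT_{level}_LR_SSH_{subset}_{cycle_number}_{pass_number}_'
            '{time}_{version}.nc')
assert template.format(**record) == name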
Online tools can help build the regex for non-critical uses. In addition, inspiration can be found in the many existing implementations.
Once the regex is identified, the groups must be associated with fields by
subclassing FileNameField. File name
fields declare which type of data can be extracted from a file name,
and how to decode and encode it. The following example shows a field matching
the cycle_number group declared in the file name convention’s regex:
from fcollections.core import FileNameFieldInteger
field = FileNameFieldInteger('cycle_number')
field.type
list[int] | slice | int
field.decode('10')
10
field.encode(10)
'10'
In addition, file name fields provide a testing method that compares a tested object against a reference object. The tested object must be an instance of the type declared in the field, whereas the reference object depends on the field implementation and can be more loosely related to the field’s type. For example, the integer field checks whether an integer is in a list, in a range, or equal to another integer.
field.test(10, 10)
True
field.test(10, 9)
False
field.test([9, 10, 11], 10)
True
field.test(slice(9, 11), 10)
True
The resulting convention is the composition of the regex, the fields and optionally the generation f-string (not shown in this example).
from fcollections.core import FileNameConvention, FileNameFieldPeriod, FileNameFieldString
convention = FileNameConvention(
    regex=pattern,
    fields=[
        FileNameFieldInteger('cycle_number'),
        FileNameFieldInteger('pass_number'),
        FileNameFieldPeriod('time', '%Y%m%dT%H%M%S', '_'),
        FileNameFieldString('level'),
        FileNameFieldString('subset'),
        FileNameFieldString('version')])
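The FileNameFieldPeriod used for the time group is built with a timestamp format and a separator. A sketch of the expected behaviour (the exact return type is an assumption, inferred from the dataframes shown below):
# The 'time' group holds two timestamps joined by '_'
period_field = FileNameFieldPeriod('time', '%Y%m%dT%H%M%S', '_')
period_field.decode('20240101T000000_20240101T030000')
# -> presumably a period such as [2024-01-01T00:00:00, 2024-01-01T03:00:00]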
Note
File name conventions can also be applied to folders.
Layout#
A files collection is usually stored on a file system at a root path, with nested folders organizing the data. By default, all subfolders are scanned to build the files metadata table. However, in most cases only a selection of files is needed: a global scan is then inefficient.
Let us consider the following layout related to our running example
collection_root/
├── PGC0_01/
│   ├── Basic/
│   │   ├── cycle_001/
│   │   ├── cycle_002/
│   │   └── cycle_003/
│   └── Expert/
│       ├── cycle_001/
│       │   └── SWOT_L2_LR_SSH_Expert_001_010_20240101T000000_20240101T030000_PGC0_01.nc
│       └── cycle_002/
└── PIC0_01/
    ├── Basic/
    │   ├── cycle_001/
    │   └── cycle_002/
    └── Expert/
        ├── cycle_001/
        └── cycle_002/
In order to finely select the subfolders to explore and speed up the queries,
the Layout class declares how to interpret
folders, files and their organization. It is a simple list of
FileNameConvention.
For our example, the following layout matches the actual files collection structure:
from fcollections.core import Layout
layout = Layout([
    FileNameConvention(re.compile(r'^(?P<version>P[IG][A-Z]\d_\d{2})$'),
                       [convention.get_field('version')]),
    FileNameConvention(re.compile(r'^(?P<subset>Basic|Expert|Unsmoothed|WindWave)$'),
                       [convention.get_field('subset')]),
    FileNameConvention(re.compile(r'^cycle_(?P<cycle_number>\d{3})$'),
                       [convention.get_field('cycle_number')]),
    # Don't forget the file convention (leaf node in this Layout)
    convention])
Warning
When defining a layout, the same field is often defined in both the file and
the folders. It is highly recommended to write regexes matching the whole name
for folders that share fields with the file (the regex should begin with ^ and
end with $).
To avoid redefining the same fields, the get_field
utility method can
extract a field from the file name convention.
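A plain re example illustrates the warning above: without anchors, a folder pattern can accidentally match a fragment of an unrelated name.
# Unanchored: matches a substring of an unrelated name
loose = re.compile(r'cycle_(?P<cycle_number>\d{3})')
bool(loose.search('recycle_0011_backup'))
True
# Anchored: only matches a whole folder name such as 'cycle_001'
strict = re.compile(r'^cycle_(?P<cycle_number>\d{3})$')
bool(strict.search('recycle_0011_backup'))
False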
The simplest case for organizing the files is a single folder containing all of them: a simple layout containing only the file name convention can be defined:
layout_flat = Layout([convention])
File listing and automatic filters#
Once one or more layouts are established, we can scan a file set to build a series of records. Records contain decoded information from each file name and are converted to a pandas DataFrame. The dataframe columns are named after the regex groups.
import fsspec.implementations.memory as fs_mem
from fcollections.core import FileSystemMetadataCollector, DirNode
fs = fs_mem.MemoryFileSystem()
fs.touch('/SWOT_L2_LR_SSH_Expert_001_010_20240101T000000_20240101T030000_PGC0_01.nc')
fs.touch('/SWOT_L2_LR_SSH_Expert_001_012_20240101T030000_20240101T030000_PGC0_01.nc')
fs.touch('/SWOT_L2_LR_SSH_Expert_002_010_20240102T000000_20240102T030000_PGC0_01.nc')
root_node = DirNode("/", {"name": "/"}, fs, 0)
collector = FileSystemMetadataCollector([layout_flat], root_node)
collector.to_dataframe()
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 10 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_001_010_20240101T000000... |
| 1 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_001_012_20240101T030000... |
| 2 | 2 | 10 | [2024-01-02T00:00:00.000000, 2024-01-02T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_002_010_20240102T000000... |
The collector looks for files under the given path, parses the file names and returns the records. It is possible to set filters on the records to get a subset of the files. The filters are automatically configured from the convention’s fields and are passed as keyword arguments. For example, we can set filters to retrieve only one half-orbit from our altimetry dataset.
collector.to_dataframe(cycle_number=1, subset='Expert', pass_number=12)
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_001_012_20240101T030000... |
In addition, the collector can add information given by the filesystem. The
list of the desired information can be passed to the stat_fields argument.
Each value will be shown as a column in the returned dataframe.
collector.to_dataframe(stat_fields=['size', 'created'])
| | cycle_number | pass_number | time | level | subset | version | filename | size | created |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 10 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_001_010_20240101T000000... | 0 | 1.772107e+09 |
| 1 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_001_012_20240101T030000... | 0 | 1.772107e+09 |
| 2 | 2 | 10 | [2024-01-02T00:00:00.000000, 2024-01-02T03:00:... | L2 | Expert | PGC0_01 | /SWOT_L2_LR_SSH_Expert_002_010_20240102T000000... | 0 | 1.772107e+09 |
Note
Using fs.ls(..., detail=True) will give the available values for the
stat_fields argument.
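For instance, with the in-memory filesystem used above, the detail keys can be inspected directly (the exact set of keys depends on the fsspec filesystem implementation):
# Keys of the detail entries are the candidate stat_fields values
fs.ls('/', detail=True)[0].keys()
# e.g. dict_keys(['name', 'size', 'type', 'created'])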
Reading files#
Reading files is the last element needed before building a functional API. Once
the desired files are listed, we can read and combine them using the
IFilesReader interface. A basic
wrapper around xarray.open_mfdataset() can be instantiated in order to
freeze the reading arguments. In most cases, giving the engine and the data
combination method is sufficient.
In the following example, we show how to generate a simple reader to concatenate
stub data along the num_lines dimension.
import tempfile
import numpy as np
import xarray as xr
from fcollections.core import OpenMfDataset
# Create stub data
path = tempfile.mkdtemp()
ds = xr.DataArray(np.random.random((9860, 69)), dims=('num_lines', 'num_pixels')).to_dataset(name='ssha')
ds.to_netcdf(f'{path}/SWOT_L2_LR_SSH_Expert_001_011_20240101T000000_20240101T030000_PGC0_01.nc')
ds.to_netcdf(f'{path}/SWOT_L2_LR_SSH_Expert_001_012_20240101T030000_20240101T060000_PGC0_01.nc')
reader = OpenMfDataset({'engine': 'h5netcdf',
                        'concat_dim': 'num_lines',
                        'combine': 'nested'})
reader.read([
f'{path}/SWOT_L2_LR_SSH_Expert_001_011_20240101T000000_20240101T030000_PGC0_01.nc',
f'{path}/SWOT_L2_LR_SSH_Expert_001_012_20240101T030000_20240101T060000_PGC0_01.nc'])
<xarray.Dataset> Size: 11MB
Dimensions: (num_lines: 19720, num_pixels: 69)
Dimensions without coordinates: num_lines, num_pixels
Data variables:
ssha     (num_lines, num_pixels) float64 11MB dask.array<chunksize=(9860, 69), meta=np.ndarray>
Query API#
Combining all elements is done by subclassing FilesDatabase.
The subclass must define class attributes for the layouts and the reader. The
file listing method from the FileSystemMetadataCollector
is integrated in the interface and can be called from the new object to get the
records.
from fcollections.core import FilesDatabase
class MyDatabase(FilesDatabase):
    layouts = [layout_flat, layout]
    reader = reader
fc = MyDatabase(path)
fc.list_files()
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T06:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_012... |
| 1 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
Apart from listing files, the FilesDatabase
exposes the query method. This method combines the previously introduced
concepts of file listing, selection, reading and combination to produce a
dataset. As for the FileSystemMetadataCollector,
filters are automatically built from the file name convention.
fc.query(pass_number=11)
<xarray.Dataset> Size: 5MB
Dimensions: (num_lines: 9860, num_pixels: 69)
Dimensions without coordinates: num_lines, num_pixels
Data variables:
ssha     (num_lines, num_pixels) float64 5MB dask.array<chunksize=(9860, 69), meta=np.ndarray>
The documentation for the available filters can be displayed with the query
method help.
fc.query?
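Since the filters are generated from the convention’s fields, they should accept the same loose reference types as the fields’ test method (scalars, lists, slices). A sketch, assuming query’s filters accept the list form shown earlier for FileNameFieldInteger:
# Read passes 11 and 12 of cycle 1 in a single query
fc.query(cycle_number=1, pass_number=[11, 12])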
Handling duplicates and unmixable subsets#
File sets often pile up multiple versions or subsets of the same data. In addition, duplicates may be found even within a single subset. In such cases, filtering the files before reading them becomes a necessity rather than an option.
To get a consistent dataset from the query method, a FilesDatabase
subclass can define the following:
- SubsetsUnmixer: this class interprets the records of the files’ metadata table to detect subset mixing. The partition_keys attribute serves as the subset definition. Using these partition keys, the unmixer can split the records table into multiple sub-tables containing homogeneous data. The auto_pick_last attribute is a list of keys used to sort the subsets and automatically pick one. If the auto pick does not include all of the partition keys, it can prove insufficient, so manual filtering is expected from the user. If the manual inputs are missing when needed, an error is raised to help the user properly configure the filters.
- Deduplicator: this class interprets the records of a files’ metadata table to detect duplicates. The unique attribute serves as the uniqueness definition. The auto_pick_last attribute is a list of keys that will be sorted and deduplicated.
The subset unmixing and picking is run just after listing the files. The user is
encouraged to pick one subset by applying filters to the file listing. For
example, in the SWOT Low Rate products, the subset field should be set to either
Basic, Expert, WindWave or Unsmoothed. After one subset is
selected, the deduplication process is run to remove files containing the same
data. In our case, multiple versions may pile up, so the auto pick property will
choose the latest. Once the files’ list is properly processed, it can be read to
form a homogeneous dataset for the user.
To illustrate how to use the SubsetsUnmixer
and Deduplicator, we can modify our
previous custom implementation
from fcollections.core import Deduplicator, SubsetsUnmixer
deduplicator = Deduplicator(
    # Altimetry granules are defined using cycle and pass numbers
    unique=('cycle_number', 'pass_number'),
    # Auto pick will automatically keep the latest version
    auto_pick_last=('version',))

subset_unmixer = SubsetsUnmixer(
    # We should not mix data from different subsets
    # (Basic/Expert/Unsmoothed/WindWave)
    partition_keys=['subset'])

class MyDatabase(FilesDatabase):
    layouts = [layout_flat, layout]
    reader = reader
    deduplicator = deduplicator
    unmixer = subset_unmixer
fc = MyDatabase(path)
# Create a duplicate
ds.to_netcdf(f'{path}/SWOT_L2_LR_SSH_Expert_001_011_20240101T000000_20240101T030000_PIC0_01.nc')
fc.list_files(deduplicate=False)
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PIC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
| 1 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T06:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_012... |
| 2 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
fc.list_files(deduplicate=True)
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PIC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
| 1 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T06:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_012... |
# In case there are also Unsmoothed products, the 'subset' argument becomes
# mandatory to filter out other subsets
ds.to_netcdf(f'{path}/SWOT_L2_LR_SSH_Unsmoothed_001_011_20240101T000000_20240101T030000_PIC0_01.nc')
# An error is expected if we enforce one subset only
fc.list_files(unmix=True)
fc.list_files(subset='Expert', unmix=True)
| | cycle_number | pass_number | time | level | subset | version | filename |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PIC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
| 1 | 1 | 12 | [2024-01-01T03:00:00.000000, 2024-01-01T06:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_012... |
| 2 | 1 | 11 | [2024-01-01T00:00:00.000000, 2024-01-01T03:00:... | L2 | Expert | PGC0_01 | /tmp/tmpsl5_070d/SWOT_L2_LR_SSH_Expert_001_011... |
Note
In our example, we explicitly allowed version mixing by setting the
version field in the deduplicator. We could have chosen otherwise and
set this field in the subset unmixer instead, enforcing one version per
subset.
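As a sketch of the alternative mentioned in the note above, moving version into the partition keys would enforce one version per subset:
strict_unmixer = SubsetsUnmixer(
    # Treat each version as its own subset: mixed versions must then be
    # filtered manually instead of being auto-picked
    partition_keys=['subset', 'version'])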
Mixins#
A custom FilesDatabase may need additional functionalities apart from
listing and reading files. These functionalities are usually specific to the use
case or to the data contained in the files. This lack of genericity justifies
using a multiple inheritance mechanism to inject them into the generic model.
Because the classes adding the functionalities are abstract, they must be mixed
with other classes to get a complete implementation: these abstract classes are
then called mixins.
Two mixins are currently available:
- PeriodMixin: works with time series and can analyze the data to get the time coverage or detect holes.
- DownloadMixin: appends a download endpoint to a remote database.
To add a mixin to a custom implementation, the subclass should declare both the
mixin and the FilesDatabase generic
class:
from fcollections.core import PeriodMixin
class MyDatabase(FilesDatabase, PeriodMixin):
    layouts = [layout, layout_flat]
    reader = reader
fc = MyDatabase(path)
fc.time_coverage()
[2024-01-01T00:00:00.000000, 2024-01-01T06:00:00.000000]