Source code for pyinterp.fill

"""Replace undefined values."""
from __future__ import annotations

from typing import TYPE_CHECKING, TypeVar, overload

import numpy

from . import core, grid, interface
from .grid import NUM_DIMS_2
from .typing import NDArray2DFloat32, NDArray2DFloat64

if TYPE_CHECKING:
    from .typing import NDArray1D, NDArray2D, NDArray3D, NDArray4D

#: TypeVar for 2D float arrays to preserve type through function calls
_Float2DArray = TypeVar('_Float2DArray', NDArray2DFloat32, NDArray2DFloat64)


def _core_function(prefix: str, dtype: numpy.dtype) -> str:
    """Determine the core function name based on the data type."""
    if dtype == numpy.float32:
        return f'{prefix}_float32'
    elif dtype == numpy.float64:
        return f'{prefix}_float64'
    else:
        raise ValueError(f'Unsupported dtype {dtype}. '
                         'Must be float32 or float64.')


@overload
def loess(  # type: ignore[overload-overlap]
    mesh: grid.Grid4D,
    nx: int = 3,
    ny: int = 3,
    value_type: str | None = None,
    num_threads: int = 0,
) -> NDArray4D:
    ...


@overload
def loess(  # type: ignore[overload-overlap]
    mesh: grid.Grid3D,
    nx: int = 3,
    ny: int = 3,
    value_type: str | None = None,
    num_threads: int = 0,
) -> NDArray3D:
    ...


@overload
def loess(
    mesh: grid.Grid2D,
    nx: int = 3,
    ny: int = 3,
    value_type: str | None = None,
    num_threads: int = 0,
) -> NDArray2D:
    ...


[docs] def loess(mesh: grid.Grid2D | grid.Grid3D | grid.Grid4D, nx: int = 3, ny: int = 3, value_type: str | None = None, num_threads: int = 0) -> NDArray2D | NDArray3D | NDArray4D: """Filter values using a locally weighted regression function (LOESS). Apply LOESS filtering to fill or smooth grid values using the tri-cube weight function: :math:`w(x)=(1-|d|^3)^3`. Args: mesh: Grid function on a uniform 2-dimensional grid to be filled. nx: Number of points of the half-window to be taken into account along the X-axis. Defaults to ``3``. ny: Number of points of the half-window to be taken into account along the Y-axis. Defaults to ``3``. value_type: Type of values processed by the filter. Supported are ``undefined``, ``defined``, ``all``. Default to ``undefined``. num_threads: The number of threads to use for the computation. If 0 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. Defaults to ``0``. Returns: The grid will have NaN filled with extrapolated values. """ value_type = value_type or 'undefined' instance = mesh._instance function = interface._core_function('loess', instance) if value_type not in ['undefined', 'defined', 'all']: raise ValueError(f'value type {value_type!r} is not defined') return getattr(core.fill, function)(instance, nx, ny, getattr(core.fill.ValueType, value_type.capitalize()), num_threads)
[docs] def gauss_seidel( array: _Float2DArray, first_guess: str = 'zonal_average', is_circle: bool = False, max_iteration: int | None = None, epsilon: float = 1e-4, relaxation: float | None = None, num_threads: int = 0, in_place: bool = False, ) -> tuple[int, float, _Float2DArray]: r"""Replace all undefined values in a 2D grid using Gauss-Seidel method. Apply the Gauss-Seidel method by relaxation to fill all undefined values (NaN) in a 2D grid. Args: array: 2D array to be filled with shape (ny, nx). first_guess: Specifies the type of first guess grid. Supported values are: * ``zero`` means use ``0.0`` as an initial guess; * ``zonal_average`` means that zonal averages (i.e, averages in the X-axis direction) will be used. Defaults to ``zonal_average``. is_circle: If True, the X-axis is circular (e.g., longitude). Defaults to ``False``. max_iteration: Maximum number of iterations to be used by relaxation. The default value is equal to the product of the grid dimensions. epsilon: Tolerance for ending relaxation before the maximum number of iterations limit. Defaults to ``1e-4``. relaxation: Relaxation constant. If ``0 < relaxation < 1``, the new value is an average weighted by the old and the one given by the Gauss-Seidel scheme. In this case, convergence is slowed down (under-relaxation). Over-relaxation consists in choosing a value of ``relaxation`` strictly greater than 1. For the method to converge, it is necessary that: ``1 < relaxation < 2``. If this parameter is not set, the method will choose the optimal value that allows the convergence criterion to be achieved in :math:`O(N)` iterations, for a grid of size :math:`N_x=N_y=N`, ``relaxation`` = :math:`{2\over{1+{\pi\over{N}}}}`; if the grid is of size :math:`Nx \times Ny`, :math:`N = N_{x}N_{y}\sqrt{2\over{N_{x}^2+N_{y}^2}}` num_threads: The number of threads to use for the computation. If 0 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. Defaults to ``0``. in_place: If True, the input array is modified in place. If False, a copy is made and filled. Defaults to ``False``. Returns: A tuple containing the number of iterations performed, the final residual value, and the filled array. Note: This function operates on 2D grids only. For greater dimensional data, apply this function to each 2D slice independently: .. code-block:: python # For a 3D array with shape (ny, nx, nz) filled_3d = np.empty_like(data_3d) for iz in range(data_3d.shape[2]): iterations, residual, filled_3d[:, :, iz] = gauss_seidel( data_3d[:, :, iz], is_circle=True ) """ if first_guess not in ['zero', 'zonal_average']: raise ValueError(f'first_guess type {first_guess!r} is not defined') ny, nx = array.shape if relaxation is None: n = nx if nx == ny else nx * ny * numpy.sqrt(2 / (nx**2 + ny**2)) relaxation = 2 / (1 + numpy.pi / n) max_iteration = max_iteration or ny * nx first_guess_enum = getattr( core.fill.FirstGuess, ''.join(item.capitalize() for item in first_guess.split('_'))) filled = array if in_place else numpy.copy(array) callable = _core_function('gauss_seidel', array.dtype) iterations, residual = getattr(core.fill, callable)(filled, first_guess_enum, is_circle, max_iteration, epsilon, relaxation, num_threads) return iterations, residual, filled
[docs] def multi_grid( array: _Float2DArray, first_guess: str = 'zonal_average', is_circle: bool = False, max_iterations: int | None = None, epsilon: float = 1e-4, pre_smooth: int = 2, post_smooth: int = 2, num_threads: int = 0, in_place: bool = False, ) -> tuple[int, float, _Float2DArray]: """Replace undefined values (NaN) in a 2-D grid using the multigrid V-cycle. Args: array: The array to be processed. first_guess: Method to use for the first guess. Supported values are ``zero`` (use ``0.0``) and ``zonal_average`` (use zonal averages along the X axis). Defaults to ``zonal_average``. is_circle: True if the X axis of the array defines a circle. max_iterations: Maximum number of iterations to be used by the multigrid method. If None, defaults to the product of the array dimensions. epsilon: Tolerance for ending the multigrid method before reaching the maximum number of iterations. Defaults to ``1e-4``. pre_smooth: Number of smoothing iterations to perform before restriction. Defaults to ``2``. post_smooth: Number of smoothing iterations to perform after prolongation. Defaults to ``2``. num_threads: Number of threads to use for the computation. If 0 all CPUs are used. If 1 is given, no parallel computing code is used, which is useful for debugging. Defaults to ``0``. in_place: If True, the input array is modified in place. If False, a copy is made and filled. Defaults to ``False``. Returns: A tuple containing the number of iterations performed, the final residual value, and the filled array. Note: This function operates on 2D grids only. For greater dimensional data, apply this function to each 2D slice independently: .. code-block:: python # For a 3D array with shape (ny, nx, nz) filled_3d = np.empty_like(data_3d) for iz in range(data_3d.shape[2]): iterations, residual, filled_3d[:, :, iz] = multi_grid( data_3d[:, :, iz], is_circle=True ) """ if first_guess not in ['zero', 'zonal_average']: raise ValueError(f'first_guess type {first_guess!r} is not defined') max_iterations = max_iterations or 1000 first_guess_enum = getattr( core.fill.FirstGuess, ''.join(item.capitalize() for item in first_guess.split('_'))) filled = array if in_place else numpy.copy(array) callable = _core_function('multigrid', array.dtype) iterations, residual = getattr(core.fill, callable)(filled, first_guess_enum, is_circle, max_iterations, epsilon, pre_smooth, post_smooth, num_threads) return iterations, residual, filled
[docs] def fft_inpaint( array: _Float2DArray, first_guess: str = 'zonal_average', is_circle: bool = False, max_iterations: int | None = None, epsilon: float = 1e-4, sigma: float = 10.0, num_threads: int = 0, in_place: bool = False, ) -> tuple[int, float, _Float2DArray]: """Fill undefined values in a grid using spectral in-painting. Replace NaN values in a 2D array using a spectral in-painting approach. Uses a Fast Fourier Transform (FFT) for periodic boundaries (is_circle=True) or a Discrete Cosine Transform (DCT) for reflective boundaries (is_circle=False). A Gaussian low-pass filter (sigma) controls the smoothness of the fill. Args: array: The array to be processed first_guess: Method to use for the first guess. is_circle: If true, uses a Fast Fourier Transform (FFT) assuming periodic boundaries. If false, uses a Discrete Cosine Transform (DCT) assuming reflective boundaries. Defaults to ``False``. max_iterations: Maximum number of iterations. Defaults to ``500``. epsilon: Tolerance for ending relaxation. Defaults to ``1e-4``. sigma: Standard deviation of the Gaussian low-pass filter in pixel units. Controls the smoothness of the fill. Defaults to ``10.0``. num_threads: The number of threads to use for the computation. Defaults to ``0``. in_place: If True, the input array is modified in place. If False, a copy is made and filled. Defaults to ``False``. Returns: A tuple containing the number of iterations performed, the maximum residual value, and the filled array. Note: This function operates on 2D arrays only and requires a C contiguous array. If you have higher dimensional data, apply this function to each 2D slice independently: .. code-block:: python # For a 3D array with shape (ny, nx, nz) filled_3d = np.empty_like(data_3d) for iz in range(data_3d.shape[2]): c_contiguous_data = np.ascontiguousarray(data_3d[:, :, iz]) iterations, residual, filled_3d[:, :, iz] = fft_inpaint( c_contiguous_data, is_circle=True ) """ if first_guess not in ['zero', 'zonal_average']: raise ValueError(f'first_guess type {first_guess!r} is not defined') if max_iterations is None: max_iterations = array.shape[0] * array.shape[1] first_guess_enum = getattr( core.fill.FirstGuess, ''.join(item.capitalize() for item in first_guess.split('_'))) filled = array if in_place else numpy.copy(array) callable = _core_function('fft_inpaint', array.dtype) iterations, residual = getattr(core.fill, callable)( filled, # type: ignore[arg-type] first_guess_enum, is_circle, max_iterations, epsilon, sigma, num_threads, ) return iterations, residual, filled
def matrix(x: NDArray2DFloat32 | NDArray2DFloat64, fill_value: float | None = None, in_place: bool = True) -> NDArray2DFloat32 | NDArray2DFloat64: """Fill in the gaps between defined values in a 2-dimensional array. Args: x: data to be filled. fill_value: Value used to fill undefined values. Should be compatible with the dtype of x (float32 or float64). in_place: If true, the data is filled in place. Defaults to ``True``. Returns: The data filled. """ fill_value = fill_value or numpy.nan if len(x.shape) != NUM_DIMS_2: raise ValueError('x must be a 2-dimensional array') dtype = x.dtype if not in_place: x = numpy.copy(x) # type: ignore[assignment] if dtype == numpy.float32: core.fill.matrix_float32( x, # type: ignore[arg-type] fill_value, ) core.fill.matrix_float64( x, # type: ignore[arg-type] fill_value, ) return x def vector( x: NDArray1D, fill_value: numpy.generic | None = None, in_place: bool = True, ) -> NDArray1D: """Fill in the gaps between defined values in a 1-dimensional array. Args: x: data to be filled. fill_value: Value used to fill undefined values. The type should be compatible with the dtype of x (float for float32/float64, int for int64, datetime64 for datetime64, timedelta64 for timedelta64). in_place: If true, the data is filled in place. Defaults to ``True``. Returns: The data filled. """ if not isinstance(x, numpy.ndarray): raise ValueError('x must be a numpy.ndarray') if len(x.shape) != 1: raise ValueError('x must be a 1-dimensional array') dtype = x.dtype if not in_place: x = numpy.copy(x) if dtype == numpy.float32: core.fill.vector_float32( x, # type: ignore[arg-type] float('NaN') if fill_value is None else float(fill_value), ) elif dtype == numpy.float64: core.fill.vector_float64( x, # type: ignore[arg-type] float('NaN') if fill_value is None else float(fill_value), ) elif dtype == numpy.int64: core.fill.vector_int64( x, # type: ignore[arg-type] 2**63 - 1 if fill_value is None else int(fill_value), ) elif numpy.issubdtype(dtype, numpy.datetime64) or numpy.issubdtype( dtype, numpy.timedelta64): core.fill.vector_int64( x.view(int), # type: ignore[arg-type] numpy.datetime64(fill_value).view(int), # type: ignore[type-var] ) else: raise ValueError(f'unsupported data type {dtype}') return x