Source code for iris._constraints

# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Provide objects for building up expressions useful for pattern matching."""

from collections.abc import Iterable, Mapping
import operator

import numpy as np

import iris.exceptions


[docs] class Constraint: """Cubes can be pattern matched and filtered according to specific criteria. Constraints are the mechanism by which cubes can be pattern matched and filtered according to specific criteria. Once a constraint has been defined, it can be applied to cubes using the :meth:`Constraint.extract` method. """ def __init__(self, name=None, cube_func=None, coord_values=None, **kwargs): """Use for filtering cube loading or cube list extraction. Creates a new instance of a Constraint which can be used for filtering cube loading or cube list extraction. Parameters ---------- name : str or None, optional If a string, it is used as the name to match against the :attr:`iris.cube.Cube.names` property. cube_func : callable or None, optional If a callable, it must accept a Cube as its first and only argument and return either True or False. coord_values : dict or None, optional If a dict, it must map coordinate name to the condition on the associated coordinate. ***kwargs : dict, optional The remaining keyword arguments are converted to coordinate constraints. The name of the argument gives the name of a coordinate, and the value of the argument is the condition to meet on that coordinate:: Constraint(model_level_number=10) Coordinate level constraints can be of several types: * **string, int or float** - the value of the coordinate to match. e.g. ``model_level_number=10`` * **list of values** - the possible values that the coordinate may have to match. e.g. ``model_level_number=[10, 12]`` * **callable** - a function which accepts a :class:`iris.coords.Cell` instance as its first and only argument returning True or False if the value of the Cell is desired. e.g. ``model_level_number=lambda cell: 5 < cell < 10`` Examples -------- The :ref:`user guide <loading_iris_cubes>` covers cube much of constraining in detail, however an example which uses all of the features of this class is given here for completeness:: Constraint(name='air_potential_temperature', cube_func=lambda cube: cube.units == 'kelvin', coord_values={'latitude':lambda cell: 0 < cell < 90}, model_level_number=[10, 12]) & Constraint(ensemble_member=2) .. note:: Whilst ``&`` is supported, the ``|`` that might reasonably be expected is not. This is because each constraint describes a boxlike region, and thus the intersection of these constraints (obtained with ``&``) will also describe a boxlike region. Allowing the union of two constraints (with the ``|`` symbol) would allow the description of a non-boxlike region. These are difficult to describe with cubes and so it would be ambiguous what should be extracted. To generate multiple cubes, each constrained to a different range of the same coordinate, use :py:func:`iris.load_cubes` or :py:func:`iris.cube.CubeList.extract_cubes`. A cube can be constrained to multiple ranges within the same coordinate using something like the following constraint:: def latitude_bands(cell): return (0 < cell < 30) or (60 < cell < 90) Constraint(cube_func=latitude_bands) Constraint filtering is performed at the cell level. For further details on how cell comparisons are performed see :class:`iris.coords.Cell`. """ if not (name is None or isinstance(name, str)): raise TypeError("name must be None or string, got %r" % name) if not (cube_func is None or callable(cube_func)): raise TypeError("cube_func must be None or callable, got %r" % cube_func) if not (coord_values is None or isinstance(coord_values, Mapping)): raise TypeError( "coord_values must be None or a " "collections.Mapping, got %r" % coord_values ) coord_values = coord_values or {} duplicate_keys = set(coord_values.keys()) & set(kwargs.keys()) if duplicate_keys: raise ValueError( "Duplicate coordinate conditions specified for: " "%s" % list(duplicate_keys) ) self._name = name self._cube_func = cube_func self._coord_values = coord_values.copy() self._coord_values.update(kwargs) self._coord_constraints = [] for coord_name, coord_thing in self._coord_values.items(): self._coord_constraints.append(_CoordConstraint(coord_name, coord_thing)) def __eq__(self, other): # Equivalence is defined, but is naturally limited for any Constraints # based on callables, i.e. "cube_func", or value functions for # attributes/names/coords : These can only be == if they contain the # *same* callable object (i.e. same object identity). eq = ( isinstance(other, Constraint) and self._name == other._name and self._cube_func == other._cube_func and self._coord_constraints == other._coord_constraints ) # NOTE: theoretically, you could compare coord constraints as a *set*, # as order should not affect matching. # Not totally sure, so for now let's not. return eq def __hash__(self): # We want constraints to have hashes, so they can act as e.g. # dictionary keys or tuple elements. # So, we *must* provide this, as overloading '__eq__' automatically # disables it. # Just use basic object identity. return id(self) def __repr__(self): args = [] if self._name: args.append(("name", self._name)) if self._cube_func: args.append(("cube_func", self._cube_func)) if self._coord_values: args.append(("coord_values", self._coord_values)) return "Constraint(%s)" % ", ".join("%s=%r" % (k, v) for k, v in args) def _coordless_match(self, cube): """Return whether this constraint matches the given cube. Return whether this constraint matches the given cube when not taking coordinates into account. """ match = True if self._name: # Require to also check against cube.name() for the fallback # "unknown" default case, when there is no name metadata available. match = self._name in cube._names or self._name == cube.name() if match and self._cube_func: match = self._cube_func(cube) return match
[docs] def extract(self, cube): """Return the subset of the given cube which matches this constraint. Return the subset of the given cube which matches this constraint, else return None. """ resultant_CIM = self._CIM_extract(cube) slice_tuple = resultant_CIM.as_slice() result = None if slice_tuple is not None: # Slicing the cube is an expensive operation. if all([item == slice(None) for item in slice_tuple]): # Don't perform a full slice, just return the cube. result = cube else: # Performing the partial slice. result = cube[slice_tuple] return result
def _CIM_extract(self, cube): # Returns _ColumnIndexManager # Cater for scalar cubes by setting the dimensionality to 1 # when cube.ndim is 0. resultant_CIM = _ColumnIndexManager(cube.ndim or 1) if not self._coordless_match(cube): resultant_CIM.all_false() else: for coord_constraint in self._coord_constraints: resultant_CIM = resultant_CIM & coord_constraint.extract(cube) return resultant_CIM def __and__(self, other): return ConstraintCombination(self, other, operator.__and__) def __rand__(self, other): return ConstraintCombination(other, self, operator.__and__)
class ConstraintCombination(Constraint): """Represents the binary combination of two Constraint instances.""" def __init__(self, lhs, rhs, operator): """Instance created by providing two Constraint instances. Instance created by providing two Constraint instances and the appropriate :mod:`operator`. """ try: lhs_constraint = as_constraint(lhs) rhs_constraint = as_constraint(rhs) except TypeError: raise TypeError( "Can only combine Constraint instances, " "got: %s and %s" % (type(lhs), type(rhs)) ) self.lhs = lhs_constraint self.rhs = rhs_constraint self.operator = operator def __eq__(self, other): eq = ( isinstance(other, ConstraintCombination) and self.lhs == other.lhs and self.rhs == other.rhs and self.operator == other.operator ) return eq def __hash__(self): # Must re-define if you overload __eq__ : Use object identity. return id(self) def _coordless_match(self, cube): return self.operator( self.lhs._coordless_match(cube), self.rhs._coordless_match(cube) ) def __repr__(self): return "ConstraintCombination(%r, %r, %r)" % ( self.lhs, self.rhs, self.operator, ) def _CIM_extract(self, cube): return self.operator(self.lhs._CIM_extract(cube), self.rhs._CIM_extract(cube)) class _CoordConstraint: """Represents the atomic elements which might build up a Constraint.""" def __init__(self, coord_name, coord_thing): """Create a coordinate constraint. Create a coordinate constraint given the coordinate name and a thing to compare it with. Parameters ---------- coord_name : str The name of the coordinate to constrain. coord_thing : The object to compare. """ self.coord_name = coord_name self._coord_thing = coord_thing def __repr__(self): return "_CoordConstraint(%r, %r)" % ( self.coord_name, self._coord_thing, ) def __eq__(self, other): eq = ( isinstance(other, _CoordConstraint) and self.coord_name == other.coord_name and self._coord_thing == other._coord_thing ) return eq def __hash__(self): # Must re-define if you overload __eq__ : Use object identity. return id(self) def extract(self, cube): """Return the column based indices of the cube which match the constraint.""" from iris.coords import Cell, DimCoord # Cater for scalar cubes by setting the dimensionality to 1 # when cube.ndim is 0. cube_cim = _ColumnIndexManager(cube.ndim or 1) try: coord = cube.coord(self.coord_name) except iris.exceptions.CoordinateNotFoundError: cube_cim.all_false() return cube_cim dims = cube.coord_dims(coord) if len(dims) > 1: msg = "Cannot apply constraints to multidimensional coordinates" raise iris.exceptions.CoordinateMultiDimError(msg) try_quick = False if callable(self._coord_thing): call_func = self._coord_thing elif isinstance(self._coord_thing, Iterable) and not isinstance( self._coord_thing, (str, Cell) ): desired_values = list(self._coord_thing) # A dramatic speedup can be had if we don't have bounds. if coord.has_bounds(): def call_func(cell): return cell in desired_values else: def call_func(cell): return cell.point in desired_values else: def call_func(c): return c == self._coord_thing try_quick = isinstance(coord, DimCoord) and not isinstance( self._coord_thing, Cell ) # Simple, yet dramatic, optimisation for the monotonic case. if try_quick: try: i = coord.nearest_neighbour_index(self._coord_thing) except TypeError: try_quick = False if try_quick: r = np.zeros(coord.shape, dtype=np.bool_) if coord.cell(i) == self._coord_thing: r[i] = True else: r = np.array([call_func(cell) for cell in coord.cells()]) if dims: cube_cim[dims[0]] = r elif not all(r): cube_cim.all_false() return cube_cim class _ColumnIndexManager: """Represent column aligned slices which can be operated on. Represent column aligned slices which can be operated on using ``&``, ``|`` or ``^``. :: # 4 Dimensional slices import numpy as np cim = _ColumnIndexManager(4) cim[1] = np.array([3, 4, 5]) > 3 print(cim.as_slice()) """ def __init__(self, ndims): """_ColumnIndexManager always created to span the given number of dimensions.""" self._column_arrays = [True] * ndims self.ndims = ndims def __and__(self, other): return self._bitwise_operator(other, operator.__and__) def __or__(self, other): return self._bitwise_operator(other, operator.__or__) def __xor__(self, other): return self._bitwise_operator(other, operator.__xor__) def _bitwise_operator(self, other, operator): if not isinstance(other, _ColumnIndexManager): return NotImplemented if self.ndims != other.ndims: raise ValueError( "Cannot do %s for %r and %r as they have a " "different number of dimensions." % operator ) r = _ColumnIndexManager(self.ndims) # iterate over each dimension an combine appropriately for i, (lhs, rhs) in enumerate(zip(self, other)): r[i] = operator(lhs, rhs) return r def all_false(self): """Turn all slices into False.""" for i in range(self.ndims): self[i] = False def __getitem__(self, key): return self._column_arrays[key] def __setitem__(self, key, value): is_vector = isinstance(value, np.ndarray) and value.ndim == 1 if is_vector or isinstance(value, bool): self._column_arrays[key] = value else: raise TypeError( "Expecting value to be a 1 dimensional numpy array" ", or a boolean. Got %s" % (type(value)) ) def as_slice(self): """Turn a _ColumnIndexManager into a tuple. Turn a _ColumnIndexManager into a tuple which can be used in an indexing operation. If no index is possible, None will be returned. """ result = [None] * self.ndims for dim, dimension_array in enumerate(self): # If dimension_array has not been set, span the entire dimension if isinstance(dimension_array, np.ndarray): where_true = np.where(dimension_array)[0] # If the array had no True values in it, then the dimension # is equivalent to False if len(where_true) == 0: result = None break # If there was exactly one match, the key should be an integer if where_true.shape == (1,): result[dim] = where_true[0] else: # Finally, we can either provide a slice if possible, # or a tuple of indices which match. In order to determine # if we can provide a slice, calculate the deltas between # the indices and check if they are the same. delta = np.diff(where_true, axis=0) # if the diff is consistent we can create a slice object if all(delta[0] == delta): result[dim] = slice(where_true[0], where_true[-1] + 1, delta[0]) else: # otherwise, key is a tuple result[dim] = tuple(where_true) # Handle the case where dimension_array is a boolean elif dimension_array: result[dim] = slice(None, None) else: result = None break if result is None: return result else: return tuple(result) def list_of_constraints(constraints): """Turn constraints into list of valid constraints using :func:`as_constraint`.""" if isinstance(constraints, str) or not isinstance(constraints, Iterable): constraints = [constraints] return [as_constraint(constraint) for constraint in constraints] def as_constraint(thing: Constraint | str | None) -> Constraint: """Cast an object into a cube constraint where possible. Cast an object into a cube constraint where possible, otherwise a TypeError will be raised. If the given object is already a valid constraint then the given object will be returned, else a TypeError will be raised. """ if isinstance(thing, Constraint): return thing elif thing is None: return Constraint() elif isinstance(thing, str): return Constraint(thing) else: raise TypeError("%r cannot be cast to a constraint." % thing)
[docs] class AttributeConstraint(Constraint): """Provides a simple Cube-attribute based :class:`Constraint`.""" def __init__(self, **attributes): """Provide a simple Cube-attribute based :class:`Constraint`. Example usage:: iris.AttributeConstraint(STASH='m01s16i004') iris.AttributeConstraint( STASH=lambda stash: str(stash).endswith('i005')) .. note:: Attribute constraint names are case sensitive. """ self._attributes = attributes super().__init__(cube_func=self._cube_func) def __eq__(self, other): eq = ( isinstance(other, AttributeConstraint) and self._attributes == other._attributes ) return eq def __hash__(self): # Must re-define if you overload __eq__ : Use object identity. return id(self) def _cube_func(self, cube): match = True for name, value in self._attributes.items(): if name in cube.attributes: cube_attr = cube.attributes.get(name) # if we have a callable, then call it with the value, # otherwise, assert equality if callable(value): if not value(cube_attr): match = False break else: if cube_attr != value: match = False break else: match = False break return match def __repr__(self): return "AttributeConstraint(%r)" % self._attributes
[docs] class NameConstraint(Constraint): """Provide a simple Cube name based :class:`Constraint`.""" def __init__( self, standard_name="none", long_name="none", var_name="none", STASH="none", ): """Provide a simple Cube name based :class:`Constraint`. Provide a simple Cube name based :class:`Constraint`, which matches against each of the names provided, which may be either standard name, long name, NetCDF variable name and/or the STASH from the attributes dictionary. The name constraint will only succeed if *all* of the provided names match. Parameters ---------- standard_name : optional A string or callable representing the standard name to match against. long_name : optional A string or callable representing the long name to match against. var_name : optional A string or callable representing the NetCDF variable name to match against. STASH : optional A string or callable representing the UM STASH code to match against. Notes ----- The default value of each of the keyword arguments is the string "none", rather than the singleton None, as None may be a legitimate value to be matched against e.g., to constrain against all cubes where the standard_name is not set, then use standard_name=None. Returns ------- bool Examples -------- Example usage:: iris.NameConstraint(long_name='air temp', var_name=None) iris.NameConstraint(long_name=lambda name: 'temp' in name) iris.NameConstraint(standard_name='air_temperature', STASH=lambda stash: stash.item == 203) """ self.standard_name = standard_name self.long_name = long_name self.var_name = var_name self.STASH = STASH self._names = ("standard_name", "long_name", "var_name", "STASH") super().__init__(cube_func=self._cube_func) def __eq__(self, other): eq = isinstance(other, NameConstraint) and all( getattr(self, attname) == getattr(other, attname) for attname in self._names ) return eq def __hash__(self): # Must re-define if you overload __eq__ : Use object identity. return id(self) def _cube_func(self, cube): def matcher(target, value): if callable(value): result = False if target is not None: # # Don't pass None through into the callable. Users should # use the "name=None" pattern instead. Otherwise, users # will need to explicitly handle the None case, which is # unnecessary and pretty darn ugly e.g., # # lambda name: name is not None and name.startswith('ick') # result = value(target) else: result = value == target return result match = True for name in self._names: expected = getattr(self, name) if expected != "none": if name == "STASH": actual = cube.attributes.get(name) else: actual = getattr(cube, name) match = matcher(actual, expected) # Make this is a short-circuit match. if match is False: break return match def __repr__(self): names = [] for name in self._names: value = getattr(self, name) if value != "none": names.append("{}={!r}".format(name, value)) return "{}({})".format(self.__class__.__name__, ", ".join(names))