# Source code for iris.fileformats.netcdf.loader

# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Support loading Iris cubes from NetCDF files using the CF conventions for metadata interpretation.

See : `NetCDF User's Guide <https://docs.unidata.ucar.edu/nug/current/>`_
and `netCDF4 python module <https://github.com/Unidata/netcdf4-python>`_.

Also : `CF Conventions <https://cfconventions.org/>`_.

"""

from collections.abc import Iterable, Iterator, Mapping
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum, auto
import threading
import warnings

import numpy as np

from iris._lazy_data import as_lazy_data
from iris.aux_factory import (
    AtmosphereSigmaFactory,
    HybridHeightFactory,
    HybridPressureFactory,
    OceanSFactory,
    OceanSg1Factory,
    OceanSg2Factory,
    OceanSigmaFactory,
    OceanSigmaZFactory,
)
import iris.config
import iris.coord_systems
import iris.coords
import iris.fileformats.cf
from iris.fileformats.netcdf import _thread_safe_nc
from iris.fileformats.netcdf.saver import _CF_ATTRS
import iris.io
import iris.util
import iris.warnings

# Debug switch: when True, `_load_cube_inner` prints actions-activation
# statistics for every data variable (cube) that is loaded.
DEBUG = False

# Get the logger : shared logger for all in 'iris.fileformats.netcdf'.
from . import logger

# An expected part of the public loader API, but includes thread safety
# concerns so is housed in _thread_safe_nc.  Re-exported here under its
# public name.
NetCDFDataProxy = _thread_safe_nc.NetCDFDataProxy


class _WarnComboIgnoringBoundsLoad(
    iris.warnings.IrisIgnoringBoundsWarning,
    iris.warnings.IrisLoadWarning,
):
    """Warning category that is both an "ignoring bounds" and a "load" warning.

    A one-off combination of warning classes, so that a single emitted
    warning can be matched by a user filter on either parent category.

    """


def _actions_engine():
    """Return a new 'actions engine'.

    The engine provides a pyke-rules-like interface to the core cf
    translation code.

    """
    # Deferred import to avoid circularity.
    from iris.fileformats._nc_load_rules.engine import Engine

    return Engine()


def _assert_case_specific_facts(engine, cf, cf_group):
    # Initialise a data store for built cube elements.
    # This is used to patch element attributes *not* setup by the actions
    # process, after the actions code has run.
    engine.cube_parts["coordinates"] = []
    engine.cube_parts["cell_measures"] = []
    engine.cube_parts["ancillary_variables"] = []

    # Assert facts for CF coordinates.
    for cf_name in cf_group.coordinates.keys():
        engine.add_case_specific_fact("coordinate", (cf_name,))

    # Assert facts for CF auxiliary coordinates.
    for cf_name in cf_group.auxiliary_coordinates.keys():
        engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,))

    # Assert facts for CF cell measures.
    for cf_name in cf_group.cell_measures.keys():
        engine.add_case_specific_fact("cell_measure", (cf_name,))

    # Assert facts for CF ancillary variables.
    for cf_name in cf_group.ancillary_variables.keys():
        engine.add_case_specific_fact("ancillary_variable", (cf_name,))

    # Assert facts for CF grid_mappings.
    for cf_name in cf_group.grid_mappings.keys():
        engine.add_case_specific_fact("grid_mapping", (cf_name,))

    # Assert facts for CF labels.
    for cf_name in cf_group.labels.keys():
        engine.add_case_specific_fact("label", (cf_name,))

    # Assert facts for CF formula terms associated with the cf_group
    # of the CF data variable.

    # Collect varnames of formula-root variables as we go.
    # NOTE: use dictionary keys as an 'OrderedSet'
    #   - see: https://stackoverflow.com/a/53657523/2615050
    # This is to ensure that we can handle the resulting facts in a definite
    # order, as using a 'set' led to indeterminate results.
    formula_root = {}
    for cf_var in cf.cf_group.formula_terms.values():
        for cf_root, cf_term in cf_var.cf_terms_by_root.items():
            # Only assert this fact if the formula root variable is
            # defined in the CF group of the CF data variable.
            if cf_root in cf_group:
                formula_root[cf_root] = True
                engine.add_case_specific_fact(
                    "formula_term",
                    (cf_var.cf_name, cf_root, cf_term),
                )

    for cf_root in formula_root.keys():
        engine.add_case_specific_fact("formula_root", (cf_root,))


def _actions_activation_stats(engine, cf_name):
    print("-" * 80)
    print("CF Data Variable: %r" % cf_name)

    engine.print_stats()

    print("Rules Triggered:")

    for rule in sorted(list(engine.rules_triggered)):
        print("\t%s" % rule)

    print("Case Specific Facts:")
    kb_facts = engine.get_kb()

    for key in kb_facts.entity_lists.keys():
        for arg in kb_facts.entity_lists[key].case_specific_facts:
            print("\t%s%s" % (key, arg))


def _set_attributes(attributes, key, value):
    """Set attributes dictionary, converting unicode strings appropriately."""
    if isinstance(value, str):
        try:
            attributes[str(key)] = str(value)
        except UnicodeEncodeError:
            attributes[str(key)] = value
    else:
        attributes[str(key)] = value


def _add_unused_attributes(iris_object, cf_var):
    """Copy the "unused" attributes of a CF-netCDF variable onto an Iris object.

    The "unused" attributes are those reported by ``cf_var.cf_attrs_unused()``,
    minus any whose name is a CF reserved term (i.e. is in ``_CF_ATTRS``).

    Parameters
    ----------
    iris_object
        The object whose ``attributes`` are populated.
    cf_var
        The associated CF-netCDF variable.

    """
    unused_attrs = cf_var.cf_attrs_unused()

    # Treat cube attributes (i.e. a CubeAttrsDict) as a special case : these
    # attrs are "local" (i.e. on the variable), so record them as such.
    target = iris_object.attributes
    target = getattr(target, "locals", target)

    for item in unused_attrs:
        if item[0] in _CF_ATTRS:
            # A CF reserved term : not a free attribute.
            continue
        attr_name, attr_value = item
        _set_attributes(target, attr_name, attr_value)


def _get_actual_dtype(cf_var):
    # Figure out what the eventual data type will be after any scale/offset
    # transforms.
    dummy_data = np.zeros(1, dtype=cf_var.dtype)
    if hasattr(cf_var, "scale_factor"):
        dummy_data = cf_var.scale_factor * dummy_data
    if hasattr(cf_var, "add_offset"):
        dummy_data = cf_var.add_offset + dummy_data
    return dummy_data.dtype


# An arbitrary variable array size (in bytes), below which we will fetch real data
# from a variable rather than making a lazy array for deferred access.
# Set by experiment at roughly the point where it begins to save us memory, but actually
# mostly done for speed improvement.  See https://github.com/SciTools/iris/pull/5069
# Used by `_get_cf_var_data`, which compares it against `size * dtype.itemsize`.
_LAZYVAR_MIN_BYTES = 5000


def _get_cf_var_data(cf_var, filename):
    """Get an array representing the data of a CF variable.

    This is typically a lazy array based around a NetCDFDataProxy, but if the variable
    is "sufficiently small", we instead fetch the data as a real (numpy) array.
    The latter is especially valuable for scalar coordinates, which are otherwise
    unnecessarily slow + wasteful of memory.

    Parameters
    ----------
    cf_var
        The CF variable to get data for.  Either a wrapper of a file variable, or an
        emulating object which carries a ``_data_array`` attribute.
    filename
        Passed on to the :class:`NetCDFDataProxy`, which fetches from the file.

    Returns
    -------
    array
        One of : ``cf_var._data_array``, if that attribute exists; ``cf_var[:]``, if
        the variable occupies fewer than ``_LAZYVAR_MIN_BYTES`` bytes; otherwise the
        result of ``as_lazy_data`` applied to a data proxy.

    Raises
    ------
    KeyError
        When the chunk-control mode is ``FROM_FILE``, and ``cf_var`` is a
        :class:`~iris.fileformats.cf.CFDataVariable` which has no chunking
        specified in the file (i.e. it is "contiguous").

    """
    # N.B. CHUNK_CONTROL is only read here, never rebound.
    global CHUNK_CONTROL
    if hasattr(cf_var, "_data_array"):
        # The variable is not an actual netCDF4 file variable, but an emulating
        # object with an attached data array (either numpy or dask), which can be
        # returned immediately as-is.  This is used as a hook to translate data to/from
        # netcdf data container objects in other packages, such as xarray.
        # See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
        result = cf_var._data_array
    else:
        # Size is judged on the *file* dtype, i.e. before any scale/offset transform.
        total_bytes = cf_var.size * cf_var.dtype.itemsize
        if total_bytes < _LAZYVAR_MIN_BYTES:
            # Don't make a lazy array, as it will cost more memory AND more time to access.
            # Instead fetch the data immediately, as a real array, and return that.
            result = cf_var[:]

        else:
            # Get lazy chunked data out of a cf variable.
            # Creates Dask wrappers around data arrays for any cube components which
            # can have lazy values, e.g. Cube, Coord, CellMeasure, AuxiliaryVariable.
            dtype = _get_actual_dtype(cf_var)

            # Make a data-proxy that mimics array access and can fetch from the file.
            # The fill value is the variable's own "_FillValue", else the default for
            # its file dtype (keyed by the dtype string without its byte-order char).
            fill_value = getattr(
                cf_var.cf_data,
                "_FillValue",
                _thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
            )
            proxy = NetCDFDataProxy(
                cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
            )
            # In AS_DASK mode, file chunking and user settings are both ignored, and
            # the chunking choice is passed to 'as_lazy_data' as "auto".
            if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK:
                result = as_lazy_data(proxy, meta=proxy.dask_meta, chunks="auto")
            else:
                # Get the chunking specified for the variable : this is either a
                # shape, or maybe the string "contiguous".
                chunks = cf_var.cf_data.chunking()
                if chunks is None:
                    # Occurs for non-version-4 netcdf
                    chunks = "contiguous"
                # In the "contiguous" case, use the full variable shape as the chunks.
                if chunks == "contiguous":
                    if (
                        CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE
                        and isinstance(cf_var, iris.fileformats.cf.CFDataVariable)
                    ):
                        # FROM_FILE demands file-specified chunks for data variables.
                        raise KeyError(
                            f"{cf_var.cf_name} does not contain pre-existing chunk specifications."
                            f" Instead, you might wish to use CHUNK_CONTROL.set(), or just use default"
                            f" behaviour outside of a context manager. "
                        )
                    # Equivalent to chunks=None, but value required by chunking control
                    chunks = list(cf_var.shape)

                # Modify the chunking in the context of an active chunking control.
                # N.B. settings specific to this named var override global ('*') ones.
                # NOTE: because of the 'or', an *empty* var-specific setting also
                # falls through to the global ('*') one.
                dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get(
                    cf_var.cf_name
                ) or CHUNK_CONTROL.var_dim_chunksizes.get("*")
                dims = cf_var.cf_data.dimensions
                if CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE:
                    # All dims are marked 'fixed' : the file chunks are used as they are.
                    dims_fixed = np.ones(len(dims), dtype=bool)
                elif not dim_chunks:
                    # No user settings apply to this variable.
                    dims_fixed = None
                else:
                    # Modify the chunks argument, and pass in a list of 'fixed' dims, for
                    # any of our dims which are controlled.
                    dims_fixed = np.zeros(len(dims), dtype=bool)
                    for i_dim, dim_name in enumerate(dims):
                        dim_chunksize = dim_chunks.get(dim_name)
                        # N.B. a chunksize of 0 (or no setting) leaves the dim uncontrolled.
                        if dim_chunksize:
                            if dim_chunksize == -1:
                                # -1 means "the full size of this dimension".
                                chunks[i_dim] = cf_var.shape[i_dim]
                            else:
                                chunks[i_dim] = dim_chunksize
                            dims_fixed[i_dim] = True
                if dims_fixed is None:
                    # Wrap the None in a list, so that 'tuple()' below yields (None,).
                    # NOTE(review): presumably 'as_lazy_data' needs a tuple here and
                    # treats this as "no fixed dims" - confirm against its definition.
                    dims_fixed = [dims_fixed]
                result = as_lazy_data(
                    proxy,
                    meta=proxy.dask_meta,
                    chunks=chunks,
                    dims_fixed=tuple(dims_fixed),
                )
    return result


class _OrderedAddableList(list):
    """A custom container object for actions recording.

    Used purely in actions debugging, to accumulate a record of which actions
    were activated.

    It replaces a set, so as to preserve the ordering of operations, with
    possible repeats, and it also numbers the entries.

    The actions routines invoke an 'add' method, so this effectively replaces
    a set.add with a list.append.

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._n_add = 0

    def add(self, msg):
        self._n_add += 1
        n_add = self._n_add
        self.append(f"#{n_add:03d} : {msg}")


def _load_cube(engine, cf, cf_var, filename):
    """Load the cube for a CF data variable, applying its own chunk settings.

    Any dimension chunk-settings specific to this cube (i.e. named by its
    data-var) are translated into global ones for the duration of the load.
    Thus, by default, any AuxCoords, CellMeasures et al are created with the
    per-dimension chunksizes specified for the cube.

    """
    cube_settings = CHUNK_CONTROL.var_dim_chunksizes.get(cf_var.cf_name, {})
    with CHUNK_CONTROL.set(**cube_settings):
        cube = _load_cube_inner(engine, cf, cf_var, filename)
    return cube


def _load_cube_inner(engine, cf, cf_var, filename):
    from iris.cube import Cube

    """Create the cube associated with the CF-netCDF data variable."""
    data = _get_cf_var_data(cf_var, filename)
    cube = Cube(data)

    # Reset the actions engine.
    engine.reset()

    # Initialise engine rule processing hooks.
    engine.cf_var = cf_var
    engine.cube = cube
    engine.cube_parts = {}
    engine.requires = {}
    engine.rules_triggered = _OrderedAddableList()
    engine.filename = filename

    # Assert all the case-specific facts.
    # This extracts 'facts' specific to this data-variable (aka cube), from
    # the info supplied in the CFGroup object.
    _assert_case_specific_facts(engine, cf, cf_var.cf_group)

    # Run the actions engine.
    # This creates various cube elements and attaches them to the cube.
    # It also records various other info on the engine, to be processed later.
    engine.activate()

    # Having run the rules, now add the "unused" attributes to each cf element.
    def fix_attributes_all_elements(role_name):
        elements_and_names = engine.cube_parts.get(role_name, [])

        for iris_object, cf_var_name in elements_and_names:
            _add_unused_attributes(iris_object, cf.cf_group[cf_var_name])

    # Populate the attributes of all coordinates, cell-measures and ancillary-vars.
    fix_attributes_all_elements("coordinates")
    fix_attributes_all_elements("ancillary_variables")
    fix_attributes_all_elements("cell_measures")

    # Also populate attributes of the top-level cube itself.
    _add_unused_attributes(cube, cf_var)

    # Work out reference names for all the coords.
    names = {
        coord.var_name: coord.standard_name or coord.var_name or "unknown"
        for coord in cube.coords()
    }

    # Add all the cube cell methods.
    cube.cell_methods = [
        iris.coords.CellMethod(
            method=method.method,
            intervals=method.intervals,
            comments=method.comments,
            coords=[
                names[coord_name] if coord_name in names else coord_name
                for coord_name in method.coord_names
            ],
        )
        for method in cube.cell_methods
    ]

    if DEBUG:
        # Show activation statistics for this data-var (i.e. cube).
        _actions_activation_stats(engine, cf_var.cf_name)

    return cube


def _load_aux_factory(engine, cube):
    """Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory."""
    formula_type = engine.requires.get("formula_type")
    if formula_type in [
        "atmosphere_sigma_coordinate",
        "atmosphere_hybrid_height_coordinate",
        "atmosphere_hybrid_sigma_pressure_coordinate",
        "ocean_sigma_z_coordinate",
        "ocean_sigma_coordinate",
        "ocean_s_coordinate",
        "ocean_s_coordinate_g1",
        "ocean_s_coordinate_g2",
    ]:

        def coord_from_term(term):
            # Convert term names to coordinates (via netCDF variable names).
            name = engine.requires["formula_terms"].get(term, None)
            if name is not None:
                for coord, cf_var_name in engine.cube_parts["coordinates"]:
                    if cf_var_name == name:
                        return coord
                warnings.warn(
                    "Unable to find coordinate for variable {!r}".format(name),
                    category=iris.warnings.IrisFactoryCoordNotFoundWarning,
                )

        if formula_type == "atmosphere_sigma_coordinate":
            pressure_at_top = coord_from_term("ptop")
            sigma = coord_from_term("sigma")
            surface_air_pressure = coord_from_term("ps")
            factory = AtmosphereSigmaFactory(
                pressure_at_top, sigma, surface_air_pressure
            )
        elif formula_type == "atmosphere_hybrid_height_coordinate":
            delta = coord_from_term("a")
            sigma = coord_from_term("b")
            orography = coord_from_term("orog")
            factory = HybridHeightFactory(delta, sigma, orography)
        elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate":
            # Hybrid pressure has two valid versions of its formula terms:
            # "p0: var1 a: var2 b: var3 ps: var4" or
            # "ap: var1 b: var2 ps: var3" where "ap = p0 * a"
            # Attempt to get the "ap" term.
            delta = coord_from_term("ap")
            if delta is None:
                # The "ap" term is unavailable, so try getting terms "p0"
                # and "a" terms in order to derive an "ap" equivalent term.
                coord_p0 = coord_from_term("p0")
                if coord_p0 is not None:
                    if coord_p0.shape != (1,):
                        msg = (
                            "Expecting {!r} to be a scalar reference "
                            "pressure coordinate, got shape {!r}".format(
                                coord_p0.var_name, coord_p0.shape
                            )
                        )
                        raise ValueError(msg)
                    if coord_p0.has_bounds():
                        msg = (
                            "Ignoring atmosphere hybrid sigma pressure "
                            "scalar coordinate {!r} bounds.".format(coord_p0.name())
                        )
                        warnings.warn(
                            msg,
                            category=_WarnComboIgnoringBoundsLoad,
                        )
                    coord_a = coord_from_term("a")
                    if coord_a is not None:
                        if coord_a.units.is_unknown():
                            # Be graceful, and promote unknown to dimensionless units.
                            coord_a.units = "1"
                        delta = coord_a * coord_p0.points[0]
                        delta.units = coord_a.units * coord_p0.units
                        delta.rename("vertical pressure")
                        delta.var_name = "ap"
                        cube.add_aux_coord(delta, cube.coord_dims(coord_a))

            sigma = coord_from_term("b")
            surface_air_pressure = coord_from_term("ps")
            factory = HybridPressureFactory(delta, sigma, surface_air_pressure)
        elif formula_type == "ocean_sigma_z_coordinate":
            sigma = coord_from_term("sigma")
            eta = coord_from_term("eta")
            depth = coord_from_term("depth")
            depth_c = coord_from_term("depth_c")
            nsigma = coord_from_term("nsigma")
            zlev = coord_from_term("zlev")
            factory = OceanSigmaZFactory(sigma, eta, depth, depth_c, nsigma, zlev)
        elif formula_type == "ocean_sigma_coordinate":
            sigma = coord_from_term("sigma")
            eta = coord_from_term("eta")
            depth = coord_from_term("depth")
            factory = OceanSigmaFactory(sigma, eta, depth)
        elif formula_type == "ocean_s_coordinate":
            s = coord_from_term("s")
            eta = coord_from_term("eta")
            depth = coord_from_term("depth")
            a = coord_from_term("a")
            depth_c = coord_from_term("depth_c")
            b = coord_from_term("b")
            factory = OceanSFactory(s, eta, depth, a, b, depth_c)
        elif formula_type == "ocean_s_coordinate_g1":
            s = coord_from_term("s")
            c = coord_from_term("c")
            eta = coord_from_term("eta")
            depth = coord_from_term("depth")
            depth_c = coord_from_term("depth_c")
            factory = OceanSg1Factory(s, c, eta, depth, depth_c)
        elif formula_type == "ocean_s_coordinate_g2":
            s = coord_from_term("s")
            c = coord_from_term("c")
            eta = coord_from_term("eta")
            depth = coord_from_term("depth")
            depth_c = coord_from_term("depth_c")
            factory = OceanSg2Factory(s, c, eta, depth, depth_c)
        cube.add_aux_factory(factory)


def _translate_constraints_to_var_callback(constraints):
    """Translate load constraints into a simple data-var filter function, if possible.

    Parameters
    ----------
    constraints
        The load constraint(s), in any form accepted by
        ``iris._constraints.list_of_constraints``.

    Returns
    -------
    callable or None
        A function ``f(cf_datavar) -> bool``, which returns whether a CF data
        variable satisfies the constraint; or ``None`` if the constraints
        can not be translated into such a function.

    Notes
    -----
    For now, ONLY handles a single NameConstraint with no 'STASH' component.

    """
    import iris._constraints

    constraints = iris._constraints.list_of_constraints(constraints)
    if len(constraints) != 1:
        return None

    (constraint,) = constraints
    if not (
        isinstance(constraint, iris._constraints.NameConstraint)
        and constraint.STASH == "none"
    ):
        return None

    # Unique marker for "the variable has no such property".
    missing = object()

    # As long as it doesn't use a STASH match, then we can treat it as
    # a testing against name properties of cf_var.
    # That's just like testing against name properties of a cube, except that
    # they may not all exist.
    def inner(cf_datavar):
        for name in constraint._names:
            expected = getattr(constraint, name)
            if name != "STASH" and expected != "none":
                attr_name = "cf_name" if name == "var_name" else name
                # Fetch property : N.B. CFVariable caches the property values.
                # A property which the variable does not have is skipped, i.e.
                # it can not cause a mismatch : this is the only difference
                # from the code in NameConstraint.
                actual = getattr(cf_datavar, attr_name, missing)
                if actual is missing:
                    continue
                if actual != expected:
                    return False
        return True

    return inner


def load_cubes(file_sources, callback=None, constraints=None):
    """Load cubes from a list of NetCDF filenames/OPeNDAP URLs.

    Parameters
    ----------
    file_sources : str or list
        One or more NetCDF filenames/OPeNDAP URLs to load from.
        OR open datasets.
    callback : function, optional
        Function which can be passed on to :func:`iris.io.run_callback`.
    constraints : optional
        Load constraints.  Where these can be translated into a simple filter
        on data variables, non-matching variables are skipped without being
        loaded; otherwise they have no effect here.

    Returns
    -------
    Generator of loaded NetCDF :class:`iris.cube.Cube`.

    """
    # Deferred import to avoid circular imports.
    from iris.fileformats.cf import CFReader
    from iris.io import run_callback

    from .ugrid_load import (
        _build_mesh_coords,
        _meshes_from_cf,
    )

    # Create a low-level data-var filter from the original load constraints,
    # if they are suitable.
    var_callback = _translate_constraints_to_var_callback(constraints)

    # Create an actions engine.
    engine = _actions_engine()

    # Accept a single source (a string, or any non-iterable object) as well
    # as an iterable of sources.
    if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
        file_sources = [file_sources]

    for file_source in file_sources:
        # Ingest the file.  At present may be a filepath or an open netCDF4.Dataset.
        with CFReader(file_source) as cf:
            meshes = _meshes_from_cf(cf)

            # Process each CF data variable.
            data_variables = list(cf.cf_group.data_variables.values()) + list(
                cf.cf_group.promoted.values()
            )
            for cf_var in data_variables:
                if var_callback and not var_callback(cf_var):
                    # Deliver only selected results.
                    continue

                # cf_var-specific mesh handling, if a mesh is present.
                # Build the mesh_coords *before* loading the cube - avoids
                # mesh-related attributes being picked up by
                # _add_unused_attributes().
                mesh = None
                mesh_coords, mesh_dim = [], None
                mesh_name = getattr(cf_var, "mesh", None)
                if mesh_name is not None:
                    try:
                        mesh = meshes[mesh_name]
                    except KeyError:
                        message = (
                            f"File does not contain mesh: '{mesh_name}' - "
                            f"referenced by variable: '{cf_var.cf_name}' ."
                        )
                        logger.debug(message)
                if mesh is not None:
                    mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)

                cube = _load_cube(engine, cf, cf_var, cf.filename)

                # Attach the mesh (if present) to the cube.
                for mesh_coord in mesh_coords:
                    cube.add_aux_coord(mesh_coord, mesh_dim)

                # Process any associated formula terms and attach
                # the corresponding AuxCoordFactory.
                try:
                    _load_aux_factory(engine, cube)
                except ValueError as e:
                    # A bad formula is reported as a warning, not a failure.
                    warnings.warn(
                        "{}".format(e),
                        category=iris.warnings.IrisLoadWarning,
                    )

                # Perform any user registered callback function.
                cube = run_callback(callback, cube, cf_var, file_source)

                # Callback mechanism may return None, which must not be yielded
                if cube is None:
                    continue

                yield cube


[docs] class ChunkControl(threading.local): """Provide user control of Chunk Control."""
[docs] class Modes(Enum): """Modes Enums.""" DEFAULT = auto() FROM_FILE = auto() AS_DASK = auto()
def __init__(self, var_dim_chunksizes=None): """Provide user control of Dask chunking. The NetCDF loader is controlled by the single instance of this: the :data:`~iris.fileformats.netcdf.loader.CHUNK_CONTROL` object. A chunk size can be set for a specific (named) file dimension, when loading specific (named) variables, or for all variables. When a selected variable is a CF data-variable, which loads as a :class:`~iris.cube.Cube`, then the given dimension chunk size is *also* fixed for all variables which are components of that :class:`~iris.cube.Cube`, i.e. any :class:`~iris.coords.Coord`, :class:`~iris.coords.CellMeasure`, :class:`~iris.coords.AncillaryVariable` etc. This can be overridden, if required, by variable-specific settings. For this purpose, :class:`~iris.mesh.MeshCoord` and :class:`~iris.mesh.Connectivity` are not :class:`~iris.cube.Cube` components, and chunk control on a :class:`~iris.cube.Cube` data-variable will not affect them. """ self.var_dim_chunksizes = var_dim_chunksizes or {} self.mode = self.Modes.DEFAULT
[docs] @contextmanager def set( self, var_names: str | Iterable[str] | None = None, **dimension_chunksizes: Mapping[str, int], ) -> Iterator[None]: r"""Control the Dask chunk sizes applied to NetCDF variables during loading. Parameters ---------- var_names : str or list of str, default=None Apply the `dimension_chunksizes` controls only to these variables, or when building :class:`~iris.cube.Cube` from these data variables. If ``None``, settings apply to all loaded variables. **dimension_chunksizes : dict of {str: int} Kwargs specifying chunksizes for dimensions of file variables. Each key-value pair defines a chunk size for a named file dimension, e.g. ``{'time': 10, 'model_levels':1}``. Values of ``-1`` will lock the chunk size to the full size of that dimension. Notes ----- This function acts as a context manager, for use in a ``with`` block. >>> import iris >>> from iris.fileformats.netcdf.loader import CHUNK_CONTROL >>> with CHUNK_CONTROL.set("air_temperature", time=180, latitude=-1): ... cube = iris.load(iris.sample_data_path("E1_north_america.nc"))[0] When `var_names` is present, the chunk size adjustments are applied only to the selected variables. However, for a CF data variable, this extends to all components of the (raw) :class:`~iris.cube.Cube` created from it. **Un**-adjusted dimensions have chunk sizes set in the 'usual' way. That is, according to the normal behaviour of :func:`iris._lazy_data.as_lazy_data`, which is: chunk size is based on the file variable chunking, or full variable shape; this is scaled up or down by integer factors to best match the Dask default chunk size, i.e. the setting configured by ``dask.config.set({'array.chunk-size': '250MiB'})``. """ old_mode = self.mode old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) if var_names is None: var_names = ["*"] elif isinstance(var_names, str): var_names = [var_names] try: for var_name in var_names: # Note: here we simply treat '*' as another name. 
# A specific name match should override a '*' setting, but # that is implemented elsewhere. if not isinstance(var_name, str): msg = ( # type: ignore[unreachable] "'var_names' should be an iterable of strings, " f"not {var_names!r}." ) raise ValueError(msg) dim_chunks = self.var_dim_chunksizes.setdefault(var_name, {}) for dim_name, chunksize in dimension_chunksizes.items(): if not (isinstance(dim_name, str) and isinstance(chunksize, int)): msg = ( "'dimension_chunksizes' kwargs should be a dict " f"of `str: int` pairs, not {dimension_chunksizes!r}." ) raise ValueError(msg) dim_chunks[dim_name] = chunksize yield finally: self.var_dim_chunksizes = old_var_dim_chunksizes self.mode = old_mode
[docs] @contextmanager def from_file(self) -> Iterator[None]: r"""Ensure the chunk sizes are loaded in from NetCDF file variables. Raises ------ KeyError If any NetCDF data variables - those that become :class:`~iris.cube.Cube` - do not specify chunk sizes. Notes ----- This function acts as a context manager, for use in a ``with`` block. """ old_mode = self.mode old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) try: self.mode = self.Modes.FROM_FILE yield finally: self.mode = old_mode self.var_dim_chunksizes = old_var_dim_chunksizes
[docs] @contextmanager def as_dask(self) -> Iterator[None]: """Rely on Dask :external+dask:doc:`array` to control chunk sizes. Notes ----- This function acts as a context manager, for use in a ``with`` block. """ old_mode = self.mode old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes) try: self.mode = self.Modes.AS_DASK yield finally: self.mode = old_mode self.var_dim_chunksizes = old_var_dim_chunksizes
# Note: the CHUNK_CONTROL object controls chunk sizing in the
# :func:`_get_cf_var_data` function.
# N.B. :func:`_load_cube` also modifies this when loading each cube,
# introducing an additional context in which any cube-specific settings are
# 'promoted' into being global ones.

#: The global :class:`ChunkControl` object providing user-control of Dask chunking
#: when Iris loads NetCDF files.
CHUNK_CONTROL: ChunkControl = ChunkControl()