# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Support loading Iris cubes from NetCDF files using the CF conventions for metadata interpretation.
See : `NetCDF User's Guide <https://docs.unidata.ucar.edu/nug/current/>`_
and `netCDF4 python module <https://github.com/Unidata/netcdf4-python>`_.
Also : `CF Conventions <https://cfconventions.org/>`_.
"""
from collections.abc import Iterable, Iterator, Mapping
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum, auto
import threading
import warnings
import numpy as np
from iris._lazy_data import as_lazy_data
from iris.aux_factory import (
AtmosphereSigmaFactory,
HybridHeightFactory,
HybridPressureFactory,
OceanSFactory,
OceanSg1Factory,
OceanSg2Factory,
OceanSigmaFactory,
OceanSigmaZFactory,
)
import iris.config
import iris.coord_systems
import iris.coords
import iris.fileformats.cf
from iris.fileformats.netcdf import _thread_safe_nc
from iris.fileformats.netcdf.saver import _CF_ATTRS
import iris.io
import iris.util
import iris.warnings
# Show actions activation statistics.
DEBUG = False
# Get the logger : shared logger for all in 'iris.fileformats.netcdf'.
from . import logger
# An expected part of the public loader API, but includes thread safety
# concerns so is housed in _thread_safe_nc.
NetCDFDataProxy = _thread_safe_nc.NetCDFDataProxy
class _WarnComboIgnoringBoundsLoad(
iris.warnings.IrisIgnoringBoundsWarning,
iris.warnings.IrisLoadWarning,
):
"""One-off combination of warning classes - enhances user filtering."""
pass
def _actions_engine():
# Return an 'actions engine', which provides a pyke-rules-like interface to
# the core cf translation code.
# Deferred import to avoid circularity.
import iris.fileformats._nc_load_rules.engine as nc_actions_engine
engine = nc_actions_engine.Engine()
return engine
def _assert_case_specific_facts(engine, cf, cf_group):
# Initialise a data store for built cube elements.
# This is used to patch element attributes *not* setup by the actions
# process, after the actions code has run.
engine.cube_parts["coordinates"] = []
engine.cube_parts["cell_measures"] = []
engine.cube_parts["ancillary_variables"] = []
# Assert facts for CF coordinates.
for cf_name in cf_group.coordinates.keys():
engine.add_case_specific_fact("coordinate", (cf_name,))
# Assert facts for CF auxiliary coordinates.
for cf_name in cf_group.auxiliary_coordinates.keys():
engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,))
# Assert facts for CF cell measures.
for cf_name in cf_group.cell_measures.keys():
engine.add_case_specific_fact("cell_measure", (cf_name,))
# Assert facts for CF ancillary variables.
for cf_name in cf_group.ancillary_variables.keys():
engine.add_case_specific_fact("ancillary_variable", (cf_name,))
# Assert facts for CF grid_mappings.
for cf_name in cf_group.grid_mappings.keys():
engine.add_case_specific_fact("grid_mapping", (cf_name,))
# Assert facts for CF labels.
for cf_name in cf_group.labels.keys():
engine.add_case_specific_fact("label", (cf_name,))
# Assert facts for CF formula terms associated with the cf_group
# of the CF data variable.
# Collect varnames of formula-root variables as we go.
# NOTE: use dictionary keys as an 'OrderedSet'
# - see: https://stackoverflow.com/a/53657523/2615050
# This is to ensure that we can handle the resulting facts in a definite
# order, as using a 'set' led to indeterminate results.
formula_root = {}
for cf_var in cf.cf_group.formula_terms.values():
for cf_root, cf_term in cf_var.cf_terms_by_root.items():
# Only assert this fact if the formula root variable is
# defined in the CF group of the CF data variable.
if cf_root in cf_group:
formula_root[cf_root] = True
engine.add_case_specific_fact(
"formula_term",
(cf_var.cf_name, cf_root, cf_term),
)
for cf_root in formula_root.keys():
engine.add_case_specific_fact("formula_root", (cf_root,))
def _actions_activation_stats(engine, cf_name):
print("-" * 80)
print("CF Data Variable: %r" % cf_name)
engine.print_stats()
print("Rules Triggered:")
for rule in sorted(list(engine.rules_triggered)):
print("\t%s" % rule)
print("Case Specific Facts:")
kb_facts = engine.get_kb()
for key in kb_facts.entity_lists.keys():
for arg in kb_facts.entity_lists[key].case_specific_facts:
print("\t%s%s" % (key, arg))
def _set_attributes(attributes, key, value):
"""Set attributes dictionary, converting unicode strings appropriately."""
if isinstance(value, str):
try:
attributes[str(key)] = str(value)
except UnicodeEncodeError:
attributes[str(key)] = value
else:
attributes[str(key)] = value
def _add_unused_attributes(iris_object, cf_var):
"""Populate the attributes of a cf element with the "unused" attributes.
Populate the attributes of a cf element with the "unused" attributes
from the associated CF-netCDF variable. That is, all those that aren't CF
reserved terms.
"""
def attribute_predicate(item):
return item[0] not in _CF_ATTRS
tmpvar = filter(attribute_predicate, cf_var.cf_attrs_unused())
attrs_dict = iris_object.attributes
if hasattr(attrs_dict, "locals"):
# Treat cube attributes (i.e. a CubeAttrsDict) as a special case.
# These attrs are "local" (i.e. on the variable), so record them as such.
attrs_dict = attrs_dict.locals
for attr_name, attr_value in tmpvar:
_set_attributes(attrs_dict, attr_name, attr_value)
def _get_actual_dtype(cf_var):
# Figure out what the eventual data type will be after any scale/offset
# transforms.
dummy_data = np.zeros(1, dtype=cf_var.dtype)
if hasattr(cf_var, "scale_factor"):
dummy_data = cf_var.scale_factor * dummy_data
if hasattr(cf_var, "add_offset"):
dummy_data = cf_var.add_offset + dummy_data
return dummy_data.dtype
# An arbitrary variable array size, below which we will fetch real data from a variable
# rather than making a lazy array for deferred access.
# Set by experiment at roughly the point where it begins to save us memory, but actually
# mostly done for speed improvement. See https://github.com/SciTools/iris/pull/5069
_LAZYVAR_MIN_BYTES = 5000
def _get_cf_var_data(cf_var, filename):
"""Get an array representing the data of a CF variable.
This is typically a lazy array based around a NetCDFDataProxy, but if the variable
is "sufficiently small", we instead fetch the data as a real (numpy) array.
The latter is especially valuable for scalar coordinates, which are otherwise
unnecessarily slow + wasteful of memory.
"""
global CHUNK_CONTROL
if hasattr(cf_var, "_data_array"):
# The variable is not an actual netCDF4 file variable, but an emulating
# object with an attached data array (either numpy or dask), which can be
# returned immediately as-is. This is used as a hook to translate data to/from
# netcdf data container objects in other packages, such as xarray.
# See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
result = cf_var._data_array
else:
total_bytes = cf_var.size * cf_var.dtype.itemsize
if total_bytes < _LAZYVAR_MIN_BYTES:
# Don't make a lazy array, as it will cost more memory AND more time to access.
# Instead fetch the data immediately, as a real array, and return that.
result = cf_var[:]
else:
# Get lazy chunked data out of a cf variable.
# Creates Dask wrappers around data arrays for any cube components which
# can have lazy values, e.g. Cube, Coord, CellMeasure, AuxiliaryVariable.
dtype = _get_actual_dtype(cf_var)
# Make a data-proxy that mimics array access and can fetch from the file.
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
_thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK:
result = as_lazy_data(proxy, meta=proxy.dask_meta, chunks="auto")
else:
chunks = cf_var.cf_data.chunking()
if chunks is None:
# Occurs for non-version-4 netcdf
chunks = "contiguous"
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
if (
CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE
and isinstance(cf_var, iris.fileformats.cf.CFDataVariable)
):
raise KeyError(
f"{cf_var.cf_name} does not contain pre-existing chunk specifications."
f" Instead, you might wish to use CHUNK_CONTROL.set(), or just use default"
f" behaviour outside of a context manager. "
)
# Equivalent to chunks=None, but value required by chunking control
chunks = list(cf_var.shape)
# Modify the chunking in the context of an active chunking control.
# N.B. settings specific to this named var override global ('*') ones.
dim_chunks = CHUNK_CONTROL.var_dim_chunksizes.get(
cf_var.cf_name
) or CHUNK_CONTROL.var_dim_chunksizes.get("*")
dims = cf_var.cf_data.dimensions
if CHUNK_CONTROL.mode is ChunkControl.Modes.FROM_FILE:
dims_fixed = np.ones(len(dims), dtype=bool)
elif not dim_chunks:
dims_fixed = None
else:
# Modify the chunks argument, and pass in a list of 'fixed' dims, for
# any of our dims which are controlled.
dims_fixed = np.zeros(len(dims), dtype=bool)
for i_dim, dim_name in enumerate(dims):
dim_chunksize = dim_chunks.get(dim_name)
if dim_chunksize:
if dim_chunksize == -1:
chunks[i_dim] = cf_var.shape[i_dim]
else:
chunks[i_dim] = dim_chunksize
dims_fixed[i_dim] = True
if dims_fixed is None:
dims_fixed = [dims_fixed]
result = as_lazy_data(
proxy,
meta=proxy.dask_meta,
chunks=chunks,
dims_fixed=tuple(dims_fixed),
)
return result
class _OrderedAddableList(list):
"""A custom container object for actions recording.
Used purely in actions debugging, to accumulate a record of which actions
were activated.
It replaces a set, so as to preserve the ordering of operations, with
possible repeats, and it also numbers the entries.
The actions routines invoke an 'add' method, so this effectively replaces
a set.add with a list.append.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._n_add = 0
def add(self, msg):
self._n_add += 1
n_add = self._n_add
self.append(f"#{n_add:03d} : {msg}")
def _load_cube(engine, cf, cf_var, filename):
global CHUNK_CONTROL
# Translate dimension chunk-settings specific to this cube (i.e. named by
# it's data-var) into global ones, for the duration of this load.
# Thus, by default, we will create any AuxCoords, CellMeasures et al with
# any per-dimension chunksizes specified for the cube.
these_settings = CHUNK_CONTROL.var_dim_chunksizes.get(cf_var.cf_name, {})
with CHUNK_CONTROL.set(**these_settings):
return _load_cube_inner(engine, cf, cf_var, filename)
def _load_cube_inner(engine, cf, cf_var, filename):
from iris.cube import Cube
"""Create the cube associated with the CF-netCDF data variable."""
data = _get_cf_var_data(cf_var, filename)
cube = Cube(data)
# Reset the actions engine.
engine.reset()
# Initialise engine rule processing hooks.
engine.cf_var = cf_var
engine.cube = cube
engine.cube_parts = {}
engine.requires = {}
engine.rules_triggered = _OrderedAddableList()
engine.filename = filename
# Assert all the case-specific facts.
# This extracts 'facts' specific to this data-variable (aka cube), from
# the info supplied in the CFGroup object.
_assert_case_specific_facts(engine, cf, cf_var.cf_group)
# Run the actions engine.
# This creates various cube elements and attaches them to the cube.
# It also records various other info on the engine, to be processed later.
engine.activate()
# Having run the rules, now add the "unused" attributes to each cf element.
def fix_attributes_all_elements(role_name):
elements_and_names = engine.cube_parts.get(role_name, [])
for iris_object, cf_var_name in elements_and_names:
_add_unused_attributes(iris_object, cf.cf_group[cf_var_name])
# Populate the attributes of all coordinates, cell-measures and ancillary-vars.
fix_attributes_all_elements("coordinates")
fix_attributes_all_elements("ancillary_variables")
fix_attributes_all_elements("cell_measures")
# Also populate attributes of the top-level cube itself.
_add_unused_attributes(cube, cf_var)
# Work out reference names for all the coords.
names = {
coord.var_name: coord.standard_name or coord.var_name or "unknown"
for coord in cube.coords()
}
# Add all the cube cell methods.
cube.cell_methods = [
iris.coords.CellMethod(
method=method.method,
intervals=method.intervals,
comments=method.comments,
coords=[
names[coord_name] if coord_name in names else coord_name
for coord_name in method.coord_names
],
)
for method in cube.cell_methods
]
if DEBUG:
# Show activation statistics for this data-var (i.e. cube).
_actions_activation_stats(engine, cf_var.cf_name)
return cube
def _load_aux_factory(engine, cube):
"""Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory."""
formula_type = engine.requires.get("formula_type")
if formula_type in [
"atmosphere_sigma_coordinate",
"atmosphere_hybrid_height_coordinate",
"atmosphere_hybrid_sigma_pressure_coordinate",
"ocean_sigma_z_coordinate",
"ocean_sigma_coordinate",
"ocean_s_coordinate",
"ocean_s_coordinate_g1",
"ocean_s_coordinate_g2",
]:
def coord_from_term(term):
# Convert term names to coordinates (via netCDF variable names).
name = engine.requires["formula_terms"].get(term, None)
if name is not None:
for coord, cf_var_name in engine.cube_parts["coordinates"]:
if cf_var_name == name:
return coord
warnings.warn(
"Unable to find coordinate for variable {!r}".format(name),
category=iris.warnings.IrisFactoryCoordNotFoundWarning,
)
if formula_type == "atmosphere_sigma_coordinate":
pressure_at_top = coord_from_term("ptop")
sigma = coord_from_term("sigma")
surface_air_pressure = coord_from_term("ps")
factory = AtmosphereSigmaFactory(
pressure_at_top, sigma, surface_air_pressure
)
elif formula_type == "atmosphere_hybrid_height_coordinate":
delta = coord_from_term("a")
sigma = coord_from_term("b")
orography = coord_from_term("orog")
factory = HybridHeightFactory(delta, sigma, orography)
elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate":
# Hybrid pressure has two valid versions of its formula terms:
# "p0: var1 a: var2 b: var3 ps: var4" or
# "ap: var1 b: var2 ps: var3" where "ap = p0 * a"
# Attempt to get the "ap" term.
delta = coord_from_term("ap")
if delta is None:
# The "ap" term is unavailable, so try getting terms "p0"
# and "a" terms in order to derive an "ap" equivalent term.
coord_p0 = coord_from_term("p0")
if coord_p0 is not None:
if coord_p0.shape != (1,):
msg = (
"Expecting {!r} to be a scalar reference "
"pressure coordinate, got shape {!r}".format(
coord_p0.var_name, coord_p0.shape
)
)
raise ValueError(msg)
if coord_p0.has_bounds():
msg = (
"Ignoring atmosphere hybrid sigma pressure "
"scalar coordinate {!r} bounds.".format(coord_p0.name())
)
warnings.warn(
msg,
category=_WarnComboIgnoringBoundsLoad,
)
coord_a = coord_from_term("a")
if coord_a is not None:
if coord_a.units.is_unknown():
# Be graceful, and promote unknown to dimensionless units.
coord_a.units = "1"
delta = coord_a * coord_p0.points[0]
delta.units = coord_a.units * coord_p0.units
delta.rename("vertical pressure")
delta.var_name = "ap"
cube.add_aux_coord(delta, cube.coord_dims(coord_a))
sigma = coord_from_term("b")
surface_air_pressure = coord_from_term("ps")
factory = HybridPressureFactory(delta, sigma, surface_air_pressure)
elif formula_type == "ocean_sigma_z_coordinate":
sigma = coord_from_term("sigma")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
nsigma = coord_from_term("nsigma")
zlev = coord_from_term("zlev")
factory = OceanSigmaZFactory(sigma, eta, depth, depth_c, nsigma, zlev)
elif formula_type == "ocean_sigma_coordinate":
sigma = coord_from_term("sigma")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
factory = OceanSigmaFactory(sigma, eta, depth)
elif formula_type == "ocean_s_coordinate":
s = coord_from_term("s")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
a = coord_from_term("a")
depth_c = coord_from_term("depth_c")
b = coord_from_term("b")
factory = OceanSFactory(s, eta, depth, a, b, depth_c)
elif formula_type == "ocean_s_coordinate_g1":
s = coord_from_term("s")
c = coord_from_term("c")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
factory = OceanSg1Factory(s, c, eta, depth, depth_c)
elif formula_type == "ocean_s_coordinate_g2":
s = coord_from_term("s")
c = coord_from_term("c")
eta = coord_from_term("eta")
depth = coord_from_term("depth")
depth_c = coord_from_term("depth_c")
factory = OceanSg2Factory(s, c, eta, depth, depth_c)
cube.add_aux_factory(factory)
def _translate_constraints_to_var_callback(constraints):
"""Translate load constraints into a simple data-var filter function, if possible.
Returns
-------
bool or None
Notes
-----
For now, ONLY handles a single NameConstraint with no 'STASH' component.
"""
import iris._constraints
constraints = iris._constraints.list_of_constraints(constraints)
result = None
if len(constraints) == 1:
(constraint,) = constraints
if (
isinstance(constraint, iris._constraints.NameConstraint)
and constraint.STASH == "none"
):
# As long as it doesn't use a STASH match, then we can treat it as
# a testing against name properties of cf_var.
# That's just like testing against name properties of a cube, except that they may not all exist.
def inner(cf_datavar):
match = True
for name in constraint._names:
expected = getattr(constraint, name)
if name != "STASH" and expected != "none":
attr_name = "cf_name" if name == "var_name" else name
# Fetch property : N.B. CFVariable caches the property values
# The use of a default here is the only difference from the code in NameConstraint.
if not hasattr(cf_datavar, attr_name):
continue
actual = getattr(cf_datavar, attr_name, "")
if actual != expected:
match = False
break
return match
result = inner
return result
[docs]
def load_cubes(file_sources, callback=None, constraints=None):
"""Load cubes from a list of NetCDF filenames/OPeNDAP URLs.
Parameters
----------
file_sources : str or list
One or more NetCDF filenames/OPeNDAP URLs to load from.
OR open datasets.
callback : function, optional
Function which can be passed on to :func:`iris.io.run_callback`.
constraints : optional
Returns
-------
Generator of loaded NetCDF :class:`iris.cube.Cube`.
"""
# Deferred import to avoid circular imports.
from iris.fileformats.cf import CFReader
from iris.io import run_callback
from .ugrid_load import (
_build_mesh_coords,
_meshes_from_cf,
)
# Create a low-level data-var filter from the original load constraints, if they are suitable.
var_callback = _translate_constraints_to_var_callback(constraints)
# Create an actions engine.
engine = _actions_engine()
if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
file_sources = [file_sources]
for file_source in file_sources:
# Ingest the file. At present may be a filepath or an open netCDF4.Dataset.
with CFReader(file_source) as cf:
meshes = _meshes_from_cf(cf)
# Process each CF data variable.
data_variables = list(cf.cf_group.data_variables.values()) + list(
cf.cf_group.promoted.values()
)
for cf_var in data_variables:
if var_callback and not var_callback(cf_var):
# Deliver only selected results.
continue
# cf_var-specific mesh handling, if a mesh is present.
# Build the mesh_coords *before* loading the cube - avoids
# mesh-related attributes being picked up by
# _add_unused_attributes().
mesh_name = None
mesh = None
mesh_coords, mesh_dim = [], None
mesh_name = getattr(cf_var, "mesh", None)
if mesh_name is not None:
try:
mesh = meshes[mesh_name]
except KeyError:
message = (
f"File does not contain mesh: '{mesh_name}' - "
f"referenced by variable: '{cf_var.cf_name}' ."
)
logger.debug(message)
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
cube = _load_cube(engine, cf, cf_var, cf.filename)
# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
cube.add_aux_coord(mesh_coord, mesh_dim)
# Process any associated formula terms and attach
# the corresponding AuxCoordFactory.
try:
_load_aux_factory(engine, cube)
except ValueError as e:
warnings.warn(
"{}".format(e),
category=iris.warnings.IrisLoadWarning,
)
# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, file_source)
# Callback mechanism may return None, which must not be yielded
if cube is None:
continue
yield cube
[docs]
class ChunkControl(threading.local):
"""Provide user control of Chunk Control."""
[docs]
class Modes(Enum):
"""Modes Enums."""
DEFAULT = auto()
FROM_FILE = auto()
AS_DASK = auto()
def __init__(self, var_dim_chunksizes=None):
"""Provide user control of Dask chunking.
The NetCDF loader is controlled by the single instance of this: the
:data:`~iris.fileformats.netcdf.loader.CHUNK_CONTROL` object.
A chunk size can be set for a specific (named) file dimension, when
loading specific (named) variables, or for all variables.
When a selected variable is a CF data-variable, which loads as a
:class:`~iris.cube.Cube`, then the given dimension chunk size is *also*
fixed for all variables which are components of that :class:`~iris.cube.Cube`,
i.e. any :class:`~iris.coords.Coord`, :class:`~iris.coords.CellMeasure`,
:class:`~iris.coords.AncillaryVariable` etc.
This can be overridden, if required, by variable-specific settings.
For this purpose, :class:`~iris.mesh.MeshCoord` and
:class:`~iris.mesh.Connectivity` are not
:class:`~iris.cube.Cube` components, and chunk control on a
:class:`~iris.cube.Cube` data-variable will not affect them.
"""
self.var_dim_chunksizes = var_dim_chunksizes or {}
self.mode = self.Modes.DEFAULT
[docs]
@contextmanager
def set(
self,
var_names: str | Iterable[str] | None = None,
**dimension_chunksizes: Mapping[str, int],
) -> Iterator[None]:
r"""Control the Dask chunk sizes applied to NetCDF variables during loading.
Parameters
----------
var_names : str or list of str, default=None
Apply the `dimension_chunksizes` controls only to these variables,
or when building :class:`~iris.cube.Cube` from these data variables.
If ``None``, settings apply to all loaded variables.
**dimension_chunksizes : dict of {str: int}
Kwargs specifying chunksizes for dimensions of file variables.
Each key-value pair defines a chunk size for a named file
dimension, e.g. ``{'time': 10, 'model_levels':1}``.
Values of ``-1`` will lock the chunk size to the full size of that
dimension.
Notes
-----
This function acts as a context manager, for use in a ``with`` block.
>>> import iris
>>> from iris.fileformats.netcdf.loader import CHUNK_CONTROL
>>> with CHUNK_CONTROL.set("air_temperature", time=180, latitude=-1):
... cube = iris.load(iris.sample_data_path("E1_north_america.nc"))[0]
When `var_names` is present, the chunk size adjustments are applied
only to the selected variables. However, for a CF data variable, this
extends to all components of the (raw) :class:`~iris.cube.Cube` created
from it.
**Un**-adjusted dimensions have chunk sizes set in the 'usual' way.
That is, according to the normal behaviour of
:func:`iris._lazy_data.as_lazy_data`, which is: chunk size is based on
the file variable chunking, or full variable shape; this is scaled up
or down by integer factors to best match the Dask default chunk size,
i.e. the setting configured by
``dask.config.set({'array.chunk-size': '250MiB'})``.
"""
old_mode = self.mode
old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes)
if var_names is None:
var_names = ["*"]
elif isinstance(var_names, str):
var_names = [var_names]
try:
for var_name in var_names:
# Note: here we simply treat '*' as another name.
# A specific name match should override a '*' setting, but
# that is implemented elsewhere.
if not isinstance(var_name, str):
msg = ( # type: ignore[unreachable]
"'var_names' should be an iterable of strings, "
f"not {var_names!r}."
)
raise ValueError(msg)
dim_chunks = self.var_dim_chunksizes.setdefault(var_name, {})
for dim_name, chunksize in dimension_chunksizes.items():
if not (isinstance(dim_name, str) and isinstance(chunksize, int)):
msg = (
"'dimension_chunksizes' kwargs should be a dict "
f"of `str: int` pairs, not {dimension_chunksizes!r}."
)
raise ValueError(msg)
dim_chunks[dim_name] = chunksize
yield
finally:
self.var_dim_chunksizes = old_var_dim_chunksizes
self.mode = old_mode
[docs]
@contextmanager
def from_file(self) -> Iterator[None]:
r"""Ensure the chunk sizes are loaded in from NetCDF file variables.
Raises
------
KeyError
If any NetCDF data variables - those that become
:class:`~iris.cube.Cube` - do not specify chunk sizes.
Notes
-----
This function acts as a context manager, for use in a ``with`` block.
"""
old_mode = self.mode
old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes)
try:
self.mode = self.Modes.FROM_FILE
yield
finally:
self.mode = old_mode
self.var_dim_chunksizes = old_var_dim_chunksizes
[docs]
@contextmanager
def as_dask(self) -> Iterator[None]:
"""Rely on Dask :external+dask:doc:`array` to control chunk sizes.
Notes
-----
This function acts as a context manager, for use in a ``with`` block.
"""
old_mode = self.mode
old_var_dim_chunksizes = deepcopy(self.var_dim_chunksizes)
try:
self.mode = self.Modes.AS_DASK
yield
finally:
self.mode = old_mode
self.var_dim_chunksizes = old_var_dim_chunksizes
# Note: the CHUNK_CONTROL object controls chunk sizing in the
# :meth:`_get_cf_var_data` method.
# N.B. :meth:`_load_cube` also modifies this when loading each cube,
# introducing an additional context in which any cube-specific settings are
# 'promoted' into being global ones.
#: The global :class:`ChunkControl` object providing user-control of Dask chunking
#: when Iris loads NetCDF files.
CHUNK_CONTROL: ChunkControl = ChunkControl()