
Source code for iris.fileformats.pp

# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Provides UK Met Office Post Process (PP) format specific capabilities."""

from abc import ABCMeta, abstractmethod
import collections
from copy import deepcopy
import operator
import os
import re
import struct
import warnings

import cf_units
import cftime
import dask
import dask.array as da
import numpy as np
import numpy.ma as ma

from iris._lazy_data import as_concrete_data, as_lazy_data, is_lazy_data
import iris.config
import iris.coord_systems
import iris.exceptions

# NOTE: this is for backwards-compatibility *ONLY*
# We could simply remove it for v2.0 ?
from iris.fileformats._pp_lbproc_pairs import LBPROC_MAP as lbproc_map  # noqa: F401
from iris.fileformats._pp_lbproc_pairs import LBPROC_PAIRS  # noqa: F401
import iris.fileformats.pp_load_rules
from iris.fileformats.pp_save_rules import verify
import iris.fileformats.rules
import iris.warnings

try:
    import mo_pack
except ImportError:
    mo_pack = None


__all__ = [
    "EARTH_RADIUS",
    "PPField",
    "STASH",
    "as_fields",
    "load",
    "load_cubes",
    "load_pairs_from_fields",
    "save",
    "save_fields",
    "save_pairs_from_cube",
]


EARTH_RADIUS = 6371229.0


PP_HEADER_DEPTH = 256
PP_WORD_DEPTH = 4
NUM_LONG_HEADERS = 45
NUM_FLOAT_HEADERS = 19

# The header definition for header release 2.
#: A list of (header_name, header_positions) pairs for header release 2,
#: where header_positions is a tuple using the one-based UM/FORTRAN
#: indexing convention.
UM_HEADER_2 = [
    ("lbyr", (1,)),
    ("lbmon", (2,)),
    ("lbdat", (3,)),
    ("lbhr", (4,)),
    ("lbmin", (5,)),
    ("lbday", (6,)),
    ("lbyrd", (7,)),
    ("lbmond", (8,)),
    ("lbdatd", (9,)),
    ("lbhrd", (10,)),
    ("lbmind", (11,)),
    ("lbdayd", (12,)),
    ("lbtim", (13,)),
    ("lbft", (14,)),
    ("lblrec", (15,)),
    ("lbcode", (16,)),
    ("lbhem", (17,)),
    ("lbrow", (18,)),
    ("lbnpt", (19,)),
    ("lbext", (20,)),
    ("lbpack", (21,)),
    ("lbrel", (22,)),
    ("lbfc", (23,)),
    ("lbcfc", (24,)),
    ("lbproc", (25,)),
    ("lbvc", (26,)),
    ("lbrvc", (27,)),
    ("lbexp", (28,)),
    ("lbegin", (29,)),
    ("lbnrec", (30,)),
    ("lbproj", (31,)),
    ("lbtyp", (32,)),
    ("lblev", (33,)),
    ("lbrsvd", (34, 35, 36, 37)),
    ("lbsrce", (38,)),
    ("lbuser", (39, 40, 41, 42, 43, 44, 45)),
    ("brsvd", (46, 47, 48, 49)),
    ("bdatum", (50,)),
    ("bacc", (51,)),
    ("blev", (52,)),
    ("brlev", (53,)),
    ("bhlev", (54,)),
    ("bhrlev", (55,)),
    ("bplat", (56,)),
    ("bplon", (57,)),
    ("bgor", (58,)),
    ("bzy", (59,)),
    ("bdy", (60,)),
    ("bzx", (61,)),
    ("bdx", (62,)),
    ("bmdi", (63,)),
    ("bmks", (64,)),
]

# The header definition for header release 3.
#: A list of (header_name, header_positions) pairs for header release 3,
#: where header_positions is a tuple using the one-based UM/FORTRAN
#: indexing convention.
UM_HEADER_3 = [
    ("lbyr", (1,)),
    ("lbmon", (2,)),
    ("lbdat", (3,)),
    ("lbhr", (4,)),
    ("lbmin", (5,)),
    ("lbsec", (6,)),
    ("lbyrd", (7,)),
    ("lbmond", (8,)),
    ("lbdatd", (9,)),
    ("lbhrd", (10,)),
    ("lbmind", (11,)),
    ("lbsecd", (12,)),
    ("lbtim", (13,)),
    ("lbft", (14,)),
    ("lblrec", (15,)),
    ("lbcode", (16,)),
    ("lbhem", (17,)),
    ("lbrow", (18,)),
    ("lbnpt", (19,)),
    ("lbext", (20,)),
    ("lbpack", (21,)),
    ("lbrel", (22,)),
    ("lbfc", (23,)),
    ("lbcfc", (24,)),
    ("lbproc", (25,)),
    ("lbvc", (26,)),
    ("lbrvc", (27,)),
    ("lbexp", (28,)),
    ("lbegin", (29,)),
    ("lbnrec", (30,)),
    ("lbproj", (31,)),
    ("lbtyp", (32,)),
    ("lblev", (33,)),
    ("lbrsvd", (34, 35, 36, 37)),
    ("lbsrce", (38,)),
    ("lbuser", (39, 40, 41, 42, 43, 44, 45)),
    ("brsvd", (46, 47, 48, 49)),
    ("bdatum", (50,)),
    ("bacc", (51,)),
    ("blev", (52,)),
    ("brlev", (53,)),
    ("bhlev", (54,)),
    ("bhrlev", (55,)),
    ("bplat", (56,)),
    ("bplon", (57,)),
    ("bgor", (58,)),
    ("bzy", (59,)),
    ("bdy", (60,)),
    ("bzx", (61,)),
    ("bdx", (62,)),
    ("bmdi", (63,)),
    ("bmks", (64,)),
]

# A map from header-release-number to header definition
UM_HEADERS = {2: UM_HEADER_2, 3: UM_HEADER_3}

# Offset value to convert from UM_HEADER positions to PP_HEADER offsets.
UM_TO_PP_HEADER_OFFSET = 1
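
# Illustrative sketch (not part of the original module): the one-based
# UM/FORTRAN positions in UM_HEADER_2/UM_HEADER_3 convert to the zero-based
# offsets used to index a raw PP header by subtracting this offset, e.g.
#
#     >>> positions = dict(UM_HEADER_3)["lbproc"]
#     >>> tuple(pos - UM_TO_PP_HEADER_OFFSET for pos in positions)
#     (24,)
#
# which is the same conversion applied by the module's own header handling.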

#: A dictionary mapping IB values to their names.
EXTRA_DATA = {
    1: "x",
    2: "y",
    3: "lower_y_domain",
    4: "lower_x_domain",
    5: "upper_y_domain",
    6: "upper_x_domain",
    7: "lower_z_domain",
    8: "upper_z_domain",
    10: "field_title",
    11: "domain_title",
    12: "x_lower_bound",
    13: "x_upper_bound",
    14: "y_lower_bound",
    15: "y_upper_bound",
}
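
# Illustrative sketch (not part of the original module): each extra-data
# record begins with an integer code that packs the record length in words
# (ia) together with the IB type code above, as 1000 * ia + ib, e.g.
#
#     >>> extra_int_code = 5010   # hypothetical code value
#     >>> ib, ia = extra_int_code % 1000, extra_int_code // 1000
#     >>> EXTRA_DATA[ib], ia
#     ('field_title', 5)
#
# mirroring the decoding performed when a field's extra data is read.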


#: Maps lbuser[0] to numpy data type. The "default" entry is used if no
#: match is found, in which case a warning is issued.
LBUSER_DTYPE_LOOKUP = {
    1: np.dtype(">f4"),
    2: np.dtype(">i4"),
    3: np.dtype(">i4"),
    -1: np.dtype(">f4"),
    -2: np.dtype(">i4"),
    -3: np.dtype(">i4"),
    "default": np.dtype(">f4"),
}
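
# Illustrative sketch (not part of the original module): a field's payload
# dtype is resolved from lbuser[0] with a fallback to the "default" entry,
# e.g.
#
#     >>> LBUSER_DTYPE_LOOKUP.get(2, LBUSER_DTYPE_LOOKUP["default"])
#     dtype('>i4')
#     >>> LBUSER_DTYPE_LOOKUP.get(99, LBUSER_DTYPE_LOOKUP["default"])
#     dtype('>f4')
#
# where 99 stands in for any unrecognised lbuser[0] value.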


class _WarnComboLoadingMask(
    iris.warnings.IrisLoadWarning,
    iris.warnings.IrisMaskValueMatchWarning,
):
    """One-off combination of warning classes - enhances user filtering."""

    pass


class _WarnComboLoadingDefaulting(
    iris.warnings.IrisDefaultingWarning,
    iris.warnings.IrisLoadWarning,
):
    """One-off combination of warning classes - enhances user filtering."""

    pass


class _WarnComboIgnoringLoad(
    iris.warnings.IrisIgnoringWarning,
    iris.warnings.IrisLoadWarning,
):
    """One-off combination of warning classes - enhances user filtering."""

    pass


class STASH(collections.namedtuple("STASH", "model section item")):
    """A class to hold a single STASH code.

    Create instances using:
        >>> model = 1
        >>> section = 2
        >>> item = 3
        >>> my_stash = iris.fileformats.pp.STASH(model, section, item)

    Access the sub-components via:
        >>> my_stash.model
        1
        >>> my_stash.section
        2
        >>> my_stash.item
        3

    String conversion results in the MSI format:
        >>> print(iris.fileformats.pp.STASH(1, 16, 203))
        m01s16i203

    A stash object can be compared directly to its string representation:
        >>> iris.fileformats.pp.STASH(1, 0, 4) == 'm01s0i004'
        True

    """

    __slots__ = ()

    def __new__(cls, model, section, item):
        """Create namedtuple STASH instance.

        Parameters
        ----------
        model :
            A positive integer less than 100, or None.
        section :
            A non-negative integer less than 100, or None.
        item :
            A positive integer less than 1000, or None.

        """
        model = cls._validate_member("model", model, 1, 99)
        section = cls._validate_member("section", section, 0, 99)
        item = cls._validate_member("item", item, 1, 999)
        return super().__new__(cls, model, section, item)

    @staticmethod
    def from_msi(msi):
        """Convert a STASH code MSI string to a STASH instance."""
        if not isinstance(msi, str):
            raise TypeError("Expected STASH code MSI string, got %r" % (msi,))

        msi_match = re.match(
            r"^\s*m(\d+|\?+)s(\d+|\?+)i(\d+|\?+)\s*$", msi, re.IGNORECASE
        )

        if msi_match is None:
            raise ValueError(
                'Expected STASH code MSI string "mXXsXXiXXX", got %r' % (msi,)
            )

        return STASH(*msi_match.groups())

    @staticmethod
    def _validate_member(name, value, lower_limit, upper_limit):
        # Returns a valid integer or None.
        try:
            value = int(value)
            if not lower_limit <= value <= upper_limit:
                value = None
        except (TypeError, ValueError):
            value = None
        return value

    def __str__(self):
        model = self._format_member(self.model, 2)
        section = self._format_member(self.section, 2)
        item = self._format_member(self.item, 3)
        return "m{}s{}i{}".format(model, section, item)

    def _format_member(self, value, num_digits):
        if value is None:
            result = "?" * num_digits
        else:
            format_spec = "0" + str(num_digits)
            result = format(value, format_spec)
        return result

    def lbuser3(self):
        """Return the lbuser[3] value that this stash represents."""
        return (self.section or 0) * 1000 + (self.item or 0)

    def lbuser6(self):
        """Return the lbuser[6] value that this stash represents."""
        return self.model or 0

    @property
    def is_valid(self):
        return "?" not in str(self)

    def __hash__(self):
        return super().__hash__()

    def __eq__(self, other):
        if isinstance(other, str):
            return super().__eq__(STASH.from_msi(other))
        else:
            return super().__eq__(other)

    def __ne__(self, other):
        return not self.__eq__(other)
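
# Illustrative sketch (not part of the original module): a STASH code
# round-trips to the PP header words it encodes via lbuser6() and lbuser3(),
# e.g.
#
#     >>> stash = STASH.from_msi("m01s16i203")
#     >>> stash.lbuser6(), stash.lbuser3()
#     (1, 16203)
#
# i.e. lbuser[6] holds the model and lbuser[3] holds section * 1000 + item.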
class SplittableInt: """A class to hold integers which can easily get each decimal digit individually. >>> three_six_two = SplittableInt(362) >>> print(three_six_two) 362 >>> print(three_six_two[0]) 2 >>> print(three_six_two[2]) 3 Notes ----- No support for negative numbers. """ def __init__(self, value, name_mapping_dict=None): """Build a SplittableInt given the positive integer value provided. Parameters ---------- name_mapping_dict : dict A special mapping to provide name based access to specific integer positions. Examples -------- >>> a = SplittableInt(1234, {'hundreds': 2}) >>> print(a.hundreds) 2 >>> a.hundreds = 9 >>> print(a.hundreds) 9 >>> print(a) 1934 """ if value < 0: raise ValueError( "Negative numbers not supported with splittable integers object" ) # define the name lookup first (as this is the way __setattr__ is # plumbed) #: A dictionary mapping special attribute names on this object #: to the slices/indices required to access them. self._name_lookup = name_mapping_dict or {} self._value = value self._calculate_str_value_from_value() def __int__(self): return int(self._value) def _calculate_str_value_from_value(self): # Reverse the string to get the appropriate index when getting the # sliced value self._strvalue = [int(c) for c in str(self._value)[::-1]] # Associate the names in the lookup table to attributes for name, index in self._name_lookup.items(): object.__setattr__(self, name, self[index]) def _calculate_value_from_str_value(self): self._value = np.sum([10**i * val for i, val in enumerate(self._strvalue)]) def __len__(self): return len(self._strvalue) def __getitem__(self, key): try: val = self._strvalue[key] except IndexError: val = 0 # if the key returns a list of values, then combine them together # to an integer if isinstance(val, list): val = sum([10**i * val for i, val in enumerate(val)]) return val def __setitem__(self, key, value): # The setitem method has been overridden so that assignment using # ``val[0] = 1`` style syntax updates # the entire object appropriately. if not isinstance(value, int) or value < 0: raise ValueError("Can only set %s as a positive integer value." % key) if isinstance(key, slice): if ( (key.start is not None and key.start < 0) or (key.step is not None and key.step < 0) or (key.stop is not None and key.stop < 0) ): raise ValueError( "Cannot assign a value with slice objects" " containing negative indices." ) # calculate the current length of the value of this string current_length = len(range(*key.indices(len(self)))) # get indices for as many digits as have been requested. Putting # the upper limit on the number of digits at 100. indices = range(*key.indices(100)) if len(indices) < len(str(value)): raise ValueError( "Cannot put %s into %s as it has too many digits." 
% (value, key) ) # Iterate over each of the indices in the slice, # zipping them together with the associated digit for index, digit in zip(indices, str(value).zfill(current_length)[::-1]): # assign each digit to the associated index self.__setitem__(index, int(digit)) else: # If we are trying to set to an index which does not currently # exist in _strvalue then extend it to the # appropriate length if (key + 1) > len(self): new_str_value = [0] * (key + 1) new_str_value[: len(self)] = self._strvalue self._strvalue = new_str_value self._strvalue[key] = value for name, index in self._name_lookup.items(): if index == key: object.__setattr__(self, name, value) self._calculate_value_from_str_value() def __setattr__(self, name, value): # if the attribute is a special value, update the index value which # will in turn update the attribute value if name != "_name_lookup" and name in self._name_lookup: self[self._name_lookup[name]] = value else: object.__setattr__(self, name, value) def __str__(self): return str(self._value) def __repr__(self): return "SplittableInt(%r, name_mapping_dict=%r)" % ( self._value, self._name_lookup, ) def __eq__(self, other): result = NotImplemented if isinstance(other, SplittableInt): result = self._value == other._value elif isinstance(other, int): result = self._value == other return result def __ne__(self, other): result = self.__eq__(other) if result is not NotImplemented: result = not result return result def _compare(self, other, op): result = NotImplemented if isinstance(other, SplittableInt): result = op(self._value, other._value) elif isinstance(other, int): result = op(self._value, other) return result def __lt__(self, other): return self._compare(other, operator.lt) def __le__(self, other): return self._compare(other, operator.le) def __gt__(self, other): return self._compare(other, operator.gt) def __ge__(self, other): return self._compare(other, operator.ge) class PPDataProxy: """A reference to the data payload of a single PP field.""" __slots__ = ( "shape", "src_dtype", "path", "offset", "data_len", "_lbpack", "boundary_packing", "mdi", ) def __init__( self, shape, src_dtype, path, offset, data_len, lbpack, boundary_packing, mdi, ): self.shape = shape self.src_dtype = src_dtype self.path = path self.offset = offset self.data_len = data_len self.lbpack = lbpack self.boundary_packing = boundary_packing self.mdi = mdi # lbpack @property def lbpack(self): value = self._lbpack if not isinstance(self._lbpack, SplittableInt): mapping = dict(n5=slice(4, None), n4=3, n3=2, n2=1, n1=0) value = SplittableInt(self._lbpack, mapping) return value @lbpack.setter def lbpack(self, value): self._lbpack = value @property def dtype(self): return self.src_dtype.newbyteorder("=") @property def fill_value(self): return self.mdi @property def ndim(self): return len(self.shape) def __getitem__(self, keys): with open(self.path, "rb") as pp_file: pp_file.seek(self.offset, os.SEEK_SET) data_bytes = pp_file.read(self.data_len) data = _data_bytes_to_shaped_array( data_bytes, self.lbpack, self.boundary_packing, self.shape, self.src_dtype, self.mdi, ) result = data.__getitem__(keys) return np.asanyarray(result, dtype=self.dtype) def __repr__(self): fmt = ( "<{self.__class__.__name__} shape={self.shape}" " src_dtype={self.dtype!r} path={self.path!r}" " offset={self.offset}>" ) return fmt.format(self=self) def __getstate__(self): # Because we have __slots__, this is needed to support Pickle.dump() return [(name, getattr(self, name)) for name in self.__slots__] def __setstate__(self, 
state): # Because we have __slots__, this is needed to support Pickle.load() # (Use setattr, as there is no object dictionary.) for key, value in state: setattr(self, key, value) def __eq__(self, other): result = NotImplemented if isinstance(other, PPDataProxy): result = True for attr in self.__slots__: if getattr(self, attr) != getattr(other, attr): result = False break return result def __ne__(self, other): result = self.__eq__(other) if result is not NotImplemented: result = not result return result def _data_bytes_to_shaped_array( data_bytes, lbpack, boundary_packing, data_shape, data_type, mdi, mask=None ): """Convert binary payload into a numpy array. Convert the already read binary data payload into a numpy array, unpacking and decompressing as per the F3 specification. """ if lbpack.n1 in (0, 2): data = np.frombuffer(data_bytes, dtype=data_type) elif lbpack.n1 == 1: if mo_pack is not None: try: decompress_wgdos = mo_pack.decompress_wgdos except AttributeError: decompress_wgdos = mo_pack.unpack_wgdos else: msg = ( "Unpacking PP fields with LBPACK of {} " "requires mo_pack to be installed".format(lbpack.n1) ) raise ValueError(msg) data = decompress_wgdos(data_bytes, data_shape[0], data_shape[1], mdi) elif lbpack.n1 == 4: if mo_pack is not None and hasattr(mo_pack, "decompress_rle"): decompress_rle = mo_pack.decompress_rle else: msg = ( "Unpacking PP fields with LBPACK of {} " "requires mo_pack to be installed".format(lbpack.n1) ) raise ValueError(msg) data = decompress_rle(data_bytes, data_shape[0], data_shape[1], mdi) else: raise iris.exceptions.NotYetImplementedError( "PP fields with LBPACK of %s are not yet supported." % lbpack ) # Ensure we have a writeable data buffer. # NOTE: "data.setflags(write=True)" is not valid for numpy >= 1.16.0. if not data.flags["WRITEABLE"]: data = data.copy() # Ensure the data is in the native byte order if not data.dtype.isnative: data.byteswap(True) data.dtype = data.dtype.newbyteorder("=") if boundary_packing is not None: # Convert a long string of numbers into a "lateral boundary # condition" array, which is split into 4 quartiles, North # East, South, West and where North and South contain the corners. compressed_data = data data = np.ma.masked_all(data_shape) data.fill_value = mdi boundary_height = boundary_packing.y_halo + boundary_packing.rim_width boundary_width = boundary_packing.x_halo + boundary_packing.rim_width y_height, x_width = data_shape # The height of the east and west components. mid_height = y_height - 2 * boundary_height n_s_shape = boundary_height, x_width e_w_shape = mid_height, boundary_width # Keep track of our current position in the array. current_posn = 0 north = compressed_data[: boundary_height * x_width] current_posn += len(north) data[-boundary_height:, :] = north.reshape(*n_s_shape) east = compressed_data[ current_posn : current_posn + boundary_width * mid_height ] current_posn += len(east) data[boundary_height:-boundary_height, -boundary_width:] = east.reshape( *e_w_shape ) south = compressed_data[current_posn : current_posn + boundary_height * x_width] current_posn += len(south) data[:boundary_height, :] = south.reshape(*n_s_shape) west = compressed_data[ current_posn : current_posn + boundary_width * mid_height ] current_posn += len(west) data[boundary_height:-boundary_height, :boundary_width] = west.reshape( *e_w_shape ) elif lbpack.n2 == 2: if mask is None: # If we are given no mask to apply, then just return raw data, even # though it does not have the correct shape. 
# For dask-delayed loading, this means that mask, data and the # combination can be properly handled within a dask graph. # However, we still mask any MDI values in the array (below). pass else: land_mask = mask.data.astype(np.bool_) sea_mask = ~land_mask new_data = np.ma.masked_all(land_mask.shape) new_data.fill_value = mdi if lbpack.n3 == 1: # Land mask packed data. # Sometimes the data comes in longer than it should be (i.e. it # looks like the compressed data is compressed, but the # trailing data hasn't been clipped off!). new_data[land_mask] = data[: land_mask.sum()] elif lbpack.n3 == 2: # Sea mask packed data. new_data[sea_mask] = data[: sea_mask.sum()] else: raise ValueError("Unsupported mask compression.") data = new_data else: # Reform in row-column order actual_length = np.prod(data.shape) if (expected_length := np.prod(data_shape)) != actual_length: if (expected_length < actual_length) and (data.ndim == 1): # known use case where mule adds padding to data payload # for a collapsed field. data = data[:expected_length] else: emsg = ( f"PP field data containing {actual_length} words does not " f"match expected length of {expected_length} words." ) raise ValueError(emsg) data.shape = data_shape # Mask the array if mdi in data: data = ma.masked_values(data, mdi, copy=False) return data # The special headers of the PPField classes which get some improved # functionality _SPECIAL_HEADERS = ( "lbtim", "lbcode", "lbpack", "lbproc", "data", "stash", "t1", "t2", ) def _header_defn(release_number): """Return zero-indexed header definition for a particular release of a PPField.""" um_header = UM_HEADERS[release_number] offset = UM_TO_PP_HEADER_OFFSET return [ (name, tuple(position - offset for position in positions)) for name, positions in um_header ] def _pp_attribute_names(header_defn): """Return the allowed attributes of a PPField. Return the allowed attributes of a PPField: all of the normal headers (i.e. not the _SPECIAL_HEADERS), the _SPECIAL_HEADERS with '_' prefixed, the possible extra data headers. """ normal_headers = list( name for name, positions in header_defn if name not in _SPECIAL_HEADERS ) special_headers = list("_" + name for name in _SPECIAL_HEADERS) extra_data = list(EXTRA_DATA.values()) special_attributes = [ "_raw_header", "raw_lbtim", "raw_lbpack", "boundary_packing", "_index_in_structured_load_file", ] return normal_headers + special_headers + extra_data + special_attributes
class PPField(metaclass=ABCMeta):
    """Base class for PP fields.

    A generic class for PP fields - not specific to a particular
    header release number.

    A PPField instance can easily access the PP header "words" as attributes
    with some added useful capabilities::

        for field in iris.fileformats.pp.load(filename):
            print(field.lbyr)
            print(field.lbuser)
            print(field.lbuser[0])
            print(field.lbtim)
            print(field.lbtim.ia)
            print(field.t1)

    """

    # NB. Subclasses must define the attribute HEADER_DEFN to be their
    # zero-based header definition. See PPField2 and PPField3 for examples.

    __slots__ = ()

    def __init__(self, header=None):
        # Combined header longs and floats data cache.
        self._raw_header = header
        self.raw_lbtim = None
        self.raw_lbpack = None
        self.boundary_packing = None
        self._index_in_structured_load_file = None
        if header is not None:
            self.raw_lbtim = header[self.HEADER_DICT["lbtim"][0]]
            self.raw_lbpack = header[self.HEADER_DICT["lbpack"][0]]
    def __getattr__(self, key):
        """Return the value of the key.

        Method supports deferred attribute creation, which offers a
        significant loading optimisation, particularly when not all attributes
        are referenced and therefore created on the instance.

        When an 'ordinary' HEADER_DICT attribute is required, its associated
        header offset is used to lookup the data value/s from the combined
        header longs and floats data cache. The attribute is then set with
        this value/s on the instance. Thus future lookups for this attribute
        will be optimised, avoiding the __getattr__ lookup mechanism again.

        When a 'special' HEADER_DICT attribute (leading underscore) is
        required, its associated 'ordinary' (no leading underscore) header
        offset is used to lookup the data value/s from the combined header
        longs and floats data cache. The 'ordinary' attribute is then set
        with this value/s on the instance. This is required as 'special'
        attributes have supporting property convenience functionality based
        on the attribute value e.g. see 'lbpack' and 'lbtim'.

        Note that, for 'special' attributes, the interface is via the
        'ordinary' attribute but the underlying attribute value is stored
        within the 'special' attribute.

        """
        try:
            loc = self.HEADER_DICT[key]
        except KeyError:
            if key[0] == "_" and key[1:] in self.HEADER_DICT:
                # Must be a special attribute.
                loc = self.HEADER_DICT[key[1:]]
            else:
                cls = self.__class__.__name__
                msg = "{!r} object has no attribute {!r}".format(cls, key)
                raise AttributeError(msg)

        if len(loc) == 1:
            value = self._raw_header[loc[0]]
        else:
            start = loc[0]
            stop = loc[-1] + 1
            value = tuple(self._raw_header[start:stop])

        # Now cache the attribute value on the instance.
        if key[0] == "_":
            # First we need to assign to the attribute so that the
            # special attribute is calculated, then we retrieve it.
            setattr(self, key[1:], value)
            value = getattr(self, key)
        else:
            setattr(self, key, value)
        return value
    @property
    @abstractmethod
    def t1(self):
        pass

    @property
    @abstractmethod
    def t2(self):
        pass
    def __repr__(self):
        """Return a string representation of the PP field."""
        # Define an ordering on the basic header names
        attribute_priority_lookup = {name: loc[0] for name, loc in self.HEADER_DEFN}

        # With the attributes sorted the order will remain stable if extra
        # attributes are added.
        public_attribute_names = list(attribute_priority_lookup.keys()) + list(
            EXTRA_DATA.values()
        )

        self_attrs = [
            (name, getattr(self, name, None)) for name in public_attribute_names
        ]
        self_attrs = [pair for pair in self_attrs if pair[1] is not None]

        self_attrs.append(("data", self.data))

        # Sort the attributes by position in the pp header, then
        # alphabetically.
        attributes = sorted(
            self_attrs,
            key=lambda pair: (
                attribute_priority_lookup.get(pair[0], 999),
                pair[0],
            ),
        )

        return (
            "PP Field" + "".join(["\n %s: %s" % (k, v) for k, v in attributes]) + "\n"
        )
@property def stash(self): """Stash property giving access to the associated STASH object. Now supporting __eq__. """ if ( not hasattr(self, "_stash") or self.lbuser[6] != self._stash.lbuser6() or self.lbuser[3] != self._stash.lbuser3() ): self._stash = STASH( self.lbuser[6], self.lbuser[3] // 1000, self.lbuser[3] % 1000 ) return self._stash @stash.setter def stash(self, stash): if isinstance(stash, str): self._stash = STASH.from_msi(stash) elif isinstance(stash, STASH): self._stash = stash else: raise ValueError("Cannot set stash to {!r}".format(stash)) # Keep the lbuser up to date. self.lbuser = list(self.lbuser) self.lbuser[6] = self._stash.lbuser6() self.lbuser[3] = self._stash.lbuser3() @property def lbtim(self): return self._lbtim @lbtim.setter def lbtim(self, value): value = int(value) self.raw_lbtim = value self._lbtim = SplittableInt(value, {"ia": slice(2, None), "ib": 1, "ic": 0}) # lbcode @property def lbcode(self): return self._lbcode @lbcode.setter def lbcode(self, new_value): if not isinstance(new_value, SplittableInt): # add the ix/iy values for lbcode new_value = SplittableInt(new_value, {"iy": slice(0, 2), "ix": slice(2, 4)}) self._lbcode = new_value # lbpack @property def lbpack(self): return self._lbpack @lbpack.setter def lbpack(self, new_value): if not isinstance(new_value, SplittableInt): self.raw_lbpack = new_value # add the n1/n2/n3/n4/n5 values for lbpack name_mapping = dict(n5=slice(4, None), n4=3, n3=2, n2=1, n1=0) new_value = SplittableInt(new_value, name_mapping) else: self.raw_lbpack = new_value._value self._lbpack = new_value @property def lbproc(self): return self._lbproc @lbproc.setter def lbproc(self, value): value = int(value) if value < 0: raise ValueError("PPField.lbproc cannot be a negative number.") self._lbproc = value @property def data(self): """:class:`numpy.ndarray` representing multidimensional data of the pp file.""" if is_lazy_data(self._data): # Replace with real data on the first access. self._data = as_concrete_data(self._data) return self._data @data.setter def data(self, value): self._data = value
    def core_data(self):
        return self._data
@property def calendar(self): """Return the calendar of the field.""" # TODO #577 What calendar to return when ibtim.ic in [0, 3] calendar = cf_units.CALENDAR_STANDARD if self.lbtim.ic == 2: calendar = cf_units.CALENDAR_360_DAY elif self.lbtim.ic == 4: calendar = cf_units.CALENDAR_365_DAY return calendar def _read_extra_data(self, pp_file, file_reader, extra_len, little_ended=False): """Read the extra data section and update the self appropriately.""" dtype_endian_char = "<" if little_ended else ">" # While there is still extra data to decode run this loop while extra_len > 0: dtype = "%cL" % dtype_endian_char extra_int_code = struct.unpack_from(dtype, file_reader(PP_WORD_DEPTH))[0] extra_len -= PP_WORD_DEPTH ib = extra_int_code % 1000 ia = extra_int_code // 1000 data_len = ia * PP_WORD_DEPTH if ib == 10: dtype = "%c%ds" % (dtype_endian_char, data_len) field_title = struct.unpack_from(dtype, file_reader(data_len)) self.field_title = field_title[0].rstrip(b"\00").decode() elif ib == 11: dtype = "%c%ds" % (dtype_endian_char, data_len) domain_title = struct.unpack_from(dtype, file_reader(data_len)) self.domain_title = domain_title[0].rstrip(b"\00").decode() elif ib in EXTRA_DATA: attr_name = EXTRA_DATA[ib] dtype = np.dtype("%cf%d" % (dtype_endian_char, PP_WORD_DEPTH)) values = np.fromfile(pp_file, dtype=dtype, count=ia) # Ensure the values are in the native byte order if not values.dtype.isnative: values.byteswap(True) values.dtype = values.dtype.newbyteorder("=") setattr(self, attr_name, values) else: raise ValueError("Unknown IB value for extra data: %s" % ib) extra_len -= data_len @property def x_bounds(self): if hasattr(self, "x_lower_bound") and hasattr(self, "x_upper_bound"): return np.column_stack((self.x_lower_bound, self.x_upper_bound)) @property def y_bounds(self): if hasattr(self, "y_lower_bound") and hasattr(self, "y_upper_bound"): return np.column_stack((self.y_lower_bound, self.y_upper_bound))
[docs] def save(self, file_handle): """Save the PPField to the given file object. Typically created with :func:`open`:: # to append the field to a file with open(filename, 'ab') as fh: a_pp_field.save(fh) # to overwrite/create a file with open(filename, 'wb') as fh: a_pp_field.save(fh) Notes ----- The fields which are automatically calculated are: 'lbext', 'lblrec' and 'lbuser[0]'. Some fields are not currently populated, these are: 'lbegin', 'lbnrec', 'lbuser[1]'. """ # Get the actual data content. data = self.data mdi = self.bmdi if np.any(data == mdi): msg = ( "PPField data contains unmasked points equal to the fill " "value, {}. As saved, these points will read back as " "missing data. To save these as normal values, please " "set the field BMDI not equal to any valid data points." ) warnings.warn( msg.format(mdi), category=_WarnComboLoadingMask, ) if isinstance(data, ma.MaskedArray): if ma.is_masked(data): data = data.filled(fill_value=mdi) else: data = data.data # Make sure the data is big-endian if data.dtype.newbyteorder(">") != data.dtype: # take a copy of the data when byteswapping data = data.byteswap(False) data.dtype = data.dtype.newbyteorder(">") # Create the arrays which will hold the header information lb = np.empty(shape=NUM_LONG_HEADERS, dtype=np.dtype(">u%d" % PP_WORD_DEPTH)) b = np.empty(shape=NUM_FLOAT_HEADERS, dtype=np.dtype(">f%d" % PP_WORD_DEPTH)) # Fill in the header elements from the PPField for name, pos in self.HEADER_DEFN: try: header_elem = getattr(self, name) except AttributeError: raise AttributeError("PPField.save() could not find %s" % name) if pos[0] <= NUM_LONG_HEADERS - UM_TO_PP_HEADER_OFFSET: index = slice(pos[0], pos[-1] + 1) if isinstance(header_elem, SplittableInt): header_elem = int(header_elem) lb[index] = header_elem else: index = slice(pos[0] - NUM_LONG_HEADERS, pos[-1] - NUM_LONG_HEADERS + 1) b[index] = header_elem # Although all of the elements are now populated, we still need to # update some of the elements in case # things have changed (for example, the data length etc.) # Set up a variable to represent the datalength of this PPField in # WORDS. 
len_of_data_payload = 0 # set up a list to hold the extra data which will need to be encoded # at the end of the data extra_items = [] # iterate through all of the possible extra data fields for ib, extra_data_attr_name in EXTRA_DATA.items(): # try to get the extra data field, returning None if it doesn't # exist extra_elem = getattr(self, extra_data_attr_name, None) if extra_elem is not None: # The special case of character extra data must be caught if isinstance(extra_elem, str): ia = len(extra_elem) # pad any strings up to a multiple of PP_WORD_DEPTH # (this length is # of bytes) ia = (PP_WORD_DEPTH - (ia - 1) % PP_WORD_DEPTH) + (ia - 1) extra_elem = extra_elem.ljust(ia, "\00") # ia is now the datalength in WORDS of the string ia //= PP_WORD_DEPTH else: # ia is the datalength in WORDS ia = np.prod(extra_elem.shape) # flip the byteorder if the data is not big-endian if extra_elem.dtype.newbyteorder(">") != extra_elem.dtype: # take a copy of the extra data when byte swapping extra_elem = extra_elem.byteswap(False) extra_elem.dtype = extra_elem.dtype.newbyteorder(">") # add the number of bytes to the len_of_data_payload variable # + the extra integer which will encode ia/ib len_of_data_payload += PP_WORD_DEPTH * ia + PP_WORD_DEPTH integer_code = 1000 * ia + ib extra_items.append([integer_code, extra_elem]) if ia >= 1000: raise IOError( "PP files cannot write extra data with more" ' than 1000 elements. Tried to write "%s"' " which has %s elements." % (extra_data_attr_name, ib) ) # populate lbext in WORDS lb[self.HEADER_DICT["lbext"][0]] = len_of_data_payload // PP_WORD_DEPTH # Put the data length of pp.data into len_of_data_payload (in BYTES) lbpack = lb[self.HEADER_DICT["lbpack"][0]] if lbpack == 0: len_of_data_payload += data.size * PP_WORD_DEPTH elif lbpack == 1: if mo_pack is not None: try: compress_wgdos = mo_pack.compress_wgdos except AttributeError: compress_wgdos = mo_pack.pack_wgdos packed_data = compress_wgdos( data.astype(np.float32), b[self.HEADER_DICT["bacc"][0] - 45], b[self.HEADER_DICT["bmdi"][0] - 45], ) len_of_data_payload += len(packed_data) else: msg = ( "Writing packed pp data with lbpack of {} " "requires mo_pack to be installed.".format(lbpack) ) raise NotImplementedError(msg) # populate lbrec in WORDS lb[self.HEADER_DICT["lblrec"][0]] = len_of_data_payload // PP_WORD_DEPTH # populate lbuser[0] to have the data's datatype if data.dtype == np.dtype(">f4"): lb[self.HEADER_DICT["lbuser"][0]] = 1 elif data.dtype == np.dtype(">f8"): warnings.warn( "Downcasting array precision from float64 to float32" " for save.If float64 precision is required then" " please save in a different format", category=_WarnComboLoadingDefaulting, ) data = data.astype(">f4") lb[self.HEADER_DICT["lbuser"][0]] = 1 elif data.dtype == np.dtype(">i4"): # NB: there is no physical difference between lbuser[0] of 2 or 3 # so we encode just 2 lb[self.HEADER_DICT["lbuser"][0]] = 2 else: raise IOError( "Unable to write data array to a PP file. " "The datatype was %s." % data.dtype ) # NB: lbegin, lbnrec, lbuser[1] not set up # Now that we have done the manouvering required, write to the file... if not hasattr(file_handle, "write"): raise TypeError( "The file_handle argument must be an instance of a" " Python file object, but got %r. \n e.g. " 'open(filename, "wb") to open a binary file with' " write permission." 
% type(file_handle) ) pp_file = file_handle # header length pp_file.write(struct.pack(">L", PP_HEADER_DEPTH)) # 45 integers lb.tofile(pp_file) # 19 floats b.tofile(pp_file) # Header length (again) pp_file.write(struct.pack(">L", PP_HEADER_DEPTH)) # Data length (including extra data length) pp_file.write(struct.pack(">L", int(len_of_data_payload))) # the data itself if lbpack == 0: data.tofile(pp_file) elif lbpack == 1: pp_file.write(packed_data) else: msg = "Writing packed pp data with lbpack of {} is not supported.".format( lbpack ) raise NotImplementedError(msg) # extra data elements for int_code, extra_data in extra_items: pp_file.write(struct.pack(">L", int(int_code))) if isinstance(extra_data, str): pp_file.write( struct.pack(">%ss" % len(extra_data), extra_data.encode()) ) else: extra_data = extra_data.astype(np.dtype(">f4")) extra_data.tofile(pp_file) # Data length (again) pp_file.write(struct.pack(">L", int(len_of_data_payload)))
    ##############################################################
    #
    # From here on define helper methods for PP -> Cube conversion.
    #
    def time_unit(self, time_unit, epoch="epoch"):
        return cf_units.Unit(
            "%s since %s" % (time_unit, epoch), calendar=self.calendar
        )
    def coord_system(self):
        """Return a CoordSystem for this PPField.

        Returns
        -------
        :class:`~iris.coord_systems.GeogCS` or
        :class:`~iris.coord_systems.RotatedGeogCS`.

        """
        geog_cs = iris.coord_systems.GeogCS(EARTH_RADIUS)

        def degrees_ne(angle, ref_angle):
            """Return whether an angle differs significantly from a set value.

            The inputs are in degrees.
            The difference is judged significant if more than 0.0001 degrees.

            """
            return abs(angle - ref_angle) > 0.0001

        if degrees_ne(self.bplat, 90.0) or (
            degrees_ne(self.bplon, 0.0) and degrees_ne(self.bplon, 180.0)
        ):
            # NOTE: when bplat,bplon=90,0 this encodes an unrotated system.
            # However, the rotated system which is *equivalent* to an unrotated
            # one actually has bplat,bplon=90,180, due to a quirk in the
            # definition equations.
            # So we accept BPLON of 0 *or* 180 to mean 'unrotated'.
            geog_cs = iris.coord_systems.RotatedGeogCS(
                self.bplat, self.bplon, ellipsoid=geog_cs
            )

        return geog_cs
    def _x_coord_name(self):
        # TODO: Remove once we have the ability to derive this in the rules.
        x_name = "longitude"
        if isinstance(self.coord_system(), iris.coord_systems.RotatedGeogCS):
            x_name = "grid_longitude"
        return x_name

    def _y_coord_name(self):
        # TODO: Remove once we have the ability to derive this in the rules.
        y_name = "latitude"
        if isinstance(self.coord_system(), iris.coord_systems.RotatedGeogCS):
            y_name = "grid_latitude"
        return y_name
    def copy(self):
        """Return a deep copy of this PPField.

        Returns
        -------
        :class:`PPField`:
            A copy instance of the :class:`PPField`.

        """
        return self._deepcopy({})
def __deepcopy__(self, memo): return self._deepcopy(memo) def _deepcopy(self, memo): field = self.__class__() for attr in self.__slots__: if hasattr(self, attr): value = getattr(self, attr) # Cope with inability to deepcopy a 0-d NumPy array. if attr == "_data" and value is not None and value.ndim == 0: setattr(field, attr, np.array(deepcopy(value[()], memo))) else: setattr(field, attr, deepcopy(value, memo)) return field def __eq__(self, other): result = NotImplemented if isinstance(other, PPField): result = True for attr in self.__slots__: attrs = [hasattr(self, attr), hasattr(other, attr)] if all(attrs): self_attr = getattr(self, attr) other_attr = getattr(other, attr) if not np.all(self_attr == other_attr): result = False break elif any(attrs): result = False break return result def __ne__(self, other): result = self.__eq__(other) if result is not NotImplemented: result = not result return result
class PPField2(PPField): """Hold a single field from a PP file, with a header release number of 2.""" HEADER_DEFN = _header_defn(2) HEADER_DICT = dict(HEADER_DEFN) __slots__ = _pp_attribute_names(HEADER_DEFN) @property def t1(self): """A cftime.datetime object. cftime.datetime object consisting of the lbyr, lbmon, lbdat, lbhr, and lbmin attributes. """ if not hasattr(self, "_t1"): has_year_zero = self.lbyr == 0 or None calendar = None if self.lbmon == 0 or self.lbdat == 0 else self.calendar self._t1 = cftime.datetime( self.lbyr, self.lbmon, self.lbdat, self.lbhr, self.lbmin, calendar=calendar, has_year_zero=has_year_zero, ) return self._t1 @t1.setter def t1(self, dt): self.lbyr = dt.year self.lbmon = dt.month self.lbdat = dt.day self.lbhr = dt.hour self.lbmin = dt.minute self.lbday = int(dt.strftime("%j")) if hasattr(self, "_t1"): delattr(self, "_t1") @property def t2(self): """A cftime.datetime object. cftime.datetime object consisting of the lbyrd, lbmond, lbdatd, lbhrd, and lbmind attributes. """ if not hasattr(self, "_t2"): has_year_zero = self.lbyrd == 0 or None calendar = None if self.lbmond == 0 or self.lbdatd == 0 else self.calendar self._t2 = cftime.datetime( self.lbyrd, self.lbmond, self.lbdatd, self.lbhrd, self.lbmind, calendar=calendar, has_year_zero=has_year_zero, ) return self._t2 @t2.setter def t2(self, dt): self.lbyrd = dt.year self.lbmond = dt.month self.lbdatd = dt.day self.lbhrd = dt.hour self.lbmind = dt.minute self.lbdayd = int(dt.strftime("%j")) if hasattr(self, "_t2"): delattr(self, "_t2") class PPField3(PPField): """Hold a single field from a PP file, with a header release number of 3.""" HEADER_DEFN = _header_defn(3) HEADER_DICT = dict(HEADER_DEFN) __slots__ = _pp_attribute_names(HEADER_DEFN) @property def t1(self): """A cftime.datetime object. cftime.datetime object consisting of the lbyr, lbmon, lbdat, lbhr, lbmin, and lbsec attributes. """ if not hasattr(self, "_t1"): has_year_zero = self.lbyr == 0 or None calendar = None if self.lbmon == 0 or self.lbdat == 0 else self.calendar self._t1 = cftime.datetime( self.lbyr, self.lbmon, self.lbdat, self.lbhr, self.lbmin, self.lbsec, calendar=calendar, has_year_zero=has_year_zero, ) return self._t1 @t1.setter def t1(self, dt): self.lbyr = dt.year self.lbmon = dt.month self.lbdat = dt.day self.lbhr = dt.hour self.lbmin = dt.minute self.lbsec = dt.second if hasattr(self, "_t1"): delattr(self, "_t1") @property def t2(self): """A cftime.datetime object. cftime.datetime object consisting of the lbyrd, lbmond, lbdatd, lbhrd, lbmind, and lbsecd attributes. """ if not hasattr(self, "_t2"): has_year_zero = self.lbyrd == 0 or None calendar = None if self.lbmond == 0 or self.lbdatd == 0 else self.calendar self._t2 = cftime.datetime( self.lbyrd, self.lbmond, self.lbdatd, self.lbhrd, self.lbmind, self.lbsecd, calendar=calendar, has_year_zero=has_year_zero, ) return self._t2 @t2.setter def t2(self, dt): self.lbyrd = dt.year self.lbmond = dt.month self.lbdatd = dt.day self.lbhrd = dt.hour self.lbmind = dt.minute self.lbsecd = dt.second if hasattr(self, "_t2"): delattr(self, "_t2") PP_CLASSES = {2: PPField2, 3: PPField3} def make_pp_field(header): # Choose a PP field class from the value of LBREL lbrel = header[21] if lbrel not in PP_CLASSES: raise ValueError("Unsupported header release number: {}".format(lbrel)) pp_field = PP_CLASSES[lbrel](header) return pp_field LoadedArrayBytes = collections.namedtuple("LoadedArrayBytes", "bytes, dtype")
def load(filename, read_data=False, little_ended=False):
    """Return an iterator of PPFields given a filename.

    Parameters
    ----------
    filename : str
        String of the filename to load.
    read_data : bool, default=False
        Flag whether or not the data should be read, if False an empty
        data manager will be provided which can subsequently load the data
        on demand. Default False.
    little_ended : bool, default=False
        If True, file contains all little-ended words (header and data).

    Notes
    -----
    To iterate through all of the fields in a pp file::

        for field in iris.fileformats.pp.load(filename):
            print(field)

    """
    return _interpret_fields(
        _field_gen(filename, read_data_bytes=read_data, little_ended=little_ended)
    )
def _interpret_fields(fields): """Turn the fields read with load and FF2PP._extract_field into usable fields. Turn the fields read with load and FF2PP._extract_field into usable fields. One of the primary purposes of this function is to either convert "deferred bytes" into "deferred arrays" or "loaded bytes" into actual numpy arrays (via the _create_field_data) function. """ land_mask_field = None landmask_compressed_fields = [] for field in fields: # Store the first reference to a land mask, and use this as the # definitive mask for future fields in this generator. if ( land_mask_field is None and field.lbuser[6] == 1 and (field.lbuser[3] // 1000) == 0 and (field.lbuser[3] % 1000) == 30 ): land_mask_field = field # Handle land compressed data payloads, when lbpack.n2 is 2. if (field.raw_lbpack // 10 % 10) == 2: # Field uses land-mask packing, so needs a related land-mask field. if land_mask_field is None: landmask_compressed_fields.append(field) # Land-masked fields have their size+shape defined by the # reference landmask field, so we can't yield them if they # are encountered *before* the landmask. # In that case, defer them, and process them all afterwards at # the end of the file. continue # Land-mask compressed fields don't have an lbrow and lbnpt. field.lbrow, field.lbnpt = ( land_mask_field.lbrow, land_mask_field.lbnpt, ) _create_field_data( field, (field.lbrow, field.lbnpt), land_mask_field=land_mask_field, ) else: # Field does not use land-mask packing. _create_field_data(field, (field.lbrow, field.lbnpt)) yield field # At file end, return any land-masked fields that were deferred because # they were encountered before the landmask reference field. if landmask_compressed_fields: if land_mask_field is None: warnings.warn( "Landmask compressed fields existed without a " "landmask to decompress with. The data will have " "a shape of (0, 0) and will not read.", category=iris.warnings.IrisLoadWarning, ) mask_shape = (0, 0) else: mask_shape = (land_mask_field.lbrow, land_mask_field.lbnpt) for field in landmask_compressed_fields: field.lbrow, field.lbnpt = mask_shape _create_field_data(field, mask_shape, land_mask_field=land_mask_field) yield field def _create_field_data(field, data_shape, land_mask_field=None): """Modify a field's ``_data`` attribute. Modify a field's ``_data`` attribute either by: * converting a 'deferred array bytes' tuple into a lazy array, * converting LoadedArrayBytes into an actual numpy array. If 'land_mask_field' is passed (not None), then it contains the associated landmask, which is also a field : Its data array is used as a template for `field`'s data, determining its size, shape and the locations of all the valid (non-missing) datapoints. """ if isinstance(field.core_data(), LoadedArrayBytes): loaded_bytes = field.core_data() field.data = _data_bytes_to_shaped_array( loaded_bytes.bytes, field.lbpack, field.boundary_packing, data_shape, loaded_bytes.dtype, field.bmdi, land_mask_field, ) else: # Wrap the reference to the data payload within a data proxy # in order to support deferred data loading. fname, position, n_bytes, dtype = field.core_data() proxy = PPDataProxy( data_shape, dtype, fname, position, n_bytes, field.raw_lbpack, field.boundary_packing, field.bmdi, ) block_shape = data_shape if 0 not in data_shape else (1, 1) if land_mask_field is None: # For a "normal" (non-landsea-masked) field, the proxy can be # wrapped directly as a deferred array. 
field.data = as_lazy_data(proxy, chunks=block_shape) else: # This is a landsea-masked field, and its data must be handled in # a different way : Because data shape/size is not known in # advance, the data+mask calculation can't be represented # as a dask-array operation. Instead, we make that calculation # 'delayed', and then use 'from_delayed' to make the result back # into a dask array -- because the final result shape *is* known. @dask.delayed def fetch_valid_values_array(): # Return the data values array (shape+size unknown). return proxy[:] delayed_valid_values = fetch_valid_values_array() # Get the mask data-array from the landsea-mask field. # This is *either* a lazy or a real array, we don't actually care. # If this is a deferred dependency, the delayed calc can see that. mask_field_array = land_mask_field.core_data() # Check whether this field uses a land or a sea mask. if field.lbpack.n3 not in (1, 2): raise ValueError( "Unsupported mask compression : lbpack.n3 = {}.".format( field.lbpack.n3 ) ) if field.lbpack.n3 == 2: # Sea-mask packing : points are inverse of the land-mask. mask_field_array = ~mask_field_array # Define the mask+data calculation as a deferred operation. # NOTE: duplicates the operation in _data_bytes_to_shaped_array. @dask.delayed def calc_array(mask, values): # Note: "mask" is True at *valid* points, not missing ones. # First ensure the mask array is boolean (not int). mask = mask.astype(bool) result = ma.masked_all(mask.shape, dtype=dtype) # Apply the fill-value from the proxy object. # Note: 'values' is just 'proxy' in a dask wrapper. This arg # must be a dask type so that 'delayed' can recognise it, but # that provides no access to the underlying fill value. result.fill_value = proxy.mdi n_values = np.sum(mask) if n_values > 0: # Note: data field can have excess values, but not fewer. result[mask] = values[:n_values] return result delayed_result = calc_array(mask_field_array, delayed_valid_values) lazy_result_array = da.from_delayed( delayed_result, shape=block_shape, dtype=dtype ) field.data = lazy_result_array def _field_gen(filename, read_data_bytes, little_ended=False): """Return generator of "half-formed" PPField instances derived from given filename. A field returned by the generator is only "half-formed" because its `_data` attribute represents a simple one-dimensional stream of bytes. (Encoded either as an instance of LoadedArrayBytes or as a 'deferred array bytes' tuple, depending on the value of `read_data_bytes`.) This is because fields encoded with a land/sea mask do not contain sufficient information within the field to determine the final two-dimensional shape of the data. The callers of this routine are the 'load' routines (both PP and FF). They both filter the resulting stream of fields with `_interpret_fields`, which replaces each field.data with an actual array (real or lazy). This is done separately so that `_interpret_fields` can detect land-mask fields and use them to construct data arrays for any fields which use land/sea-mask packing. 
""" dtype_endian_char = "<" if little_ended else ">" with open(filename, "rb") as pp_file: # Get a reference to the seek method on the file # (this is accessed 3* #number of headers so can provide a small # performance boost) pp_file_seek = pp_file.seek pp_file_read = pp_file.read field_count = 0 # Keep reading until we reach the end of file while True: # Move past the leading header length word pp_file_seek(PP_WORD_DEPTH, os.SEEK_CUR) # Get the LONG header entries dtype = "%ci%d" % (dtype_endian_char, PP_WORD_DEPTH) header_longs = np.fromfile(pp_file, dtype=dtype, count=NUM_LONG_HEADERS) # Nothing returned => EOF if len(header_longs) == 0: break # Get the FLOAT header entries dtype = "%cf%d" % (dtype_endian_char, PP_WORD_DEPTH) header_floats = np.fromfile(pp_file, dtype=dtype, count=NUM_FLOAT_HEADERS) header = tuple(header_longs) + tuple(header_floats) # Make a PPField of the appropriate sub-class (depends on header # release number) try: pp_field = make_pp_field(header) except ValueError as e: msg = ( "Unable to interpret field {}. {}. Skipping " "the remainder of the file.".format(field_count, str(e)) ) warnings.warn( msg, category=_WarnComboIgnoringLoad, ) break # Skip the trailing 4-byte word containing the header length pp_file_seek(PP_WORD_DEPTH, os.SEEK_CUR) # Read the word telling me how long the data + extra data is # This value is # of bytes len_of_data_plus_extra = struct.unpack_from( "%cL" % dtype_endian_char, pp_file_read(PP_WORD_DEPTH) )[0] if len_of_data_plus_extra != pp_field.lblrec * PP_WORD_DEPTH: wmsg = ( "LBLREC has a different value to the integer recorded " "after the header in the file ({} and {}). " "Skipping the remainder of the file." ) warnings.warn( wmsg.format( pp_field.lblrec * PP_WORD_DEPTH, len_of_data_plus_extra ), category=_WarnComboIgnoringLoad, ) break # calculate the extra length in bytes extra_len = pp_field.lbext * PP_WORD_DEPTH # Derive size and datatype of payload data_len = len_of_data_plus_extra - extra_len dtype = LBUSER_DTYPE_LOOKUP.get( pp_field.lbuser[0], LBUSER_DTYPE_LOOKUP["default"] ) if little_ended: # Change data dtype for a little-ended file. dtype = str(dtype) if dtype[0] != ">": msg = "Unexpected dtype {!r} can't be converted to little-endian" raise ValueError(msg) dtype = np.dtype("<" + dtype[1:]) if read_data_bytes: # Read the actual bytes. This can then be converted to a numpy # array at a higher level. pp_field.data = LoadedArrayBytes(pp_file.read(data_len), dtype) else: # Provide enough context to read the data bytes later on, # as a 'deferred array bytes' tuple. # N.B. this used to be a namedtuple called DeferredArrayBytes, # but it no longer is. Possibly for performance reasons? pp_field.data = (filename, pp_file.tell(), data_len, dtype) # Seek over the actual data payload. pp_file_seek(data_len, os.SEEK_CUR) # Do we have any extra data to deal with? if extra_len: pp_field._read_extra_data( pp_file, pp_file_read, extra_len, little_ended=little_ended ) # Skip that last 4 byte record telling me the length of the field I # have already read pp_file_seek(PP_WORD_DEPTH, os.SEEK_CUR) field_count += 1 yield pp_field # Stash codes not to be filtered (reference altitude and pressure fields). _STASH_ALLOW = [STASH(1, 0, 33), STASH(1, 0, 1)] def _convert_constraints(constraints): """Convert known constraints from Iris semantics to PP semantics. Convert known constraints from Iris semantics to PP semantics ignoring all unknown constraints. 
""" constraints = iris._constraints.list_of_constraints(constraints) pp_constraints = {} unhandled_constraints = False def _make_func(stashobj): """Provide unique name-space for each lambda function's stashobj variable.""" return lambda stash: stash == stashobj for con in constraints: if isinstance(con, iris.AttributeConstraint) and list( con._attributes.keys() ) == ["STASH"]: # Convert a STASH constraint. # The attribute can be a STASH object, a stashcode string, or a # callable. stashobj = con._attributes["STASH"] if callable(stashobj): call_func = stashobj elif isinstance(stashobj, (str, STASH)): call_func = _make_func(stashobj) else: raise TypeError( "STASH constraints should be either a" " callable, string or STASH object" ) if "stash" not in pp_constraints: pp_constraints["stash"] = [call_func] else: pp_constraints["stash"].append(call_func) else: # only keep the pp constraints set if they are all handled as # pp constraints unhandled_constraints = True def pp_filter(field): """Return True if field is to be kept, False if field does not match filter.""" res = True if field.stash not in _STASH_ALLOW: if pp_constraints.get("stash"): res = False for call_func in pp_constraints["stash"]: if call_func(str(field.stash)): res = True break return res if pp_constraints and not unhandled_constraints: result = pp_filter else: result = None return result
def load_cubes(filenames, callback=None, constraints=None):
    """Load cubes from a list of pp filenames.

    Parameters
    ----------
    filenames :
        List of pp filenames to load.
    callback : optional
        A function which can be passed on to :func:`iris.io.run_callback`.
    constraints : optional
        A list of Iris constraints.

    Notes
    -----
    The resultant cubes may not be in the order that they are in the file
    (order is not preserved when there is a field with orography
    references).

    """
    return _load_cubes_variable_loader(
        filenames, callback, load, constraints=constraints
    )
def load_cubes_little_endian(filenames, callback=None, constraints=None):
    """Load cubes from a list of pp filenames containing little-endian data.

    Parameters
    ----------
    filenames :
        List of pp filenames to load.
    callback : optional
        A function which can be passed on to :func:`iris.io.run_callback`.
    constraints : optional
        A list of Iris constraints.

    Notes
    -----
    The resultant cubes may not be in the order that they are in the file
    (order is not preserved when there is a field with orography
    references).

    """
    return _load_cubes_variable_loader(
        filenames,
        callback,
        load,
        {"little_ended": True},
        constraints=constraints,
    )
[docs] def load_pairs_from_fields(pp_fields): r"""Convert an iterable of PP fields into an iterable of tuples of (Cubes, PPField). Parameters ---------- pp_fields : An iterable of :class:`iris.fileformats.pp.PPField`. Returns ------- :class:`iris.cube.Cube` An iterable of :class:`iris.cube.Cube`\ s. Notes ----- This capability can be used to filter out fields before they are passed to the load pipeline, and amend the cubes once they are created, using PP metadata conditions. Where this filtering removes a significant number of fields, the speed up to load can be significant: >>> import iris >>> from iris.fileformats.pp import load_pairs_from_fields >>> filename = iris.sample_data_path('E1.2098.pp') >>> filtered_fields = [] >>> for field in iris.fileformats.pp.load(filename): ... if field.lbproc == 128: ... filtered_fields.append(field) >>> cube_field_pairs = load_pairs_from_fields(filtered_fields) >>> for cube, field in cube_field_pairs: ... cube.attributes['lbproc'] = field.lbproc ... print(cube.attributes['lbproc']) 128 This capability can also be used to alter fields before they are passed to the load pipeline. Fields with out of specification header elements can be cleaned up this way and cubes created: >>> filename = iris.sample_data_path('E1.2098.pp') >>> cleaned_fields = list(iris.fileformats.pp.load(filename)) >>> for field in cleaned_fields: ... if field.lbrel == 0: ... field.lbrel = 3 >>> cubes_field_pairs = list(load_pairs_from_fields(cleaned_fields)) """ load_pairs_from_fields = iris.fileformats.rules.load_pairs_from_fields return load_pairs_from_fields(pp_fields, iris.fileformats.pp_load_rules.convert)
def _load_cubes_variable_loader( filenames, callback, loading_function, loading_function_kwargs=None, constraints=None, ): import iris.fileformats.um._fast_load as um_fast_load pp_filter = None if constraints is not None: pp_filter = _convert_constraints(constraints) if um_fast_load.STRUCTURED_LOAD_CONTROLS.loads_use_structured: # For structured loads, pass down the pp_filter function as an extra # keyword to the low-level generator function. loading_function_kwargs = loading_function_kwargs or {} loading_function_kwargs["pp_filter"] = pp_filter # Also do *not* use this filter in generic rules processing, as for # structured loading, the 'field' of rules processing is no longer a # PPField but a FieldCollation. pp_filter = None # Make a loader object for the generic rules code. loader = iris.fileformats.rules.Loader( um_fast_load._basic_load_function, loading_function_kwargs, um_fast_load._convert_collation, ) else: loader = iris.fileformats.rules.Loader( loading_function, loading_function_kwargs or {}, iris.fileformats.pp_load_rules.convert, ) result = iris.fileformats.rules.load_cubes(filenames, callback, loader, pp_filter) if um_fast_load.STRUCTURED_LOAD_CONTROLS.loads_use_structured: # We need an additional concatenate-like operation to combine cubes # from different files. Unfortunately, the 'merge' call provided in # the iris_load_xx functions cannot do this. result = um_fast_load._combine_structured_cubes(result) return result
def save(cube, target, append=False, field_coords=None):
    """Use the PP saving rules (and any user rules) to save a cube to a PP file.

    Parameters
    ----------
    cube : :class:`iris.cube.Cube`
    target :
        A filename or open file handle.
    append : bool, default=False
        Whether to start a new file afresh or add the cube(s) to the end of
        the file.
        Only applicable when target is a filename, not a file handle.
    field_coords : optional
        List of 2 coords or coord names which are to be used for reducing the
        given cube into 2d slices, which will ultimately determine the x and y
        coordinates of the resulting fields.
        If None, the final two dimensions are chosen for slicing.

    Notes
    -----
    See also :func:`iris.io.save`.  Note that :func:`iris.save` is the
    preferred method of saving.  This allows a :class:`iris.cube.CubeList` or
    a sequence of cubes to be saved to a PP file.

    """
    fields = as_fields(cube, field_coords)
    save_fields(fields, target, append=append)
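A minimal sketch of direct use (illustrative; the filenames are hypothetical). :func:`iris.save` remains the preferred route, but for a single cube a direct call looks like this:

    >>> import iris
    >>> import iris.fileformats.pp as pp
    >>> cube = iris.load_cube("input.pp")  # hypothetical file
    >>> pp.save(cube, "output.pp", append=False)  # hypothetical file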
def save_pairs_from_cube(cube, field_coords=None):
    """Use the PP saving rules to generate (2D cube, PP field) pairs from a cube.

    Parameters
    ----------
    cube :
        A :class:`iris.cube.Cube`.
    field_coords : optional
        List of 2 coords or coord names which are to be used for reducing the
        given cube into 2d slices, which will ultimately determine the x and y
        coordinates of the resulting fields.
        If None, the final two dimensions are chosen for slicing.

    Yields
    ------
    :class:`iris.cube.Cube`, :class:`iris.fileformats.pp.PPField`
        2-dimensional slices of the input cube together with their associated
        pp-fields.

    """
    # Open issues
    # Could use rules in "sections" ... e.g. to process the extensive
    # dimensions; ...?
    # Could pre-process the cube to add extra convenient terms?
    #   e.g. x-coord, y-coord ... but what about multiple coordinates on the
    #   dimension?
    #
    # How to perform the slicing?
    #   Do we always slice in the last two dimensions?
    #   Not all source data will contain lat-lon slices.
    #   What do we do about dimensions with multiple coordinates?
    #
    # Deal with:
    #   LBLREC - Length of data record in words (incl. extra data)
    #       Done on save(*)
    #   LBUSER[0] - Data type
    #       Done on save(*)
    #   LBUSER[1] - Start address in DATA (?! or just set to "null"?)
    #   BLEV - Level - the value of the coordinate for LBVC
    #
    # *) With the current on-save way of handling LBLREC and LBUSER[0] we
    # can't check if they've been set correctly without *actually* saving as
    # a binary PP file. That also means you can't use the same reference.txt
    # file for loaded vs saved fields (unless you re-load the saved field!).
    #
    # Set to (or leave as) "null":
    #   LBEGIN - Address of start of field in direct access dataset
    #   LBEXP - Experiment identification
    #   LBPROJ - Fields file projection number
    #   LBTYP - Fields file field type code
    #   LBLEV - Fields file level code / hybrid height model level

    # Build confidence by having a PP object that records which header items
    # have been set, and only saves if they've all been set?
    #   Watch out for extra-data.

    # On the flip side, record which Cube metadata has been "used" and flag up
    # unused?

    n_dims = len(cube.shape)
    if n_dims < 2:
        raise ValueError("Unable to save a cube of fewer than 2 dimensions.")

    if field_coords is not None:
        # cast the given coord/coord names into cube coords
        field_coords = cube._as_list_of_coords(field_coords)
        if len(field_coords) != 2:
            raise ValueError(
                "Got %s coordinates in field_coords, expecting"
                " exactly 2." % len(field_coords)
            )
    else:
        # default to the last two dimensions
        # (if result of coords is an empty list, will raise an IndexError)
        # NB watch out for the ordering of the dimensions
        field_coords = (
            cube.coords(dimensions=n_dims - 2)[0],
            cube.coords(dimensions=n_dims - 1)[0],
        )

    # Save each named or latlon slice2D in the cube
    for slice2D in cube.slices(field_coords):
        # Start with a blank PPField
        pp_field = PPField3()

        # Set all items to 0 because we need lbuser, lbtim
        # and some others to be present before running the rules.
        for name, positions in pp_field.HEADER_DEFN:
            # Establish whether field name is integer or real
            default = (
                0
                if positions[0] <= NUM_LONG_HEADERS - UM_TO_PP_HEADER_OFFSET
                else 0.0
            )
            # Establish whether field position is scalar or composite
            if len(positions) > 1:
                default = [default] * len(positions)
            setattr(pp_field, name, default)

        # Some defaults should not be 0
        pp_field.lbrel = 3  # Header release 3.
        pp_field.lbcode = 1  # Grid code.
        pp_field.bmks = 1.0  # Some scaley thing.
        pp_field.lbproc = 0

        # Set the missing-data value to the standard default value.
        # The save code uses this header word to fill masked data points.
        pp_field.bmdi = -1e30

        # From UM doc F3: "Set to -99 if LBEGIN not known"
        pp_field.lbuser[1] = -99

        # Set the data, keeping it lazy where possible.
        pp_field.data = slice2D.core_data()

        # Run the PP save rules on the slice2D, to fill the PPField,
        # recording the rules that were used
        pp_field = verify(slice2D, pp_field)

        yield (slice2D, pp_field)
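An illustrative round trip (not part of the module source) showing how the generated pairs can be amended before writing with :func:`save_fields`; the filenames and the lbexp value are hypothetical:

    >>> import iris
    >>> import iris.fileformats.pp as pp
    >>> cube = iris.load_cube("input.pp")  # hypothetical file
    >>> fields = []
    >>> for slice2d, field in pp.save_pairs_from_cube(cube):
    ...     field.lbexp = 1  # hypothetical header tweak
    ...     fields.append(field)
    >>> pp.save_fields(fields, "amended.pp")  # hypothetical file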
def as_fields(cube, field_coords=None):
    """Use the PP saving rules to convert a cube to an iterable of PP fields.

    Use the PP saving rules (and any user rules) to convert a cube to an
    iterable of PP fields.

    Parameters
    ----------
    cube : :class:`iris.cube.Cube`
    field_coords : optional
        List of 2 coords or coord names which are to be used for reducing the
        given cube into 2d slices, which will ultimately determine the x and y
        coordinates of the resulting fields.
        If None, the final two dimensions are chosen for slicing.

    """
    return (field for _, field in save_pairs_from_cube(cube, field_coords=field_coords))
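For illustration (hypothetical filenames): as_fields composes with save_fields, which is essentially what the body of save() above does:

    >>> import iris
    >>> import iris.fileformats.pp as pp
    >>> cube = iris.load_cube("input.pp")  # hypothetical file
    >>> pp.save_fields(pp.as_fields(cube), "copy.pp")  # hypothetical file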
def save_fields(fields, target, append: bool = False):
    """Save an iterable of PP fields to a PP file.

    Parameters
    ----------
    fields :
        An iterable of PP fields.
    target :
        A filename or open file handle.
    append : bool, default=False
        Whether to start a new file afresh or add the field(s) to the end of
        the file.
        Only applicable when target is a filename, not a file handle.

    See Also
    --------
    iris.io.save : Save one or more Cubes to file (or other writeable).

    """
    # Open issues
    # Deal with:
    #   LBLREC - Length of data record in words (incl. extra data)
    #       Done on save(*)
    #   LBUSER[0] - Data type
    #       Done on save(*)
    #   LBUSER[1] - Start address in DATA (?! or just set to "null"?)
    #   BLEV - Level - the value of the coordinate for LBVC
    #
    # *) With the current on-save way of handling LBLREC and LBUSER[0] we
    # can't check if they've been set correctly without *actually* saving as
    # a binary PP file. That also means you can't use the same reference.txt
    # file for loaded vs saved fields (unless you re-load the saved field!).
    #
    # Set to (or leave as) "null":
    #   LBEGIN - Address of start of field in direct access dataset
    #   LBEXP - Experiment identification
    #   LBPROJ - Fields file projection number
    #   LBTYP - Fields file field type code
    #   LBLEV - Fields file level code / hybrid height model level

    if isinstance(target, str):
        pp_file = open(target, "ab" if append else "wb")
    elif hasattr(target, "write"):
        if hasattr(target, "mode") and "b" not in target.mode:
            raise ValueError("Target not binary")
        pp_file = target
    else:
        raise ValueError("Can only save pp to filename or writable")

    try:
        # Save each field
        for pp_field in fields:
            # Write to file
            pp_field.save(pp_file)
    finally:
        if isinstance(target, str):
            pp_file.close()
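A sketch of the two accepted target types (hypothetical filenames): a filename, optionally appended to, or an already-open binary file handle:

    >>> import iris.fileformats.pp as pp
    >>> fields = list(pp.load("input.pp"))  # hypothetical file
    >>> pp.save_fields(fields, "combined.pp")  # write a new file
    >>> pp.save_fields(fields, "combined.pp", append=True)  # append the same fields again
    >>> with open("combined.pp", "ab") as handle:  # handle must be opened in binary mode
    ...     pp.save_fields(fields, handle)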