# iris.fileformats.nimrod

# Copyright Iris contributors
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Provides NIMROD file format capabilities."""

import glob
import os
import struct
import sys

import numpy as np

import iris
from iris.exceptions import TranslationError
import iris.fileformats.nimrod_load_rules

# general header (int16) elements 1-31 (Fortran bytes 1-62)
# NOTE(review): tuple contents were lost in extraction; names restored from
# upstream iris and the NIMROD format spec — verify against the iris repo.
# This subset is read with no trailing skip, so it must name all 31 elements.
general_header_int16s = (
    "vt_year",
    "vt_month",
    "vt_day",
    "vt_hour",
    "vt_minute",
    "vt_second",
    "dt_year",
    "dt_month",
    "dt_day",
    "dt_hour",
    "dt_minute",
    "datum_type",
    "datum_len",
    "experiment_num",
    "horizontal_grid_type",
    "num_rows",
    "num_cols",
    "nimrod_version",
    "field_code",
    "vertical_coord_type",
    "reference_vertical_coord_type",
    "data_specific_float32_element_count",
    "data_specific_int16_element_count",
    "origin_corner",
    "int_mdi",
    "period_minutes",
    "num_model_levels",
    "proj_biaxial_ellipsoid",
    "ensemble_member",
    "spare1",
    "spare2",
)

# general header (float32) elements 32-59 (Fortran bytes 63-174)
# NOTE(review): tuple contents were lost in extraction; names restored from
# upstream iris — verify against the iris repo. Up to 28 elements may be
# named; the header reader skips the remaining unnamed floats.
general_header_float32s = (
    "vertical_coord",
    "reference_vertical_coord",
    "y_origin",
    "row_step",
    "x_origin",
    "column_step",
    "float32_mdi",
    "MKS_data_scaling",
    "data_offset",
    "x_offset",
    "y_offset",
    "true_origin_latitude",
    "true_origin_longitude",
    "true_origin_easting",
    "true_origin_northing",
    "tm_meridian_scaling",
)

# data specific header (float32) elements 60-104 (Fortran bytes 175-354)
# NOTE(review): tuple contents were lost in extraction; names restored from
# upstream iris — verify against the iris repo. Up to 45 elements may be
# named; the header reader skips the remaining unnamed floats.
data_header_float32s = (
    "tl_y",
    "tl_x",
    "tr_y",
    "tr_x",
    "br_y",
    "br_x",
    "bl_y",
    "bl_x",
    "sat_calib",
    "sat_space_count",
    "ducting_index",
    "elevation_angle",
    "neighbourhood_radius",
    "threshold_vicinity_radius",
    "recursive_filter_alpha",
    "threshold_fuzziness",
    "threshold_duration_fuzziness",
)

# data specific header (char) elements 105-107 (bytes 355-410)
# units, source and title

# data specific header (int16) elements 108-159 (Fortran bytes 411-512)
# NOTE(review): tuple contents were lost in extraction; names restored from
# upstream iris — verify against the iris repo. Up to 51 elements may be
# named; the header reader skips the remaining unnamed int16s. Order matters:
# each name is bound to its header position by _read_header_subset.
data_header_int16s = (
    "radar_num",
    "radars_bitmask",
    "more_radars_bitmask",
    "clutter_map_num",
    "calibration_type",
    "bright_band_height",
    "bright_band_intensity",
    "bright_band_test1",
    "bright_band_test2",
    "infill_flag",
    "stop_elevation",
    "int16_vertical_coord",
    "int16_reference_vertical_coord",
    "int16_y_origin",
    "int16_row_step",
    "int16_x_origin",
    "int16_column_step",
    "int16_float32_mdi",
    "int16_data_scaling",
    "int16_data_offset",
    "int16_x_offset",
    "int16_y_offset",
    "int16_true_origin_latitude",
    "int16_true_origin_longitude",
    "int16_tl_y",
    "int16_tl_x",
    "int16_tr_y",
    "int16_tr_x",
    "int16_br_y",
    "int16_br_x",
    "int16_bl_y",
    "int16_bl_x",
    "sensor_id",
    "meteosat_id",
    "alphas_available",
)

def _read_chars(infile, num):
    """Read characters from the (big-endian) file."""
    instr =
    result = struct.unpack(">%ds" % num, instr)[0]
    result = result.decode()
    return result

[docs] class NimrodField: """A data field from a NIMROD file. Capable of converting itself into a :class:`~iris.cube.Cube` References ---------- Met Office (2003): Met Office Rain Radar Data from the NIMROD System. NCAS British Atmospheric Data Centre, date of citation. """ def __init__(self, from_file=None): """Create a NimrodField object and optionally read from an open file. Example:: with open("nimrod_file", "rb") as infile: field = NimrodField(infile) """ if from_file is not None:
[docs] def read(self, infile): """Read the next field from the given file object.""" self._read_header(infile) self._read_data(infile)
def _read_header_subset(self, infile, names, dtype): # Read contiguous header items of the same data type. values = np.fromfile(infile, dtype=dtype, count=len(names)) if sys.byteorder == "little": values.byteswap(True) for i, name in enumerate(names): setattr(self, name, values[i]) def _read_header(self, infile): """Load the 512 byte header (surrounded by 4-byte length).""" leading_length = struct.unpack(">L",[0] if leading_length != 512: raise TranslationError("Expected header leading_length of 512") # general header (int16) elements 1-31 (bytes 1-62) self._read_header_subset(infile, general_header_int16s, np.int16) # general header (float32) elements 32-59 (bytes 63-174) self._read_header_subset(infile, general_header_float32s, np.float32) # skip unnamed floats * (28 - len(general_header_float32s)), os.SEEK_CUR) # data specific header (float32) elements 60-104 (bytes 175-354) self._read_header_subset(infile, data_header_float32s, np.float32) # skip unnamed floats * (45 - len(data_header_float32s)), os.SEEK_CUR) # data specific header (char) elements 105-107 (bytes 355-410) self.units = _read_chars(infile, 8) self.source = _read_chars(infile, 24) self.title = _read_chars(infile, 24) # data specific header (int16) elements 108- (bytes 411-512) self._read_header_subset(infile, data_header_int16s, np.int16) # skip unnamed int16s * (51 - len(data_header_int16s)), os.SEEK_CUR) trailing_length = struct.unpack(">L",[0] if trailing_length != leading_length: raise TranslationError( "Expected header trailing_length of {}, got {}.".format( leading_length, trailing_length ) ) def _read_data(self, infile): """Read the data array: int8, int16, int32 or float32. (surrounded by 4-byte length, at start and end) """ # what are we expecting? 
num_data = int(self.num_rows) * int(self.num_cols) num_data_bytes = int(num_data) * int(self.datum_len) # format string for unpacking the # 0:real if self.datum_type == 0: numpy_dtype = np.float32 # 1:int elif self.datum_type == 1: if self.datum_len == 1: numpy_dtype = np.int8 elif self.datum_len == 2: numpy_dtype = np.int16 elif self.datum_len == 4: numpy_dtype = np.int32 else: raise TranslationError("Undefined datum length %d" % self.datum_type) # 2:byte elif self.datum_type == 2: numpy_dtype = np.byte else: raise TranslationError("Undefined data type") leading_length = struct.unpack(">L",[0] if leading_length != num_data_bytes: raise TranslationError( "Expected data leading_length of %d" % num_data_bytes ) = np.fromfile(infile, dtype=numpy_dtype, count=num_data) if sys.byteorder == "little": trailing_length = struct.unpack(">L",[0] if trailing_length != leading_length: raise TranslationError( "Expected data trailing_length of %d" % num_data_bytes ) # Form the correct shape. =, self.num_cols)
[docs] def load_cubes(filenames, callback=None): """Load cubes from a list of NIMROD filenames. Parameters ---------- filenames : List of NIMROD filenames to load. callback : optional A function which can be passed on to :func:``. Notes ----- The resultant cubes may not be in the same order as in the files. """ if isinstance(filenames, str): filenames = [filenames] for filename in filenames: for path in glob.glob(filename): with open(path, "rb") as infile: while True: try: field = NimrodField(infile) except struct.error: # End of file. Move on to the next file. break cube = # Were we given a callback? if callback is not None: cube =, cube, field, filename) if cube is None: continue yield cube