Source code for iris.analysis.trajectory

# Copyright Iris contributors
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Defines a Trajectory class, and a routine to extract a sub-cube along a trajectory."""

import math

import numpy as np
from scipy.spatial import cKDTree

import iris.coords

class _Segment:
    """A single trajectory line segment.

    Two points, as described in the Trajectory class.

    def __init__(self, p0, p1):
        # check keys
        if sorted(p0.keys()) != sorted(p1.keys()):
            raise ValueError("keys do not match")

        self.pts = [p0, p1]

        # calculate our length
        squares = 0
        for key in self.pts[0].keys():
            delta = self.pts[1][key] - self.pts[0][key]
            squares += delta * delta
        self.length = math.sqrt(squares)

[docs] class Trajectory: """A series of given waypoints with pre-calculated sample points.""" def __init__(self, waypoints, sample_count=10): """Define a trajectory using a sequence of waypoints. Parameters ---------- waypoints : A sequence of dictionaries, mapping coordinate names to values. sample_count : int, default=10 The number of sample positions to use along the trajectory. Examples -------- :: waypoints = [{'latitude': 45, 'longitude': -60}, {'latitude': 45, 'longitude': 0}] Trajectory(waypoints) .. note:: All the waypoint dictionaries must contain the same coordinate names. """ self.waypoints = waypoints self.sample_count = sample_count # create line segments from the waypoints segments = [ _Segment(self.waypoints[i], self.waypoints[i + 1]) for i in range(len(self.waypoints) - 1) ] # calculate our total length self.length = sum([seg.length for seg in segments]) # generate our sampled points # The trajectory points, as dictionaries of {coord_name: value}. self.sampled_points = [] sample_step = self.length / (self.sample_count - 1) # start with the first segment cur_seg_i = 0 cur_seg = segments[cur_seg_i] len_accum = cur_seg.length for p in range(self.sample_count): # calculate the sample position along our total length sample_at_len = p * sample_step # skip forward to the containing segment while len_accum < sample_at_len and cur_seg_i < len(segments): cur_seg_i += 1 cur_seg = segments[cur_seg_i] len_accum += cur_seg.length # how far through the segment is our sample point? seg_start_len = len_accum - cur_seg.length seg_frac = (sample_at_len - seg_start_len) / cur_seg.length # sample each coordinate in this segment, to create a new # sampled point new_sampled_point = {} for key in cur_seg.pts[0].keys(): seg_coord_delta = cur_seg.pts[1][key] - cur_seg.pts[0][key] new_sampled_point.update( {key: cur_seg.pts[0][key] + seg_frac * seg_coord_delta} ) # add this new sampled point self.sampled_points.append(new_sampled_point) def __repr__(self): return "Trajectory(%s, sample_count=%s)" % ( self.waypoints, self.sample_count, ) def _get_interp_points(self): """Translate `self.sampled_points` to the format expected by the interpolator. Returns ------- `self.sampled points` `self.sampled points` in the format required by `:func:`~iris.analysis.trajectory.interpolate`. """ points = { k: [point_dict[k] for point_dict in self.sampled_points] for k in self.sampled_points[0].keys() } return [(k, v) for k, v in points.items()] def _src_cube_anon_dims(self, cube): """Locate the index of anonymous dimensions. A helper method to locate the index of anonymous dimensions on the interpolation target, ``cube``. Returns ------- The index of any anonymous dimensions in ``cube``. """ named_dims = [cube.coord_dims(c)[0] for c in cube.dim_coords] return list(set(range(cube.ndim)) - set(named_dims))
[docs] def interpolate(self, cube, method=None): """Interpolate ``cube`` on the defined trajectory. Call :func:`~iris.analysis.trajectory.interpolate` to interpolate ``cube`` on the defined trajectory. Assumes that the coordinate names supplied in the waypoints dictionaries match to coordinate names in `cube`, and that points are supplied in the same coord_system as in `cube`, where appropriate (i.e. for horizontal coordinate points). Parameters ---------- cube : The source Cube to interpolate. method : optional The interpolation method to use; "linear" (default) or "nearest". Only nearest is available when specifying multi-dimensional coordinates. """ sample_points = self._get_interp_points() interpolated_cube = interpolate(cube, sample_points, method=method) # Add an "index" coord to name the anonymous dimension produced by # the interpolation, if present. if len(interpolated_cube.dim_coords) < interpolated_cube.ndim: # Add a new coord `index` to describe the new dimension created by # interpolating. index_coord = iris.coords.DimCoord( range(self.sample_count), long_name="index" ) # Make sure anonymous dims in `cube` do not mistakenly get labelled # as the new `index` dimension created by interpolating. src_anon_dims = self._src_cube_anon_dims(cube) interp_anon_dims = self._src_cube_anon_dims(interpolated_cube) (anon_dim_index,) = list(set(interp_anon_dims) - set(src_anon_dims)) # Add the new coord to the interpolated cube. interpolated_cube.add_dim_coord(index_coord, anon_dim_index) return interpolated_cube
[docs] def interpolate(cube, sample_points, method=None): """Extract a sub-cube at the given n-dimensional points. Parameters ---------- cube : The source Cube. sample_points : A sequence of coordinate (name) - values pairs. method : optional Request "linear" interpolation (default) or "nearest" neighbour. Only nearest neighbour is available when specifying multi-dimensional coordinates. Examples -------- :: sample_points = [('latitude', [45, 45, 45]), ('longitude', [-60, -50, -40])] interpolated_cube = interpolate(cube, sample_points) Notes ----- This function does not maintain laziness when called; it realises data. See more at :doc:`/userguide/real_and_lazy_data`. """ from iris.analysis import Linear if method not in [None, "linear", "nearest"]: raise ValueError("Unhandled interpolation specified : %s" % method) # Convert any coordinate names to coords points = [] for coord, values in sample_points: if isinstance(coord, str): coord = cube.coord(coord) points.append((coord, values)) sample_points = points # Do all value sequences have the same number of values? coord, values = sample_points[0] trajectory_size = len(values) for coord, values in sample_points[1:]: if len(values) != trajectory_size: raise ValueError("Lengths of coordinate values are inconsistent.") # Which dimensions are we squishing into the last dimension? squish_my_dims = set() for coord, values in sample_points: dims = cube.coord_dims(coord) for dim in dims: squish_my_dims.add(dim) # Derive the new cube's shape by filtering out all the dimensions we're # about to sample, # and then adding a new dimension to accommodate all the sample points. remaining = [ (dim, size) for dim, size in enumerate(cube.shape) if dim not in squish_my_dims ] new_data_shape = [size for dim, size in remaining] new_data_shape.append(trajectory_size) # Start with empty data and then fill in the "column" of values for each # trajectory point. new_cube = iris.cube.Cube(np.empty(new_data_shape)) new_cube.metadata = cube.metadata # Derive the mapping from the non-trajectory source dimensions to their # corresponding destination dimensions. remaining_dims = [dim for dim, size in remaining] dimension_remap = {dim: i for i, dim in enumerate(remaining_dims)} # Record a mapping from old coordinate IDs to new coordinates, # for subsequent use in creating updated aux_factories. coord_mapping = {} # Create all the non-squished coords for coord in cube.dim_coords: src_dims = cube.coord_dims(coord) if squish_my_dims.isdisjoint(src_dims): dest_dims = [dimension_remap[dim] for dim in src_dims] new_coord = coord.copy() new_cube.add_dim_coord(new_coord, dest_dims) coord_mapping[id(coord)] = new_coord for coord in cube.aux_coords: src_dims = cube.coord_dims(coord) if squish_my_dims.isdisjoint(src_dims): dest_dims = [dimension_remap[dim] for dim in src_dims] new_coord = coord.copy() new_cube.add_aux_coord(new_coord, dest_dims) coord_mapping[id(coord)] = new_coord # Create all the squished (non derived) coords, not filled in yet. trajectory_dim = len(remaining_dims) for coord in cube.dim_coords + cube.aux_coords: src_dims = cube.coord_dims(coord) if not squish_my_dims.isdisjoint(src_dims): points = np.array([coord.points.flatten()[0]] * trajectory_size) new_coord = iris.coords.AuxCoord( points, var_name=coord.var_name, standard_name=coord.standard_name, long_name=coord.long_name, units=coord.units, bounds=None, attributes=coord.attributes, coord_system=coord.coord_system, ) new_cube.add_aux_coord(new_coord, trajectory_dim) coord_mapping[id(coord)] = new_coord for factory in cube.aux_factories: new_cube.add_aux_factory(factory.updated(coord_mapping)) # Are the given coords all 1-dimensional? (can we do linear interp?) for coord, values in sample_points: if coord.ndim > 1: if method == "linear": msg = ( "Cannot currently perform linear interpolation for " "multi-dimensional coordinates." ) raise iris.exceptions.CoordinateMultiDimError(msg) method = "nearest" break if method in ["linear", None]: # Using cube.interpolate will generate extra values that we don't need # as it makes a grid from the provided coordinates (like a meshgrid) # and then does interpolation for all of them. This is memory # inefficient, but significantly more time efficient than calling # cube.interpolate (or the underlying method on the interpolator) # repeatedly, so using this approach for now. In future, it would be # ideal if we only interpolated at the points we care about columns = cube.interpolate(sample_points, Linear()) # np.einsum(a, [0, 0], [0]) is like np.diag(a) # We're using einsum here to do an n-dimensional diagonal, leaving the # other dimensions unaffected and putting the diagonal's direction on # the final axis initial_inds = list(range(1, columns.ndim + 1)) for ind in squish_my_dims: initial_inds[ind] = 0 final_inds = list(filter(lambda x: x != 0, initial_inds)) + [0] = np.einsum(, initial_inds, final_inds) # Fill in the empty squashed (non derived) coords. # We're using the same einstein summation plan as for the cube, but # redoing those indices to match the indices in the coordinates for columns_coord in columns.dim_coords + columns.aux_coords: src_dims = cube.coord_dims(columns_coord) if not squish_my_dims.isdisjoint(src_dims): # Mapping the cube indices onto the coord initial_coord_inds = [initial_inds[ind] for ind in src_dims] # Making the final ones the same way as for the cube # 0 will always appear in the initial ones because we know this # coord overlaps the squish dims final_coord_inds = list( filter(lambda x: x != 0, initial_coord_inds) ) + [0] new_coord_points = np.einsum( columns_coord.points, initial_coord_inds, final_coord_inds ) # Check we're not overwriting coord.points with the wrong shape if ( not new_cube.coord( == new_coord_points.shape ): msg = ( "Coord {} was expected to have new points of shape {}. " "Found shape of {}." ) raise ValueError( msg.format(, new_cube.coord(, new_coord_points.shape, ) ) # Replace the points new_cube.coord( = new_coord_points elif method == "nearest": # Use a cache with _nearest_neighbour_indices_ndcoords() cache = {} column_indexes = _nearest_neighbour_indices_ndcoords( cube, sample_points, cache=cache ) # Construct "fancy" indexes, so we can create the result data array in # a single numpy indexing operation. # ALSO: capture the index range in each dimension, so that we can fetch # only a required (square) sub-region of the source data. fancy_source_indices = [] region_slices = [] n_index_length = len(column_indexes[0]) dims_reduced = [False] * n_index_length for i_ind in range(n_index_length): contents = [column_index[i_ind] for column_index in column_indexes] each_used = [content != slice(None) for content in contents] if np.all(each_used): # This dimension is addressed : use a list of indices. dims_reduced[i_ind] = True # Select the region by min+max indices. start_ind = np.min(contents) stop_ind = 1 + np.max(contents) region_slice = slice(start_ind, stop_ind) # Record point indices with start subtracted from all of them. fancy_index = list(np.array(contents) - start_ind) elif not np.any(each_used): # This dimension is not addressed by the operation. # Use a ":" as the index. fancy_index = slice(None) # No sub-region selection for this dimension. region_slice = slice(None) else: # Should really never happen, if _ndcoords is right. msg = ( "Internal error in trajectory interpolation : point " "selection indices should all have the same form." ) raise ValueError(msg) fancy_source_indices.append(fancy_index) region_slices.append(region_slice) # Fetch the required (square-section) region of the source data. # NOTE: This is not quite as good as only fetching the individual # points used, but it avoids creating a sub-cube for each point, # which is very slow, especially when points are re-used a lot ... source_area_indices = tuple(region_slices) source_data = cube[source_area_indices].data # Transpose source data before indexing it to get the final result. # Because.. the fancy indexing will replace the indexed (horizontal) # dimensions with a new single dimension over trajectory points. # Move those dimensions to the end *first* : this ensures that the new # dimension also appears at the end, which is where we want it. # Make a list of dims with the reduced ones last. dims_reduced = np.array(dims_reduced) dims_order = np.arange(n_index_length) dims_order = np.concatenate( (dims_order[~dims_reduced], dims_order[dims_reduced]) ) # Rearrange the data dimensions and the fancy indices into that order. source_data = source_data.transpose(dims_order) fancy_source_indices = [fancy_source_indices[i_dim] for i_dim in dims_order] # Apply the fancy indexing to get all the result data points. = source_data[tuple(fancy_source_indices)] # Fill in the empty squashed (non derived) coords. column_coords = [ coord for coord in cube.dim_coords + cube.aux_coords if not squish_my_dims.isdisjoint(cube.coord_dims(coord)) ] new_cube_coords = [ new_cube.coord( for column_coord in column_coords ] all_point_indices = np.array(column_indexes) single_point_test_cube = cube[column_indexes[0]] for new_cube_coord, src_coord in zip(new_cube_coords, column_coords): # Check structure of the indexed coord (at one selected point). point_coord = single_point_test_cube.coord(src_coord) if len(point_coord.points) != 1: msg = ( "Coord {} at one x-y position has the shape {}, " "instead of being a single point. " ) raise ValueError(msg.format(, src_coord.shape)) # Work out which indices apply to the input coord. # NOTE: we know how to index the source cube to get a cube with a # single point for each coord, but this is very inefficient. # So here, we translate cube indexes into *coord* indexes. src_coord_dims = cube.coord_dims(src_coord) fancy_coord_index_arrays = [ list(all_point_indices[:, src_dim]) for src_dim in src_coord_dims ] # Fill the new coord with all the correct points from the old one. new_cube_coord.points = src_coord.points[tuple(fancy_coord_index_arrays)] # NOTE: the new coords do *not* have bounds. return new_cube
def _ll_to_cart(lon, lat): # Based on cartopy.img_transform.ll_to_cart(). x = np.sin(np.deg2rad(90 - lat)) * np.cos(np.deg2rad(lon)) y = np.sin(np.deg2rad(90 - lat)) * np.sin(np.deg2rad(lon)) z = np.cos(np.deg2rad(90 - lat)) return (x, y, z) def _cartesian_sample_points(sample_points, sample_point_coord_names): """Replace geographic lat/lon with cartesian xyz. Generates coords suitable for nearest point calculations with `scipy.spatial.cKDTree`. Parameters ---------- sample_points : [coord][datum] list of sample_positions for each datum, formatted for fast use of :func:`_ll_to_cart()`. sample_point_coord_names : [coord] list of n coord names. Returns ------- list of [x,y,z,t,etc] positions, formatted for kdtree. """ # Find lat and lon coord indices i_lat = i_lon = None i_non_latlon = list(range(len(sample_point_coord_names))) for i, name in enumerate(sample_point_coord_names): if "latitude" in name: i_lat = i i_non_latlon.remove(i_lat) if "longitude" in name: i_lon = i i_non_latlon.remove(i_lon) if i_lat is None or i_lon is None: return sample_points.transpose() num_points = len(sample_points[0]) cartesian_points = [None] * num_points # Get the point coordinates without the latlon for p in range(num_points): cartesian_points[p] = [sample_points[c][p] for c in i_non_latlon] # Add cartesian xyz coordinates from latlon x, y, z = _ll_to_cart(sample_points[i_lon], sample_points[i_lat]) for p in range(num_points): cartesian_point = cartesian_points[p] cartesian_point.append(x[p]) cartesian_point.append(y[p]) cartesian_point.append(z[p]) return cartesian_points def _nearest_neighbour_indices_ndcoords(cube, sample_points, cache=None): """Calculate the cube nearest neighbour indices for the samples. Return the indices to select the data value(s) closest to the given coordinate point values. 'sample_points' is of the form [[coord-or-coord-name, point-value(s)]*]. The lengths of all the point-values sequences must be equal. This function is adapted for points sampling a multi-dimensional coord, and can currently only do nearest neighbour interpolation. Because this function can be slow for multidimensional coordinates, a 'cache' dictionary can be provided by the calling code. .. note:: If the points are longitudes/latitudes, these are handled correctly as points on the sphere, but the values must be in 'degrees'. Developer notes: A "sample space cube" is made which only has the coords and dims we are sampling on. We get the nearest neighbour using this sample space cube. """ if sample_points: try: coord, value = sample_points[0] except (KeyError, ValueError): emsg = ( "Sample points must be a list of " "(coordinate, value) pairs, got {!r}." ) raise TypeError(emsg.format(sample_points)) # Convert names to coords in sample_point and reformat sample point values # for use in `_cartesian_sample_points()`. coord_values = [] sample_point_coords = [] sample_point_coord_names = [] ok_coord_ids = set(map(id, cube.dim_coords + cube.aux_coords)) for coord, value in sample_points: coord = cube.coord(coord) if id(coord) not in ok_coord_ids: msg = ( "Invalid sample coordinate {!r}: derived coordinates are" " not allowed.".format( ) raise ValueError(msg) sample_point_coords.append(coord) sample_point_coord_names.append( value = np.array(value, ndmin=1) coord_values.append(value) coord_point_lens = np.array([len(value) for value in coord_values]) if not np.all(coord_point_lens == coord_point_lens[0]): msg = "All coordinates must have the same number of sample points." raise ValueError(msg) coord_values = np.array(coord_values) # Which dims are we sampling? sample_dims = set() for coord in sample_point_coords: for dim in cube.coord_dims(coord): sample_dims.add(dim) sample_dims = sorted(list(sample_dims)) # Extract a sub cube that lives in just the sampling space. sample_space_slice = [0] * cube.ndim for sample_dim in sample_dims: sample_space_slice[sample_dim] = slice(None, None) sample_space_slice = tuple(sample_space_slice) sample_space_cube = cube[sample_space_slice] # Just the sampling coords. for coord in sample_space_cube.coords(): if not in sample_point_coord_names: sample_space_cube.remove_coord(coord) # Order the sample point coords according to the sample space cube coords. sample_space_coord_names = [ for coord in sample_space_cube.coords()] new_order = [ sample_space_coord_names.index(name) for name in sample_point_coord_names ] coord_values = np.array([coord_values[i] for i in new_order]) sample_point_coord_names = [sample_point_coord_names[i] for i in new_order] sample_space_coords = sample_space_cube.dim_coords + sample_space_cube.aux_coords sample_space_coords_and_dims = [ (coord, sample_space_cube.coord_dims(coord)) for coord in sample_space_coords ] if cache is not None and cube in cache: kdtree = cache[cube] else: # Create a "sample space position" for each # `datum.sample_space_data_positions[coord_index][datum_index]`. sample_space_data_positions = np.empty( (len(sample_space_coords_and_dims),, dtype=float, ) for d, ndi in enumerate(np.ndindex( for c, (coord, coord_dims) in enumerate(sample_space_coords_and_dims): # Index of this datum along this coordinate (could be n-D). if coord_dims: keys = tuple(ndi[ind] for ind in coord_dims) else: keys = slice(None, None) # Position of this datum along this coordinate. sample_space_data_positions[c][d] = coord.points[keys] # Convert to cartesian coordinates. Flatten for kdtree compatibility. cartesian_space_data_coords = _cartesian_sample_points( sample_space_data_positions, sample_point_coord_names ) # Create a kdtree for the nearest-distance lookup to these 3d points. kdtree = cKDTree(cartesian_space_data_coords) # This can find the nearest datum point to any given target point, # which is the goal of this function. # Update cache. if cache is not None: cache[cube] = kdtree # Convert the sample points to cartesian (3d) coords. # If there is no latlon within the coordinate there will be no change. # Otherwise, geographic latlon is replaced with cartesian xyz. cartesian_sample_points = _cartesian_sample_points( coord_values, sample_point_coord_names ) # Use kdtree to get the nearest sourcepoint index for each target point. _, datum_index_lists = kdtree.query(cartesian_sample_points) # Convert flat indices back into multidimensional sample-space indices. sample_space_dimension_indices = np.unravel_index( datum_index_lists, ) # Convert this from "pointwise list of index arrays for each dimension", # to "list of cube indices for each point". sample_space_ndis = np.array(sample_space_dimension_indices).transpose() # For the returned result, we must convert these indices into the source # (sample-space) cube, to equivalent indices into the target 'cube'. # Make a result array: (cube.ndim * <index>), per sample point. n_points = coord_values.shape[-1] main_cube_slices = np.empty((n_points, cube.ndim), dtype=object) # Initialise so all unused indices are ":". main_cube_slices[:] = slice(None) # Move result indices according to the source (sample) and target (cube) # dimension mappings. for sample_coord, sample_coord_dims in sample_space_coords_and_dims: # Find the coord in the main cube main_coord = cube.coord( main_coord_dims = cube.coord_dims(main_coord) # Fill nearest-point data indices for each coord dimension. for sample_i, main_i in zip(sample_coord_dims, main_coord_dims): main_cube_slices[:, main_i] = sample_space_ndis[:, sample_i] # Return as a list of **tuples** : required for correct indexing usage. result = [tuple(inds) for inds in main_cube_slices] return result
[docs] class UnstructuredNearestNeigbourRegridder: """Encapsulate the operation of :meth:`iris.analysis.trajectory.interpolate`. Encapsulate the operation of :meth:`iris.analysis.trajectory.interpolate` with given source and target grids. This is the type used by the :class:`~iris.analysis.UnstructuredNearest` regridding scheme. """ # TODO: cache the necessary bits of the operation so reuse can actually # be more efficient. def __init__(self, src_cube, target_grid_cube): """Nearest-neighbour regridder. A nearest-neighbour regridder to perform regridding from the source grid to the target grid. This can then be applied to any source data with the same structure as the original 'src_cube'. Parameters ---------- src_cube : :class:`~iris.cube.Cube` The :class:`~iris.cube.Cube` defining the source grid. The X and Y coordinates can have any shape, but must be mapped over the same cube dimensions. target_grid_cube : :class:`~iris.cube.Cube` A :class:`~iris.cube.Cube`, whose X and Y coordinates specify a desired target grid. The X and Y coordinates must be one-dimensional dimension coordinates, mapped to different dimensions. All other cube components are ignored. Returns ------- regridder (object) A callable object with the interface:: result_cube = regridder(data) where `data` is a cube with the same grid as the original `src_cube`, that is to be regridded to the `target_grid_cube`. Notes ----- .. note:: For latitude-longitude coordinates, the nearest-neighbour distances are computed on the sphere, otherwise flat Euclidean distances are used. The source and target X and Y coordinates must all have the same coordinate system, which may also be None. If any X and Y coordinates are latitudes or longitudes, they *all* must be. Otherwise, the corresponding X and Y coordinates must have the same units in the source and grid cubes. """ from iris.analysis._interpolation import snapshot_grid from iris.util import _meshgrid # Make a copy of the source cube, so we can convert coordinate units. src_cube = src_cube.copy() # Snapshot the target grid and check it is a "normal" grid. tgt_x_coord, tgt_y_coord = snapshot_grid(target_grid_cube) # Check that the source has unique X and Y coords over common dims. if not src_cube.coords(axis="x") or not src_cube.coords(axis="y"): msg = "Source cube must have X- and Y-axis coordinates." raise ValueError(msg) src_x_coord = src_cube.coord(axis="x") src_y_coord = src_cube.coord(axis="y") if src_cube.coord_dims(src_x_coord) != src_cube.coord_dims(src_y_coord): msg = "Source cube X and Y coordinates must have the same cube dimensions." raise ValueError(msg) # Record *copies* of the original grid coords, in the desired # dimension order. # This lets us convert the actual ones in use to units of "degrees". self.src_grid_coords = [src_y_coord.copy(), src_x_coord.copy()] self.tgt_grid_coords = [tgt_y_coord.copy(), tgt_x_coord.copy()] # Check that all XY coords have suitable coordinate systems and units. coords_all = [src_x_coord, src_y_coord, tgt_x_coord, tgt_y_coord] cs = coords_all[0].coord_system if not all(coord.coord_system == cs for coord in coords_all): msg = ( "Source and target cube X and Y coordinates must all have " "the same coordinate system." ) raise ValueError(msg) # Check *all* X and Y coords are lats+lons, if any are. latlons = [ "latitude" in or "longitude" in for coord in coords_all ] if any(latlons) and not all(latlons): msg = ( "If any X and Y coordinates are latitudes/longitudes, " "then they all must be." ) raise ValueError(msg) self.grid_is_latlon = any(latlons) if self.grid_is_latlon: # Convert all XY coordinates to units of "degrees". # N.B. already copied the target grid, so the result matches that. for coord in coords_all: try: coord.convert_units("degrees") except ValueError: msg = ( "Coordinate {!r} has units of {!r}, which does not " 'convert to "degrees".' ) raise ValueError(msg.format(, str(coord.units))) else: # Check that source and target have the same X and Y units. if ( src_x_coord.units != tgt_x_coord.units or src_y_coord.units != tgt_y_coord.units ): msg = ( "Source and target cube X and Y coordinates must " "have the same units." ) raise ValueError(msg) # Record the resulting grid shape. self.tgt_grid_shape = tgt_y_coord.shape + tgt_x_coord.shape # Calculate sample points as 2d arrays, like broadcast (NY,1)*(1,NX). x_2d, y_2d = _meshgrid(tgt_x_coord.points, tgt_y_coord.points) # Cast as a "trajectory", to suit the method used. self.trajectory = ( (, x_2d.flatten()), (, y_2d.flatten()), ) def __call__(self, src_cube): # Check the source cube X and Y coords match the original. # Note: for now, this is sufficient to ensure a valid trajectory # interpolation, but if in future we save and reuse the cache context # for the 'interpolate' call, we may need more checks here. # Check the given cube against the original. x_cos = src_cube.coords(axis="x") y_cos = src_cube.coords(axis="y") if ( not x_cos or not y_cos or y_cos != [self.src_grid_coords[0]] or x_cos != [self.src_grid_coords[1]] ): msg = ( "The given cube is not defined on the same source " "grid as this regridder." ) raise ValueError(msg) # Convert source XY coordinates to degrees if required. if self.grid_is_latlon: src_cube = src_cube.copy() src_cube.coord(axis="x").convert_units("degrees") src_cube.coord(axis="y").convert_units("degrees") # Get the basic interpolated results. result_trajectory_cube = interpolate( src_cube, self.trajectory, method="nearest" ) # Reconstruct this as a cube "like" the source data. # TODO: handle all aux-coords, cell measures ?? # The shape is that of the basic result, minus the trajectory (last) # dimension, plus the target grid dimensions. target_shape = result_trajectory_cube.shape[:-1] + self.tgt_grid_shape data_2d_x_and_y = # Make a new result cube with the reshaped data. result_cube = iris.cube.Cube(data_2d_x_and_y) result_cube.metadata = src_cube.metadata # Copy all the coords from the trajectory result. i_trajectory_dim = result_trajectory_cube.ndim - 1 for coord in result_trajectory_cube.dim_coords: dims = result_trajectory_cube.coord_dims(coord) if i_trajectory_dim not in dims: result_cube.add_dim_coord(coord.copy(), dims) for coord in result_trajectory_cube.aux_coords: dims = result_trajectory_cube.coord_dims(coord) if i_trajectory_dim not in dims: result_cube.add_aux_coord(coord.copy(), dims) # Add the X+Y grid coords from the grid cube, mapped to the new Y and X # dimensions, i.e. the last 2. for i_dim, coord in enumerate(self.tgt_grid_coords): result_cube.add_dim_coord(coord.copy(), i_dim + i_trajectory_dim) return result_cube