import numpy as np
from scipy.ndimage import map_coordinates
import scipy.ndimage as ndimage
from scipy.interpolate import interp1d, LinearNDInterpolator
from .interpolators import Nearest2DInterpolator, fill_NaN_towards_seafloor, horizontal_interpolation_methods, vertical_interpolation_methods
from opendrift.readers.basereader import variables
import logging
logger = logging.getLogger(__name__)
class ReaderBlock():
    """Class to store and interpolate the output from a reader with data on a regular (structured) grid.

    A block keeps the coordinate arrays ``x``, ``y``, ``time`` (and ``z`` when
    present) as attributes, and the remaining entries of the input dictionary
    (one array, or a list of arrays for ensemble data, per variable) in
    ``data_dict``. Variables are interpolated first horizontally, layer by
    layer, and then vertically when the horizontal result has more than one
    dimension.
    """

    def __init__(self, data_dict,
                 interpolation_horizontal='linearNDFast',
                 interpolation_vertical='linear',
                 wrap_x=False):
        """Take ownership of a reader data dictionary and prepare it.

        Args:
            data_dict: dictionary with the keys 'x', 'y' and 'time',
                optionally 'z', and one entry per variable. The dictionary is
                kept by reference and modified in place: the coordinate keys
                are removed and variable values are replaced by cleaned
                arrays.
            interpolation_horizontal: key into
                ``horizontal_interpolation_methods``.
            interpolation_vertical: key into
                ``vertical_interpolation_methods``.
            wrap_x: True for global readers where longitude wraps at 360.

        Raises:
            KeyError: if 'x', 'y' or 'time' is missing from ``data_dict``.
            NotImplementedError: if an interpolation method is unknown.
        """
        # Make pointers to data values, for convenience
        self.x = data_dict['x']
        self.y = data_dict['y']
        self.time = data_dict['time']
        # 'z' is optional; a block without it has no vertical dimension
        self.z = data_dict.get('z')
        self.data_dict = data_dict
        for coordinate in ('x', 'y', 'time', 'z'):
            self.data_dict.pop(coordinate, None)

        self.wrap_x = wrap_x  # For global readers where longitude wraps at 360
        if wrap_x is True:
            # NOTE(review): the first branch fires for any block whose minimum
            # longitude is below 180, which is not the same rule as the one
            # used for query positions in _wrap_positions - confirm that the
            # threshold is intended.
            if self.x.min() < 180:
                logger.debug('Shifting reader block longitudes to -180 to 180')
                self.x = np.mod(self.x+180, 360) - 180
            elif self.x.max() > 360:
                logger.debug('Shifting reader block longitudes to 0 to 360')
                self.x = np.mod(self.x, 360)

        # Mask any extremely large values, e.g. if missing netCDF _Fill_value
        filled_variables = set()
        for var in self.data_dict:
            # Pass the array through the variable check defined in
            # opendrift.readers.basereader.variables
            values = variables.Variables.__check_variable_array__(
                var, self.data_dict[var])
            if isinstance(values, np.ma.core.MaskedArray):
                values = np.ma.masked_outside(
                    np.ma.masked_invalid(values), -1E+9, 1E+9)
                # Convert masked arrays to numpy arrays
                values = np.ma.filled(values, fill_value=np.nan)
            self.data_dict[var] = values

            # Fill missing data towards seafloor if 3D
            if isinstance(values, list):
                logger.warning('Ensemble data currently not extrapolated towards seafloor')
            elif values.ndim == 3:
                # Only the returned flag is used here, so the helper
                # presumably fills the array in place - TODO confirm
                filled = fill_NaN_towards_seafloor(values)
                if filled is True:
                    filled_variables.add(var)

        if len(filled_variables) > 0:
            logger.debug('Filled NaN-values toward seafloor for :'
                         + str(list(filled_variables)))

        # Set 1D (vertical) and 2D (horizontal) interpolators
        try:
            self.Interpolator2DClass = \
                horizontal_interpolation_methods[interpolation_horizontal]
        except (KeyError, TypeError) as err:
            raise NotImplementedError(
                'Valid interpolation methods are: ' +
                str(horizontal_interpolation_methods.keys())) from err
        try:
            self.Interpolator1DClass = \
                vertical_interpolation_methods[interpolation_vertical]
        except (KeyError, TypeError) as err:
            raise NotImplementedError(
                'Valid interpolation methods are: ' +
                str(vertical_interpolation_methods.keys())) from err

        if 'land_binary_mask' in self.data_dict and \
                interpolation_horizontal != 'nearest':
            logger.debug('Nearest interpolation will be used '
                         'for landmask, and %s for other variables',
                         interpolation_horizontal)

    def _wrap_positions(self, x):
        """Return query longitudes ``x`` shifted to the range used by this block.

        ``x`` is returned unchanged unless ``wrap_x`` is True.
        """
        if self.wrap_x is not True:
            return x
        if self.x.min() > 0:
            return np.mod(x, 360)  # Shift x/lons to 0-360
        return np.mod(x + 180, 360) - 180  # Shift x/lons to -180-180

    def _initialize_interpolator(self, x, y, z=None):
        """Create the interpolator objects for the positions ``x``, ``y``, ``z``.

        Sets ``self.interpolator2d`` and, only when the block has more than
        one vertical level, ``self.interpolator1d``.
        """
        logger.debug('Initialising interpolator.')
        x = self._wrap_positions(x)
        self.interpolator2d = self.Interpolator2DClass(self.x, self.y, x, y)
        if self.z is not None and len(np.atleast_1d(self.z)) > 1:
            self.interpolator1d = self.Interpolator1DClass(self.z, z)

    def interpolate(self, x, y, z=None, variables=None,
                    profiles=None, profiles_depth=None):
        """Interpolate every variable of the block onto the given positions.

        Args:
            x, y: arrays of horizontal positions.
            z: vertical positions, handed to the vertical interpolator.
            variables: accepted for interface compatibility; not used here,
                all variables stored in the block are interpolated.
            profiles: optional collection of variable names for which the
                horizontally interpolated (not yet vertically interpolated)
                data is also returned.
            profiles_depth: accepted for interface compatibility; not used
                here.

        Returns:
            Tuple ``(env_dict, profiles_dict)``. ``env_dict`` maps each
            variable name to its interpolated values. ``profiles_dict``
            always contains 'z' (the block levels as an array of at least
            one dimension) plus one entry per variable listed in
            ``profiles``.
        """
        self._initialize_interpolator(x, y, z)

        env_dict = {}
        # Always returned, also when no profiles are requested
        profiles_dict = {'z': self.z}

        for varname, data in self.data_dict.items():
            nearest = varname == 'land_binary_mask'
            if nearest:
                # The landmask must not be smoothed. Use the same wrapped
                # longitudes as the main interpolator so that both look up
                # the same locations in the block.
                self.interpolator2d_nearest = Nearest2DInterpolator(
                    self.x, self.y, self._wrap_positions(x), y)

            if isinstance(data, list):
                # Ensemble data: positions are assigned to the ensemble
                # members in round-robin order
                num_ensembles = len(data)
                logger.debug('Interpolating %i ensembles for %s',
                             num_ensembles, varname)
                if data[0].ndim == 2:
                    horizontal = np.full(x.shape, np.nan)
                else:
                    horizontal = np.full((len(self.z), len(x)), np.nan)
                ensemble_number = np.arange(len(x)) % num_ensembles
                for en in range(num_ensembles):
                    elnum = ensemble_number == en
                    int_full = self._interpolate_horizontal_layers(
                        data[en], nearest=nearest)
                    if int_full.ndim == 1:
                        horizontal[elnum] = int_full[elnum]
                    else:
                        horizontal[:, elnum] = int_full[:, elnum]
            else:
                horizontal = self._interpolate_horizontal_layers(
                    data, nearest=nearest)

            if profiles is not None and varname in profiles:
                profiles_dict[varname] = horizontal

            if horizontal.ndim > 1:
                # interpolator1d only exists when the block has more than
                # one vertical level (see _initialize_interpolator)
                env_dict[varname] = self.interpolator1d(horizontal)
            else:
                env_dict[varname] = horizontal

        if 'z' in profiles_dict:
            profiles_dict['z'] = np.atleast_1d(profiles_dict['z'])

        return env_dict, profiles_dict

    def _interpolate_horizontal_layers(self, data, nearest=False):
        '''Interpolate all layers of 3d (or 2d) array.

        Uses ``self.interpolator2d_nearest`` when ``nearest`` is True and
        ``self.interpolator2d`` otherwise. A 2D array gives the result of a
        single interpolator call; a 3D array gives an array with one row per
        layer (first axis of ``data``).

        Raises:
            ValueError: if ``data`` is neither 2D nor 3D.
        '''
        if nearest is True:
            interpolator2d = self.interpolator2d_nearest
        else:
            interpolator2d = self.interpolator2d

        if data.ndim == 2:
            return interpolator2d(data)
        if data.ndim == 3:
            num_layers = data.shape[0]
            # Allocate output array
            result = np.ma.empty((num_layers, len(interpolator2d.x)))
            for layer in range(num_layers):
                # Use the interpolator selected above for every layer
                result[layer, :] = interpolator2d(data[layer, :, :])
            return result
        raise ValueError(
            'Can only interpolate 2D or 3D arrays, got %i dimensions'
            % data.ndim)

    def covers_positions(self, x, y, z=None):
        '''Check if given positions are covered by this reader block.

        Returns True only if every (x, y) pair lies within the min/max range
        of the block coordinates (bounds included). ``z`` is not checked.
        '''
        if self.wrap_x is True:
            # NOTE(review): same thresholds as in __init__; they differ from
            # the rule applied in _wrap_positions - confirm which is intended.
            if self.x.min() < 180:
                logger.debug('Shifting longitudes to -180 to 180')
                x = np.mod(x+180, 360) - 180
            elif self.x.max() > 360:
                logger.debug('Shifting longitudes to 0 to 360')
                x = np.mod(x, 360)

        inside = ((x >= self.x.min()) & (x <= self.x.max()) &
                  (y >= self.y.min()) & (y <= self.y.max()))
        return bool(np.all(inside))