Source code for opendrift.readers.operators.ops

from abc import abstractmethod
from numbers import Number
from typing import List
import pyproj
import xarray as xr
import numpy as np


[docs] class Combine: """Combining two readers into a third one. You can use usual operators, but also more complex ones such as gaussian combining. """
[docs] def __add__(self, other): from .readerops import Combined as ReaderCombined from .numops import Combined as NumCombined from ..basereader import BaseReader if isinstance(other, Number): return NumCombined.add(other, self) elif isinstance(other, BaseReader): return ReaderCombined(self, other, lambda a, b: a + b) else: return NotImplemented
[docs] def __radd__(self, other): return self.__add__(other)
[docs] def __mul__(self, other): from .numops import Combined as NumCombined if isinstance(other, Number): return NumCombined.mul(other, self) else: return NotImplemented
[docs] def __rmul__(self, other): return self.__mul__(other)
[docs] def __truediv__(self, other): from .numops import Combined as NumCombined if isinstance(other, Number): return NumCombined.div(other, self) else: return NotImplemented
[docs] def __sub__(self, other): return self + (-1 * other)
[docs] def combine_gaussian(self, measurement_reader, std): """Mix two readers with a gaussian, whose std is the one given as an argument. The measurment reader have to be of type timeseries, with a lon and lat attributes that are taken as the center of the measure. """ from .readerops import Combined as ReaderCombined def gaussian_factor(lon, lat, lon_center, lat_center, std): geod = pyproj.Geod(ellps='WGS84') assert isinstance(np.broadcast_arrays(lon, lat), list), f"requested lon and lat not broadcastable" lon, lat = np.broadcast_arrays(lon, lat) requested_shape = lon.shape requested_ndim = len(requested_shape) ## if isinstance(lon_center, float) : lon_center = lon_center * np.ones(requested_shape) # elif lon_center.shape != requested_shape : lon_center = np.expand_dims(lon_center, tuple(range(1, requested_ndim+1))) lon, lon_center = np.broadcast_arrays(lon_center, lon) ## if isinstance(lat_center, float) : lat_center = lat_center * np.ones(requested_shape) # elif lat_center.shape != requested_shape : lat_center = np.expand_dims(lat_center, tuple(range(1, requested_ndim+1))) lat, lat_center = np.broadcast_arrays(lat_center, lat) ## if isinstance(std, float) : std = std * np.ones(requested_shape) elif lon_center.shape != requested_shape : std = np.expand_dims(std, tuple(range(1, requested_ndim+1))) std, _ = np.broadcast_arrays(std, lat) _, _, distance = geod.inv(lon, lat, lon_center, lat_center) exponential_factor = np.exp( -np.power(distance/std, 2.) / 2) return exponential_factor return ReaderCombined(a = self, b = measurement_reader, op = gaussian_factor, op_type= "combine_gaussian", external_params = std)
[docs] class Filter: @property @abstractmethod def variables(self) -> List[str]: pass
[docs] def filter_vars(self, vars): """ Only keep the specified variables. """ from .filter import FilterVariables return FilterVariables(self, vars)
[docs] def exclude_vars(self, vars): """ Remove the specified variables. """ from .filter import FilterVariables vars = list(set(self.variables) - set(vars)) return FilterVariables(self, vars)