Source code for opendrift.readers.reader_FVCOM_xarray

# This file is part of OpenDrift.
#
# OpenDrift is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2
#
# OpenDrift is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenDrift.  If not, see <https://www.gnu.org/licenses/>.

import numpy as np
from datetime import datetime, timedelta, timezone
from netCDF4 import Dataset
import xarray as xr
from scipy.interpolate import LinearNDInterpolator
import scipy, scipy.spatial
import pyproj
import logging
from opendrift.readers.basereader import BaseReader, UnstructuredReader

logger = logging.getLogger(__name__)


class Reader(BaseReader, UnstructuredReader):
    """
    A reader for unstructured (irregularly gridded) `CF compliant
    <https://cfconventions.org/>`_ netCDF files.

    Args:
        :param filename: A pattern of netCDF files or a zarr archive. The
                         netCDF file can also be a URL to an OPeNDAP server.
        :type filename: string, required.

        :param proj4: PROJ.4 string describing the projection of the data.
        :type proj4: string, required.

        :param filetype: 'netcdf' or 'zarr'.
        :type filetype: string, optional (default: 'zarr').

        :param name: Name of reader.
        :type name: string, optional.

    .. seealso::

        :py:mod:`opendrift.readers.basereader.unstructured`.
    """

    # Misspelled standard names in some (Akvaplan-NIVA) FVCOM files.
    # FVCOM name (standard_name, or long_name if necessary) -> OpenDrift standard
    variable_aliases = {
        "eastward_sea_water_velocity": "x_sea_water_velocity",
        "Northward_sea_water_velocity": "y_sea_water_velocity",
        "eastward wind": "x_wind",
        "northward wind": "y_wind",
        "ww": "upward_sea_water_velocity",
        "sea_floor_depth_below_geoid": "sea_floor_depth_below_sea_level",
        "l": "turbulent_generic_length_scale",
        "q2": "turbulent_kinetic_energy",
        "kh": "ocean_vertical_diffusivity",
    }

    node_variables = [
        "sea_floor_depth_below_sea_level",
        "sea_water_salinity",
        "sea_water_temperature",
        "sea_surface_height_above_geoid",
        "turbulent_generic_length_scale",
        "turbulent_kinetic_energy",
        "ocean_vertical_diffusivity",
    ]

    face_variables = [
        "x_wind",
        "y_wind",
        "x_sea_water_velocity",
        "y_sea_water_velocity",
        "upward_sea_water_velocity",
    ]

    dataset = None

    # For in-memory caching of sigma coordinates and ocean depth
    siglay = None
    siglev = None
    siglay_center = None
    siglev_center = None
    ocean_depth_nele = None
    ocean_depth_node = None

    def __init__(self, filename, proj4, filetype="zarr", cache=None, name=None):
        """
        filename: zarr store or netCDF file
        proj4:    PROJ.4 string
        filetype: "zarr" or "netcdf"
        cache:    either None (no caching) or int (size of cache in bytes).
                  Only respected for zarr stores.
        name:     name of reader
        """
        filestr = str(filename)  # in case a Path was supplied
        if name is None:
            self.name = filestr
        else:
            self.name = name

        self.timer_start("open dataset")
        logger.info("Opening dataset: " + filestr)

        if filetype == "zarr":
            from zarr.storage import FSStore, LRUStoreCache

            store_nocache = FSStore(filestr)
            if cache is not None:
                # e.g. cache=2**30 gives a 1 GiB cache
                store = LRUStoreCache(store_nocache, max_size=cache)
            else:
                store = store_nocache
            self.dataset = xr.open_dataset(
                store, engine="zarr", drop_variables=["time"]
            )
        elif filetype == "netcdf":
            self.dataset = open_fvcom_files_as_xarray(filename)

        logger.info("Using custom projection: %s.." % proj4)
        self.proj4 = proj4

        logger.info("Reading grid and coordinate variables...")
        if self.dataset.CoordinateSystem == "Cartesian":
            self.x = self.dataset["x"][:]
            self.y = self.dataset["y"][:]
            self.xc = self.dataset["xc"][:]
            self.yc = self.dataset["yc"][:]
        elif self.dataset.CoordinateSystem == "GeoReferenced":
            self.x = self.dataset["lon"][:]
            self.y = self.dataset["lat"][:]
            self.xc = self.dataset["lonc"][:]
            self.yc = self.dataset["latc"][:]
        else:
            raise RuntimeError("This coordinate system is not supported")

        ref_time = datetime(1858, 11, 17, 0, 0, 0)  # TODO: include tzinfo=timezone.utc
        self.times = np.array(
            [
                ref_time + timedelta(days=Itime.item() + Itime2.item() / 86400e3)
                for Itime, Itime2 in zip(
                    self.dataset["Itime"][:],
                    self.dataset["Itime2"][:],
                )
            ]
        )
        self.start_time = self.times[0]
        self.end_time = self.times[-1]
        # time steps are not constant

        self.xmin = np.min(self.x.values)
        self.xmax = np.max(self.x.values)
        self.ymin = np.min(self.y.values)
        self.ymax = np.max(self.y.values)

        self.variable_mapping = {}
        for var_name in self.dataset.variables:
            # skip coordinate variables
            if var_name in [
                "x",
                "y",
                "time",
                "lon",
                "lat",
                "lonc",
                "latc",
                "siglay",
                "siglev",
                "siglay_center",
                "siglev_center",
            ]:
                continue

            # We only use the sea floor depth at nodes
            if var_name in ["h_center"]:
                continue

            attrs = self.dataset[var_name].attrs
            if "standard_name" in attrs:
                name = attrs["standard_name"]
                # TBD: check against requested_variables before adding
                logger.debug(f"Adding FVCOM std_name {name}")
                name = self.variable_aliases.get(name, name)
                logger.debug(f"Using OpenDrift alias: {name}")
                self.variable_mapping[name] = str(var_name)
            elif var_name in self.dataset.data_vars:
                if var_name in self.variable_aliases:
                    name = self.variable_aliases[var_name]
                    logger.debug(f"Using OpenDrift alias: {name} for {var_name}")
                else:
                    name = var_name
                    logger.debug(f"Using {var_name} from dataset variable name")
                self.variable_mapping[name] = str(var_name)
            else:
                logger.debug(f"Dropping {var_name} - no mapping found")

        self.variables = list(self.variable_mapping.keys())

        # Run constructor of parent Reader class
        super().__init__()

        self.boundary = self._build_boundary_polygon_(self.x, self.y)

        self.timer_start("build index")
        logger.debug("building index of nodes..")
        self.nodes_idx = self._build_ckdtree_(self.x, self.y)
        logger.debug("building index of faces..")
        self.faces_idx = self._build_ckdtree_(self.xc, self.yc)
        self.timer_end("build index")
        self.timer_end("open dataset")
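    # A minimal usage sketch (hypothetical file pattern and projection; the
    # actual path, UTM zone and ellipsoid must match your model output):
    #
    #   r = Reader("/data/fvcom/output_*.nc",
    #              proj4="+proj=utm +zone=33 +ellps=WGS84 +units=m +no_defs",
    #              filetype="netcdf")
    #   print(r.variables)  # OpenDrift standard names mapped from the file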
    def plot_mesh(self):
        """
        Plot the grid mesh. Does not automatically show the figure.
        """
        import matplotlib.pyplot as plt

        plt.figure()
        plt.scatter(self.x, self.y, marker="x", color="blue", label="nodes")
        plt.scatter(self.xc, self.yc, marker="o", color="red", label="centroids")

        x, y = getattr(self.boundary, "context").exterior.xy
        plt.plot(x, y, color="green", label="boundary")

        plt.legend()
        plt.title("Unstructured grid: %s\n%s" % (self.name, self.proj))
        plt.xlabel("x [m]")
        plt.ylabel("y [m]")
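    # Usage sketch (assuming `r` is an instantiated Reader); the figure must
    # be shown explicitly:
    #
    #   r.plot_mesh()
    #   import matplotlib.pyplot as plt
    #   plt.show()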
    def get_variables(self, requested_variables, time=None, x=None, y=None, z=None):
        """
        FVCOM Grid:

        FVCOM uses 'triangular prisms' for gridding. Some variables are
        defined on the faces of the triangles, others at the nodes.

        `x` and `y` hold the positions of the nodes, while `xc` and `yc` hold
        the positions of the centroids/faces. The centroids/faces are also
        known as 'zonal', or elements (presumably as in finite element).

        .. note::

            Currently this reader does not really interpolate. It looks up
            the closest point in time and space.

        Each element has a lookup-table of its surrounding elements; this
        list can be used when looking up elements for the interpolator of an
        arbitrary point on the grid. The same goes for the nodes.

        Let E be the number of elements and N the number of nodes.

        Relevant lookup-tables:

            nbe:   [3 x E]   elements surrounding each element
            nbve:  [9 x N]   elements surrounding each node, minimum 3
            nbsn:  [11 x N]  nodes surrounding each node

        Variables:

        Face:
            * u
            * v

        Node:
            * temperature
            * salinity
        """
        x = np.atleast_1d(x)
        y = np.atleast_1d(y)
        z = np.atleast_1d(z)
        if len(z) == 1:
            z = z[0] * np.ones(x.shape)

        logger.debug(
            "Requested variables: %s, lengths: %d, %d, %d"
            % (requested_variables, len(x), len(y), len(z))
        )

        requested_variables, time, x, y, z, _outside = self.check_arguments(
            requested_variables, time, x, y, z
        )

        (
            nearest_time,
            _time_before,
            _time_after,
            indx_nearest,
            _indx_before,
            _indx_after,
        ) = self.nearest_time(time)

        logger.debug("Nearest time: %s" % nearest_time)

        node_variables = [
            var for var in requested_variables if var in self.node_variables
        ]
        face_variables = [
            var for var in requested_variables if var in self.face_variables
        ]

        all_avail = (len(node_variables) + len(face_variables)) == len(
            requested_variables
        )
        if not all_avail:
            logger.debug(
                f"Node variables {node_variables} and face variables {face_variables}"
            )
            logger.debug(f"Requested variables {requested_variables}")
            logger.debug(f"Variable mapping {self.variable_mapping}")
        assert all_avail, "missing variables requested"

        variables = {}

        x = xr.DataArray(x, dims=["particles"])
        y = xr.DataArray(y, dims=["particles"])
        z = xr.DataArray(z, dims=["particles"])

        if node_variables:
            logger.debug("Interpolating node-variables..")

            nodes = xr.DataArray(self._nearest_node_(x, y), dims=["particles"])
            assert len(nodes) == len(x)

            for var in node_variables:
                dvar = self.variable_mapping.get(var)
                logger.debug("Interpolating: %s (%s)" % (var, dvar))
                dvar = self.dataset[dvar]

                if any(d.startswith("sig") for d in dvar.dims):
                    # super hacky!
                    sigma_ind = xr.DataArray(
                        self.__nearest_node_sigma__(dvar, nodes.values, z.values),
                        dims=["particles"],
                    )
                    assert len(sigma_ind) == len(z)
                else:
                    sigma_ind = slice(None)

                variables[var] = dvar.isel(
                    time=indx_nearest,
                    # hacky - relies on the fact that at most one of
                    # siglay/siglev is defined on any given array
                    siglay_dim=sigma_ind,
                    siglev_dim=sigma_ind,
                    node=nodes,
                    missing_dims="ignore",
                ).values

        if face_variables:
            logger.debug("Interpolating face-variables..")

            neles = xr.DataArray(self._nearest_face_(x, y), dims=["particles"])
            assert len(neles) == len(x)

            for var in face_variables:
                dvar = self.variable_mapping.get(var)
                logger.debug("Interpolating: %s (%s)" % (var, dvar))
                dvar = self.dataset[dvar]

                if any(d.startswith("sig") for d in dvar.dims):
                    # super hacky!
                    sigma_ind = xr.DataArray(
                        self.__nearest_face_sigma__(dvar, neles.values, z.values),
                        dims=["particles"],
                    )
                    assert len(sigma_ind) == len(z)
                else:
                    sigma_ind = slice(None)

                variables[var] = dvar.isel(
                    time=indx_nearest,
                    # hacky - relies on the fact that at most one of
                    # siglay/siglev is defined on any given array
                    siglay_dim=sigma_ind,
                    siglev_dim=sigma_ind,
                    nele=neles,
                    missing_dims="ignore",
                ).values

        return variables
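    # A minimal call sketch (hypothetical coordinates, in the same Cartesian
    # projection as the grid; depths are negative meters below the surface):
    #
    #   data = r.get_variables(
    #       ["x_sea_water_velocity", "y_sea_water_velocity"],  # face variables
    #       time=r.start_time,
    #       x=np.array([415000.0, 416000.0]),
    #       y=np.array([7070000.0, 7071000.0]),
    #       z=np.array([-5.0, -10.0]),
    #   )
    #   # -> dict of per-particle arrays, nearest in time, space and sigma depth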
    @staticmethod
    def _vector_nearest_(X, xp):
        """
        Find the nearest element in the vector of vectors `X` for each `xp`.

        Args:
            X    NxM matrix of levels
            xp   M vector of positions

        Returns:
            i    M vector of indices [0..N] of the element in X[0..N, i]
                 closest to xp[i]
        """
        if "siglay" in X.dims:
            # add dummy dimension for easier xr broadcasting
            xp = xr.DataArray(xp, dims=["node"])
            inds = np.abs(X - xp).argmin(dim="siglay").values
        elif "siglay_center" in X.dims:
            xp = xr.DataArray(xp, dims=["nele"])
            inds = np.abs(X - xp).argmin(dim="siglay_center").values
        elif "siglev" in X.dims:
            xp = xr.DataArray(xp, dims=["node"])
            inds = np.abs(X - xp).argmin(dim="siglev").values
        elif "siglev_center" in X.dims:
            xp = xr.DataArray(xp, dims=["nele"])
            inds = np.abs(X - xp).argmin(dim="siglev_center").values
        else:
            raise ValueError(f"no sigma dimension found among {X.dims}")

        return inds
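    # A toy example of the lookup above (two nodes, three sigma layers):
    #
    #   X = xr.DataArray([[0.0, 0.0], [-5.0, -50.0], [-10.0, -100.0]],
    #                    dims=["siglay", "node"])   # depths per node
    #   xp = np.array([-4.0, -80.0])                # target depth per node
    #   Reader._vector_nearest_(X, xp)              # -> array([1, 2])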
    def __nearest_node_sigma__(self, var, nodes, z):
        """
        Find the nearest depth at the given nodes (sigma layer or sigma
        level, depending on where the variable is defined).
        """
        shp = var.shape

        if self.siglay is None:
            logger.debug("Reading siglays into memory...")
            self.siglay = self.dataset["siglay"][:]

        if self.siglev is None:
            logger.debug("Reading siglevs into memory...")
            self.siglev = self.dataset["siglev"][:]

        if any(d.startswith("siglay") for d in var.dims):
            sigmas = self.siglay[:]
        elif any(d.startswith("siglev") for d in var.dims):
            sigmas = self.siglev[:]
        else:
            raise ValueError

        if self.ocean_depth_node is None:
            logger.debug("Reading ocean depth into memory...")
            self.ocean_depth_node = self.dataset["h"][:]

        # Calculate depths from sigmas
        depths = self.z_from_sigma(sigmas, self.ocean_depth_node[nodes])

        return self._vector_nearest_(depths, z)
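    # In effect this composes z_from_sigma() and _vector_nearest_(): sigma
    # coordinates at the particles' nodes are first converted to depths in
    # meters, then the closest layer/level index is picked per particle, e.g.:
    #
    #   sigmas = xr.DataArray([[-0.25], [-0.75]], dims=["siglay", "node"])
    #   depths = Reader.z_from_sigma(sigmas, 100.0)          # [[-25.], [-75.]]
    #   Reader._vector_nearest_(depths, np.array([-60.0]))   # -> array([1])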
    def __nearest_face_sigma__(self, var, el, z):
        """
        TBD: This is extremely hacky...

        Find the nearest depth at the given elements (sigma layer or sigma
        level, depending on where the variable is defined).
        """
        shp = var.shape

        if self.siglay_center is None:
            logger.info("Reading siglay_centers into memory...")
            self.siglay_center = self.dataset["siglay_center"][:]

        if self.siglev_center is None:
            logger.info("Reading siglev_centers into memory...")
            self.siglev_center = self.dataset["siglev_center"][:]

        if any(d.startswith("siglay") for d in var.dims):
            sigmas = self.siglay_center[:]
        elif any(d.startswith("siglev") for d in var.dims):
            sigmas = self.siglev_center[:]
        else:
            raise ValueError

        if self.ocean_depth_nele is None:
            logger.info("Reading ocean depth center into memory...")
            self.ocean_depth_nele = self.dataset["h_center"][:]

        # Calculate depths from sigmas
        depths = self.z_from_sigma(sigmas, self.ocean_depth_nele[el])

        return self._vector_nearest_(depths, z)
    @staticmethod
    def z_from_sigma(sigma, depth, elevation=0):
        """
        Calculate z-depth from sigma constant.

        https://rdrr.io/github/edwardlavender/fvcom.tbx/src/R/depth_from_known.R

        Args:
            sigma      Sigma coefficient(s)
            depth      Depth below mean sea-surface
            elevation  Elevation of sea-surface (e.g. tidal)

        Returns:
            z, depth below sea-surface in meters.
        """
        return sigma * (depth + elevation)
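    # Worked example: FVCOM sigma coefficients run from 0 (surface) to -1
    # (bottom), so for a 100 m water column with no surface elevation:
    #
    #   Reader.z_from_sigma(np.array([-0.1, -0.5, -0.9]), 100.0)
    #   # -> array([-10., -50., -90.])   (meters below the sea surface)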
def open_fvcom_files_as_xarray(fname, load=False, **kwarg):
    """
    Work around an xarray limitation when loading FVCOM grid data.
    """
    from glob import glob

    with Dataset(sorted(glob(fname))[0]) as f:
        ds = xr.open_mfdataset(
            fname,
            decode_times=False,
            drop_variables=["siglay", "siglev"],
            data_vars="minimal",
            **kwarg,
        )
        if "siglay" in ds.dims:
            ds = _rename_sigma(ds, f, "siglay")
        if "siglev" in ds.dims:
            ds = _rename_sigma(ds, f, "siglev")

        times = ds.Itime + ds.Itime2 / 86400e3
        ds["time"] = _timestamps_from_days_since_epoch(times.values)

        for dim, v in [
            ("time", "Itime"),
            ("time", "Itime2"),
            ("node", "lat"),
            ("node", "lon"),
            ("nele", "lonc"),
            ("nele", "latc"),
            ("node", "x"),
            ("node", "y"),
            ("nele", "xc"),
            ("nele", "yc"),
        ]:
            if v in ds.data_vars:
                ds = ds.assign_coords({v: (dim, ds[v].values)})

    return ds
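# A minimal sketch (hypothetical file pattern):
#
#   ds = open_fvcom_files_as_xarray("/data/fvcom/output_*.nc")
#   # sigma dimensions are renamed to 'siglay_dim'/'siglev_dim', and
#   # ds['time'] holds decoded timestamps built from Itime/Itime2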
def _timestamps_from_days_since_epoch(days):
    """
    Convert an iterable of days since the epoch to a list of pandas Timestamps.
    """
    import pandas as pd

    return [pd.Timestamp("1858-11-17 00:00:00") + pd.Timedelta(days=d) for d in days]
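# The epoch 1858-11-17 is the Modified Julian Date (MJD) origin used for
# FVCOM's Itime variable. For example:
#
#   _timestamps_from_days_since_epoch([0, 59000.5])
#   # -> [Timestamp('1858-11-17 00:00:00'), Timestamp('2020-05-31 12:00:00')]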
def _rename_sigma(ds, f, name):
    """
    name: siglay or siglev
    """
    dim_name = name + "_dim"
    ds = ds.rename_dims({name: dim_name})
    ds = ds.assign(
        variables={
            name: ((dim_name, "node"), np.copy(f[name])),
        }
    )
    ds = ds.assign_coords(
        {
            name: ds[name].isel(node=0).values,
            name + "_center": ds[name + "_center"].isel(nele=0).values,
        }
    )
    return ds
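# Sketch of the intent (a typical FVCOM layout is assumed, where the sigma
# coefficients are uniform across nodes/elements): the 'siglay'/'siglev'
# dimension is renamed to 'siglay_dim'/'siglev_dim', and the sigma variables
# dropped by open_mfdataset are re-added from the raw netCDF4 file and
# promoted to coordinates along the renamed dimension.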