
"""
Readers
=======

Readers are responsible for providing OpenDrift with data about the
`environment` of the drifting particles or elements.

All readers descend from :class:`.basereader.BaseReader`. A reader generally also descends from one of a few more specific reader classes. When writing a new reader, consider which of these fits your input data best:

    * :class:`.basereader.continuous.ContinuousReader`
    * :class:`.basereader.structured.StructuredReader`
    * :class:`.basereader.unstructured.UnstructuredReader`

The `ContinuousReader` is suited for data that can be evaluated at any point within the domain, or for readers that do their own interpolation internally, e.g. a :class:`synthetic eddy <.reader_ArtificialOceanEddy.Reader>` or a :class:`constant <.reader_constant.Reader>`. The `StructuredReader` aids in interpolation when creating a reader of data on a :class:`regular grid <.reader_netCDF_CF_generic.Reader>`, while the `UnstructuredReader` provides the basics for :class:`irregularly gridded data <.reader_netCDF_CF_unstructured.Reader>` (e.g. finite volume models).
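
For example, a structured reader is typically created from a CF-compliant
netCDF file or OPeNDAP URL (a minimal sketch; the file name is hypothetical):

.. code-block:: python

    from opendrift.readers import reader_netCDF_CF_generic
    r = reader_netCDF_CF_generic.Reader('norkyst800_subset.nc')
    print(r)  # shows coverage in time and space, and available variables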

.. seealso::

    See the various reader types and reader implementations for more details.

    See :class:`.basereader.BaseReader` for how readers work internally.
"""

import os
from datetime import datetime, timedelta
import pkgutil
import importlib
from urllib.parse import urlparse, parse_qsl
import logging; logger = logging.getLogger(__name__)
import glob
import json
import numpy as np
import opendrift
import xarray as xr
import copernicusmarine


def open_dataset_opendrift(source, zarr_storage_options=None, open_mfdataset_options=None, chunks=None):
    """Wrapper around xarray.open_dataset and xarray.open_mfdataset.

    Common wrapper/opener to be used for all Xarray based readers.

    xarray.open_dataset will be used if source is:
        - a single netCDF file or OPeNDAP URL

    xarray.open_mfdataset will be used if source is:
        - a list of netCDF files
        - a filename with wildcards (* ? or [)

    CF times are decoded after removing any offending variables
    (e.g. if units equals "hours since analysis").
    """
    if open_mfdataset_options is None:
        # Use None as default to avoid a shared, mutable default dict,
        # since the options dict may be modified below
        open_mfdataset_options = {}
    if chunks is not None:
        open_mfdataset_options['chunks'] = chunks

    if isinstance(source, xr.Dataset):
        ds = source
    elif zarr_storage_options is not None:
        # This could better be handled outside of this method
        ds = xr.open_zarr(source, storage_options=zarr_storage_options)
        ds.name = source
    elif isinstance(source, list) or any(s in str(source) for s in ['*', '?', '[']):
        logger.info('Opening files with xarray.open_mfdataset')
        ds = xr.open_mfdataset(source, data_vars='minimal', coords='minimal',
                               compat='override', decode_times=False,
                               **open_mfdataset_options)
    else:
        logger.info('Opening file with xr.open_dataset')
        ds = xr.open_dataset(source, decode_times=False)

    # Decode CF times
    offending = ds.filter_by_attrs(units='hours since analysis')  # Found e.g. in HYCOM datasets
    if len(offending) == 0:
        offending = ds.filter_by_attrs(units='msec since 00:00:00')  # Found e.g. in FVCOM
    if len(offending) > 0:
        logger.warning(f'Removing variables that cannot be CF decoded: {list(offending.variables)}')
        ds = ds.drop_vars(offending)
    ds = xr.decode_cf(ds, decode_times=True)

    # Chunk size of time dimension should be 1
    ds = ds.unify_chunks()
    for dim, chunksize in ds.chunks.items():
        chunksize = chunksize[0]
        if 'time' in dim and chunksize > 1:
            # Note: ds = ds.chunk({dim: 1}) does not have the desired effect
            logger.warning(f'Chunk size for dimension {dim} is {chunksize}. '
                           f'Reopening dataset with chunk {dim}:1.')
            ds = open_dataset_opendrift(source, zarr_storage_options=zarr_storage_options,
                                        open_mfdataset_options=open_mfdataset_options,
                                        chunks={dim: 1})

    return ds
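
# Example use of open_dataset_opendrift (a sketch; file names and URL are hypothetical):
#
#   ds = open_dataset_opendrift('norkyst_2024*.nc')             # wildcard -> open_mfdataset
#   ds = open_dataset_opendrift(['file1.nc', 'file2.nc'])       # list -> open_mfdataset
#   ds = open_dataset_opendrift('https://server/dodsC/dataset') # single URL -> open_dataset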
def add_standard_name_for_surface_grib_variables(ds):
    """Try to identify surface variables from GRIB datasets and add standard_name attribute"""
    GRIB_parameters = {
        'wind_speed': {'Grib2_Parameter': np.array([0, 2, 1])},
        'eastward_wind': {'Grib2_Parameter': np.array([0, 2, 2])},
        'northward_wind': {'Grib2_Parameter': np.array([0, 2, 3])},
    }
    surface_variables = {'u10': 'eastward_wind', 'v10': 'northward_wind'}

    for var_name, var in ds.data_vars.items():
        if any(forbidden.lower() in var_name.lower() for forbidden in ('percentile', 'pctl')):
            logger.debug(f'Skipping percentile variable {var_name}')
            continue
        if 'standard_name' in var.attrs and var.attrs['standard_name'] != 'unknown':
            continue
        if var_name in surface_variables:
            standard_name = surface_variables[var_name]
            logger.debug(f'Selecting GRIB variable {var_name} at 10m height '
                         f'and adding standard_name {standard_name}')
            ds[var_name].attrs['standard_name'] = standard_name
            continue
        if var.attrs.get('Grib2_Level_Type', None) != 103:
            continue  # Not a surface variable
        for standard_name, maps in GRIB_parameters.items():
            if isinstance(maps, dict):
                maps = [maps]
            if any(
                    np.all(attval == var.attrs.get(att))  # attribute value is found in map
                    for ma in maps for att, attval in ma.items()
                    if att in var.attrs  # variable has attribute
                    ):
                # Must check that height is 10m or surface
                for coordname, coordvar in var.coords.items():
                    if coordvar.attrs.get('_CoordinateAxisType', None) == 'Height' and 10 in coordvar.values:
                        logger.debug(f'Selecting GRIB variable {var_name} at 10m height '
                                     f'and adding standard_name {standard_name}')
                        ds[var_name] = var.sel({coordname: 10})
                        ds[var_name].attrs['standard_name'] = standard_name

    return ds
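
# Example use (a sketch; the GRIB file name is hypothetical, and the cfgrib
# engine for xarray is assumed to be installed):
#
#   ds = xr.open_dataset('meps_surface.grib2', engine='cfgrib')
#   ds = add_standard_name_for_surface_grib_variables(ds)
#   # 10 m wind components (e.g. u10/v10) now carry CF standard_name attributes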
def datetime_from_variable(var):
    """Convert an xarray time variable to an array of Python datetimes."""
    import pandas as pd
    try:
        return pd.to_datetime(var).to_pydatetime()
    except Exception:
        logger.warning('Could not decode time with Pandas')
        datetimeindex = var.to_index().to_datetimeindex()
        times = pd.to_datetime(datetimeindex).to_pydatetime()
        logger.info('Decoded time through datetimeindex')
        return times
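
# Example use (a sketch, assuming ds is an open xarray.Dataset with a time coordinate):
#
#   times = datetime_from_variable(ds['time'])
#   start_time, end_time = times[0], times[-1]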
def open_mfdataset_overlap(url_base, time_series=None, start_time=None,
                           end_time=None, freq=None, timedim='time'):
    """Open a series of forecast files with overlapping times, selecting from
    each file only the time interval until the next file starts."""
    import pandas as pd
    if time_series is None:
        # Construct the time series from start_time, end_time and freq
        time_series = pd.date_range(start_time, end_time, freq=freq)
    urls = [t.strftime(url_base) for t in time_series]
    time_step = time_series[1] - time_series[0]
    print('Opening individual URLs...')
    # Note: these chunk sizes are tailored to a particular dataset
    chunks = {'time': 1, 'depth': 1, 'Y': 17, 'X': 2602}
    datasets = [xr.open_dataset(u, chunks=chunks).sel(
                    {timedim: slice(t, t + time_step - timedelta(seconds=1))})
                for u, t in zip(urls, time_series)]
    print('Concatenating...')
    ds = xr.concat(datasets, dim=timedim, compat='override', combine_attrs='override',
                   join='override', coords='minimal', data_vars='minimal')
    return ds
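
# Example use (a sketch; the URL template is hypothetical). url_base is a
# strftime pattern, expanded to one URL per entry of the time series:
#
#   ds = open_mfdataset_overlap(
#       'https://server/ocean_forecast_%Y%m%d.nc',
#       start_time=datetime(2024, 1, 1), end_time=datetime(2024, 1, 10), freq='1D')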
def all_readers():
    '''Return a list of all readers in subpackage opendrift.readers'''
    # Import the sub-package
    subpackage_name = 'opendrift.readers'
    subpackage = importlib.import_module(subpackage_name)

    # Get the path of the sub-package
    package_path = subpackage.__path__

    # List all modules in the sub-package
    modules = []
    for _, module_name, is_pkg in pkgutil.iter_modules(package_path):
        if module_name.startswith('reader_'):
            #full_module_name = f"{subpackage_name}.{module_name}"
            modules.append(module_name)

    return modules
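
# Example; the returned module names depend on the installed version:
#
#   >>> all_readers()
#   ['reader_ArtificialOceanEddy', 'reader_constant', ..., 'reader_netCDF_CF_generic']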
def applicable_readers(url):
    '''Return a list of readers that are possible candidates
    for a given URL, filename or product ID'''
    from opendrift.readers import reader_netCDF_CF_generic
    from opendrift.readers import reader_netCDF_CF_unstructured
    from opendrift.readers import reader_ROMS_native
    from opendrift.readers import reader_copernicusmarine

    if len(glob.glob(url)) > 0 or any(e in url for e in [':', '/']):
        # Looks like a filename or URL
        return [reader_netCDF_CF_generic, reader_ROMS_native,
                reader_netCDF_CF_unstructured]
    else:
        # Otherwise, check whether the string is a Copernicus Marine product ID
        try:
            copernicusmarine.describe(dataset_id=url)
            return [reader_copernicusmarine]
        except Exception:
            return []
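
# Example use (a sketch; filename and Copernicus Marine product ID are hypothetical):
#
#   modules = applicable_readers('roms_output.nc')  # filename or URL -> netCDF readers
#   modules = applicable_readers('cmems_mod_glo_phy_anfc_0.083deg_PT1H-m')  # product ID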
#def reader_from_url(url, timeout=10):
#    '''Make readers from URLs or paths to datasets'''
#
#    if isinstance(url, list):
#        return [reader_from_url(u) for u in url]
#
#    try:  # Initialise reader from JSON string
#        j = json.loads(url)
#        try:
#            reader_module = importlib.import_module(
#                'opendrift.readers.' + j['reader'])
#            reader = getattr(reader_module, 'Reader')
#            del j['reader']
#            reader = reader(**j)
#            return reader
#        except Exception as e:
#            logger.warning('Creating reader from JSON failed:')
#            logger.warning(e)
#    except:
#        pass
#
#    reader_modules = applicable_readers(url)
#
#    for reader_module in reader_modules:
#        try:
#            logger.debug(f'Testing reader {reader_module}')
#            r = reader_module.Reader(url)
#            return r
#        except Exception as e:
#            logger.debug('Could not open %s with %s' % (url, reader_module))
#
#    return None  # No readers worked

def reader_from_urlpath(urlpath):
    '''Make a reader from a URL or path, optionally prefixed with a reader name,
    e.g. "reader_netCDF_CF_generic://<url_or_path>?option=value"'''
    if isinstance(urlpath, list):
        return [reader_from_urlpath(u) for u in urlpath]

    available_readers = all_readers()

    parsed = urlparse(urlpath.strip())
    if len(parsed.path.split('://')) > 1:
        requested_reader = parsed.path.split('://')[0]
        path = parsed.path.split('://')[1]
    else:
        requested_reader = None
        path = parsed.path
    query = parse_qsl(parsed.query)
    if len(query) == 0:
        options = {}
    else:
        options = dict(query)

    if requested_reader is not None:
        try:
            reader_module = importlib.import_module(f'opendrift.readers.{requested_reader}')
        except Exception:
            logger.debug(f'Cannot import reader {requested_reader}')
            return None
        if path == '':
            reader = reader_module.Reader(**options)
        else:
            reader = reader_module.Reader(path, **options)
        return reader

    if os.path.exists(urlpath) or path != '':  # URL or filename, no reader specified
        # We try different readers, returning the first successful
        reader_modules = applicable_readers(urlpath)
        for reader_module in reader_modules:
            try:
                logger.debug(f'Testing reader {reader_module}')
                reader = reader_module.Reader(urlpath)
                return reader
            except Exception:
                logger.debug('Could not open %s with %s' % (urlpath, reader_module))

    return None  # No readers worked
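
# Example use (a sketch; paths are hypothetical):
#
#   r = reader_from_urlpath('reader_netCDF_CF_generic:///data/norkyst.nc')  # explicit reader
#   r = reader_from_urlpath('/data/roms_output.nc')  # applicable readers are tried in turn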