Source code for hydropandas.extensions.gwobs

import logging

import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype

try:
    from tqdm import tqdm
except ModuleNotFoundError:
    tqdm = None

from . import accessor

logger = logging.getLogger(__name__)


[docs]def get_model_layer_z(z, zvec, left=-999, right=999): """Get index of model layer based on elevation. Assumptions: - the highest value in zvec is the top of model layer 0. - if z is equal to the bottom of a layer, the model layer above that layer is assigned. Parameters ---------- z : int, float elevation. zvec : list, np.array elevations of model layers. shape is nlay + 1 left : int, optional if z is below the lowest value in zvec, this value is returned. The default is -999. right : TYPE, optional if z is above the highest value in zvec, this value is returned. The default is 999. Returns ------- int : int model layer Examples -------- >>> zvec = [0, -10, -20, -30] >>> get_model_layer_z(-5, zvec) 0 >>> get_model_layer_z(-25, zvec) 2 >>> get_model_layer_z(-50, zvec) -999 >>> get_model_layer_z(100, zvec) 999 >>> get_model_layer_z(-20, zvec) 1 """ # make sure zvec is descending zvec = np.sort(zvec)[::-1] if z in zvec: z += 1e-10 return int( np.interp(z, zvec[::-1], np.arange(len(zvec))[::-1], left=left, right=right) )
[docs]def check_if_var_is_invalid(var): return bool(var is None or np.isnan(var))
[docs]def get_modellayer_from_screen_depth(ftop, fbot, zvec, left=-999, right=999): """ Parameters ---------- ftop : int or float top of screen. fbot : int or float bottom of screen, has to be lower than ftop. zvec : list or np.array elevations of the modellayers at the location of the tube. left : int, optional value to return if tube screen is below the modellayers. The default is -999. right : int, optional value to return if tube screen is above the modellayers. The default is-999. Raises ------ ValueError raised if something unexpected happens. Returns ------- int or np.nan modellayer. examples -------- >>> zvec = [0, -10, -20, -30, -40] >>> get_modellayer_from_screen_depth(-5, -7, zvec) 0 >>> get_modellayer_from_screen_depth(-25, -27, zvec) 2 >>> get_modellayer_from_screen_depth(-15, -27, zvec) 2 >>> get_modellayer_from_screen_depth(-5, -27, zvec) 1 >>> get_modellayer_from_screen_depth(-5, -37, zvec) 1 >>> get_modellayer_from_screen_depth(15, -5, zvec) 0 >>> get_modellayer_from_screen_depth(15, 5, zvec) 999 >>> get_modellayer_from_screen_depth(-55, -65, zvec) -999 >>> get_modellayer_from_screen_depth(15, -65, zvec) nan >>> get_modellayer_from_screen_depth(None, -7, zvec) 0 >>> get_modellayer_from_screen_depth(None, None, zvec) nan """ if isinstance(zvec, np.ndarray) and np.isnan(zvec).all(): logger.warning("vertical datum invalid returning nan") return np.nan zvec = np.sort(zvec)[::-1] ftop_invalid = check_if_var_is_invalid(ftop) fbot_invalid = check_if_var_is_invalid(fbot) if ftop_invalid and fbot_invalid: logger.error("- values for ftop and fbot are invalid!") return np.nan elif fbot_invalid: lay_ftop = get_model_layer_z(ftop, zvec, left=left, right=right) logger.error(f"- fbot invalid. selected based on ftop: {lay_ftop}") return lay_ftop elif ftop_invalid: lay_fbot = get_model_layer_z(fbot, zvec, left=left, right=right) logger.error(f"- ftop invalid. selected based on fbot: {lay_fbot}") return lay_fbot if ftop < fbot: logger.warning("- tube screen top below tube screen bot, switching top and bot") fbot, ftop = ftop, fbot lay_ftop = get_model_layer_z(ftop, zvec, left=left, right=right) lay_fbot = get_model_layer_z(fbot, zvec, left=left, right=right) # Piezometer in layer in which majority of screen is located if lay_fbot == lay_ftop: logger.debug(f"- selected layer {lay_fbot}") return lay_fbot else: if lay_fbot == left and lay_ftop == right: logger.debug("- tube screen spans all layers. return nan") return np.nan elif lay_ftop == right: logger.debug( f"- tube screen top higher than top layer. selected layer {lay_fbot}" ) return lay_fbot elif lay_fbot == left: logger.debug( f"- tube screen bot lower than bottom layer. selected layer {lay_ftop}" ) return lay_ftop logger.debug( f"- tube screen crosses layer boundary:\n - layers: {lay_ftop}, {lay_fbot}" ) logger.debug( f"- tube screen spans {lay_fbot - lay_ftop + 1} layers." " checking length per layer\n" " - length per layer:" ) # check which layer has the biggest length of the tube screen length_layers = np.zeros(lay_fbot - lay_ftop + 1) for i in range(len(length_layers)): if i == 0: length_layers[i] = ftop - zvec[lay_ftop + 1] elif (i + 1) == len(length_layers): length_layers[i] = zvec[lay_ftop + i] - fbot else: length_layers[i] = zvec[lay_ftop + i] - zvec[lay_ftop + 1 + i] logger.debug(f" - lay {lay_ftop + i}: {length_layers[i]:.2f}") # choose layer with biggest length rel_layer = np.argmax(length_layers) lay_out = lay_ftop + rel_layer logger.debug(f" - selected layer: {lay_out}") return lay_out
[docs]def get_zvec(x, y, gwf=None, ds=None): """get a list with the vertical layer boundaries at a point in the model. Parameters ---------- x : int or float x coordinate. y : int or float y coordinate. gwf : flopy.mf6.modflow.mfgwf.ModflowGwf modflow model with top and bottoms ds : xarray.Dataset xarray Dataset typically created in nlmod. Must have variables 'top' and 'botm'. Raises ------ NotImplementedError not all grid types are supported yet. Returns ------- zvec : list list of vertical layer boundaries. length is nlay + 1. """ import flopy from shapely.geometry import Point if gwf and not ds: ix = flopy.utils.GridIntersect(gwf.modelgrid) if gwf.modelgrid.grid_type == "structured": res = ix.intersect( Point(x, y), ) if len(res) > 0: r, c = res["cellids"][0] zvec = np.array( [gwf.modelgrid.top[r, c]] + [gwf.modelgrid.botm[i, r, c] for i in range(gwf.modelgrid.nlay)] ) else: print("Point is not within model extent! Returning NaN.") zvec = np.nan elif gwf.modelgrid.grid_type == "vertex": res = ix.intersect([(x, y)], shapetype="point") idx = res["cellids"].astype(int)[0] if len(res) > 0: zvec = [gwf.modelgrid.top[idx]] + [ gwf.modelgrid.botm[i, idx] for i in range(gwf.modelgrid.nlay) ] else: print("Point is not within model extent! Returning NaN.") zvec = np.nan else: raise NotImplementedError( f"gridtype {gwf.modelgrid.grid_type} not (yet) implemented" ) elif ds and not gwf: # assuming modflow type definition with 1 top and N botms # first check if point is within the extent if "angrot" in ds.attrs and ds.attrs["angrot"] != 0.0: import nlmod pol = nlmod.dims.get_extent_polygon(ds) if not Point(x, y).within(pol): return np.nan else: xmin, xmax, ymin, ymax = ds.attrs["extent"] if (x < xmin) or (x > xmax) or (y < ymin) or (y > ymax): return np.nan if ds.gridtype == "vertex": import nlmod cid = nlmod.dims.xy_to_icell2d((x, y), ds) sel = ds.sel(icell2d=cid) zvec = np.concatenate(([sel["top"].values], sel["botm"].values)) mask = np.isnan(zvec) idx = np.where(~mask, np.arange(mask.size), 0) np.maximum.accumulate(idx, axis=0, out=idx) zvec[mask] = zvec[idx[mask]] else: sel = ds.sel(x=x, y=y, method="nearest") first_notna = np.nonzero(np.isfinite(np.atleast_1d(sel["top"].values)))[0][ 0 ] if sel["top"].values.shape == (): top = np.atleast_1d(sel["top"].values) else: top = np.atleast_1d(sel["top"].values[[first_notna]]) zvec = np.concatenate([top, sel["botm"].values]) mask = np.isnan(zvec) idx = np.where(~mask, np.arange(mask.size), 0) np.maximum.accumulate(idx, axis=0, out=idx) zvec[mask] = zvec[idx[mask]] else: raise ValueError("Pass one of 'gwf' or 'ds'!") return zvec
[docs]@accessor.register_obscollection_accessor("gwobs") class GwObsAccessor: def __init__(self, oc_obj): self._obj = oc_obj
[docs] def set_tube_nr( self, radius=1, xcol="x", ycol="y", if_exists="error", add_to_meta=False ): """Set the tube numbers based on the location of the observations. The value of the tube number is set: - In the ObsCollection dataframe - As the attribute of an Obs object - optionally: In the meta dictionary of the Obs object (only if add_to_meta is True) This method is useful for groundwater observations. If two or more observation points are close to each other they will be seen as one monitoring_well with multiple tubes. The tube_nr is based on the 'screen_bottom' attribute of the observations in such a way that the deepest tube has the highest tube number. Parameters ---------- radius : int, optional max distance between two observations to be seen as one location, by default 1 xcol : str, optional column name with x coordinates, by default 'x' ycol : str, optional column name with y coordinates, by default 'y' if_exists : str, optional what to do if an observation point already has a tube_nr, options: 'error', 'replace' or 'keep', by default 'error' add_to_meta : bool, optional if True the tube_nr is added to the meta dictionary of an observation. The default is False. Raises ------ RuntimeError if the column tube_nr exists and if_exists='error' an error is raised """ # check if column exists in obscollection if "tube_nr" in self._obj.columns: # set type to numeric if not is_numeric_dtype(self._obj["tube_nr"]): self._obj["tube_nr"] = pd.to_numeric( self._obj["tube_nr"], errors="coerce" ) # check if name should be replaced if if_exists == "error": raise RuntimeError( "the column 'tube_nr' already exist, set" "if_exists='replace' to replace the current values" ) elif if_exists == "replace": self._obj["tube_nr"] = np.nan else: self._obj["tube_nr"] = np.nan # apply tube numbers to tubes that are close to eachother for name in self._obj.index: if np.isnan(self._obj.loc[name, "tube_nr"]): x = self._obj.loc[name, xcol] y = self._obj.loc[name, ycol] distance_to_other_tubes = np.sqrt( (self._obj[xcol] - x) ** 2 + (self._obj[ycol] - y) ** 2 ) dup_x = self._obj.loc[distance_to_other_tubes < radius] if dup_x.shape[0] == 1: self._obj.set_metadata_value( name, "tube_nr", 1, add_to_meta=add_to_meta ) else: dup_x2 = dup_x.sort_values("screen_bottom", ascending=False) for i, pb_dub in enumerate(dup_x2.index): self._obj.set_metadata_value( pb_dub, "tube_nr", i + 1, add_to_meta=add_to_meta )
[docs] def set_tube_nr_location( self, loc_col, radius=1, xcol="x", ycol="y", if_exists="error", add_to_meta=False, ): """This method sets the tube_nr and location attributes of a GroundwaterObs based on the location and screen depth of the observations. The value of the tube_nr and the location are set: - in the ObsCollection dataframe - as the attribute of an Obs object - optional: in the meta dictionary of the Obs object (only if add_to_meta is True) If two or more observations are close to each other they will be seen as one location with multiple tubes. The tube_nr is based on the 'screen_bottom' attribute of the observations in such a way that the deepest tube has the highest tube number. The location name is based on the name in the loc_col of the screen with the lowest tube_nr. Parameters ---------- loc_col : str the column name with the names to use for the location radius : int, optional max distance between two observations to be seen as one location, by default 1 xcol : str, optional column name with x coordinates, by default 'x' ycol : str, optional column name with y coordinates, by default 'y' if_exists : str, optional what to do if an observation point already has a tube_nr, options: 'error', 'replace' or 'keep', by default 'error' add_to_meta : bool, optional if True the tube_nr and location are added to the meta dictionary of an observation. The default is False. Raises ------ RuntimeError if the column tube_nr exists and if_exists='error' an error is raised """ # check if columns exists in obscollection if "tube_nr" in self._obj.columns or "location" in self._obj.columns: if if_exists == "error": raise RuntimeError( "the column 'tube_nr or location' already exist, set" "if_exists='replace' to replace the current values" ) elif if_exists == "replace": self._obj["tube_nr"] = np.nan self._obj["location"] = np.nan else: self._obj["tube_nr"] = np.nan self._obj["location"] = np.nan # apply tube numbers to tubes that are close to eachother for name in self._obj.index: if np.isnan(self._obj.loc[name, "tube_nr"]): x = self._obj.loc[name, xcol] y = self._obj.loc[name, ycol] distance_to_other_tubes = np.sqrt( (self._obj[xcol] - x) ** 2 + (self._obj[ycol] - y) ** 2 ) dup_x = self._obj.loc[distance_to_other_tubes < radius] if dup_x.shape[0] == 1: self._obj.set_metadata_value( name, "tube_nr", 1, add_to_meta=add_to_meta ) location = self._obj.loc[name, loc_col] self._obj.set_metadata_value( name, "location", location, add_to_meta=add_to_meta, ) else: dup_x2 = dup_x.sort_values("screen_bottom", ascending=False) for i, pb_dub in enumerate(dup_x2.index): if i == 0: location = self._obj.loc[pb_dub, loc_col] self._obj.set_metadata_value( pb_dub, "tube_nr", i + 1, add_to_meta=add_to_meta ) self._obj.set_metadata_value( pb_dub, "location", location, add_to_meta=add_to_meta, )
[docs] def get_modellayers(self, gwf=None, ds=None): """Get the modellayer per observation. The layers can be obtained from the modflow model or can be defined in zgr. Parameters ---------- gwf : flopy.mf6.modflow.mfgwf.ModflowGwf modflow model ds : xarray.Dataset xarray Dataset with with top and bottoms, must have dimensions 'x' and 'y' and variables 'top' and 'bot' Returns ------- pd.Series with the modellayers of each observation """ msg = ( "the get_modellayers method is deprecated, please use the nlmod " "function: nlmod.layer.get_modellayers_screens" ) logger.warning(msg) modellayers = [] for o in self._obj.obs.values: logger.debug("-" * 10 + f"\n {o.name}:") modellayers.append(o.gwobs.get_modellayer_modflow(gwf=gwf, ds=ds)) return pd.Series(index=self._obj.index, data=modellayers, name="modellayer")
[docs] def get_regis_layers(self): """Get the regis layer per observation. Parameters ---------- Returns ------- pd.Series with the names of the regis layer of each observation """ if tqdm is not None: tqdm.pandas() return self._obj.obs.progress_apply(lambda o: o.gwobs.get_regis_layer()) else: return self._obj.obs.apply(lambda o: o.gwobs.get_regis_layer())
[docs]@accessor.register_obs_accessor("gwobs") class GeoAccessorObs: def __init__(self, obs): self._obj = obs
[docs] def get_modellayer_modflow(self, gwf=None, ds=None, left=-999, right=999): """Add modellayer to meta dictionary. Parameters ---------- gwf : flopy.mf6.modflow.mfgwf.ModflowGwf modflow model ds : xarray.Dataset xarray Dataset with with top and bottoms, must have dimensions 'x' and 'y' and variables 'top' and 'bot' Returns ------- int modellayer """ if np.isnan(self._obj.x) or np.isnan(self._obj.y): logger.warning("x or y coordinate is NaN, returning NaN") return np.nan zvec = get_zvec(self._obj.x, self._obj.y, gwf=gwf, ds=ds) if np.all(np.isnan(zvec)): return np.nan else: return get_modellayer_from_screen_depth( self._obj.screen_top, self._obj.screen_bottom, zvec, left=left, right=right, )
[docs] def get_regis_layer(self): """find the name of the REGIS layer based on the tube screen depth. Parameters ---------- Returns ------- str name of REGIS layer """ import xarray as xr if np.isnan(self._obj.screen_top) or np.isnan(self._obj.screen_bottom): return "nan" # connect to regis netcdf regis_url = "https://www.dinodata.nl/opendap/REGIS/REGIS.nc" regis_ds = xr.open_dataset(regis_url, decode_times=False, decode_coords="all") # rename layer in regis netcdf regis_ds = regis_ds.rename({"layer": "layer_old"}) regis_ds.coords["layer"] = regis_ds.layer_old.astype(str) regis_ds = regis_ds.swap_dims({"layer_old": "layer"}) # get zvec regis netcdf z = ( regis_ds["bottom"] .sel(x=self._obj.x, y=self._obj.y, method="nearest") .dropna(dim="layer") ) ztop = regis_ds["top"].sel(x=self._obj.x, y=self._obj.y, method="nearest").max() zvec = np.concatenate([[ztop.values], z.values]) # get index of regis model layer layer_i = get_modellayer_from_screen_depth( self._obj.screen_top, self._obj.screen_bottom, zvec, left=-999, right=999, ) if layer_i == 999: return "above" elif layer_i == -999: return "below" return str(z.layer[layer_i].values)