import logging
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
try:
from tqdm import tqdm
except ModuleNotFoundError:
tqdm = None
from . import accessor
logger = logging.getLogger(__name__)
[docs]def get_model_layer_z(z, zvec, left=-999, right=999):
"""Get index of model layer based on elevation.
Assumptions:
- the highest value in zvec is the top of model layer 0.
- if z is equal to the bottom of a layer, the model layer above that
layer is assigned.
Parameters
----------
z : int, float
elevation.
zvec : list, np.array
elevations of model layers. shape is nlay + 1
left : int, optional
if z is below the lowest value in zvec, this value is returned.
The default is -999.
right : TYPE, optional
if z is above the highest value in zvec, this value is returned.
The default is 999.
Returns
-------
int : int
model layer
Examples
--------
>>> zvec = [0, -10, -20, -30]
>>> get_model_layer_z(-5, zvec)
0
>>> get_model_layer_z(-25, zvec)
2
>>> get_model_layer_z(-50, zvec)
-999
>>> get_model_layer_z(100, zvec)
999
>>> get_model_layer_z(-20, zvec)
1
"""
# make sure zvec is descending
zvec = np.sort(zvec)[::-1]
if z in zvec:
z += 1e-10
return int(
np.interp(z, zvec[::-1], np.arange(len(zvec))[::-1], left=left, right=right)
)
[docs]def check_if_var_is_invalid(var):
return bool(var is None or np.isnan(var))
[docs]def get_modellayer_from_screen_depth(ftop, fbot, zvec, left=-999, right=999):
"""
Parameters
----------
ftop : int or float
top of screen.
fbot : int or float
bottom of screen, has to be lower than ftop.
zvec : list or np.array
elevations of the modellayers at the location of the tube.
left : int, optional
value to return if tube screen is below the modellayers.
The default is -999.
right : int, optional
value to return if tube screen is above the modellayers.
The default is-999.
Raises
------
ValueError
raised if something unexpected happens.
Returns
-------
int or np.nan
modellayer.
examples
--------
>>> zvec = [0, -10, -20, -30, -40]
>>> get_modellayer_from_screen_depth(-5, -7, zvec)
0
>>> get_modellayer_from_screen_depth(-25, -27, zvec)
2
>>> get_modellayer_from_screen_depth(-15, -27, zvec)
2
>>> get_modellayer_from_screen_depth(-5, -27, zvec)
1
>>> get_modellayer_from_screen_depth(-5, -37, zvec)
1
>>> get_modellayer_from_screen_depth(15, -5, zvec)
0
>>> get_modellayer_from_screen_depth(15, 5, zvec)
999
>>> get_modellayer_from_screen_depth(-55, -65, zvec)
-999
>>> get_modellayer_from_screen_depth(15, -65, zvec)
nan
>>> get_modellayer_from_screen_depth(None, -7, zvec)
0
>>> get_modellayer_from_screen_depth(None, None, zvec)
nan
"""
if isinstance(zvec, np.ndarray) and np.isnan(zvec).all():
logger.warning("vertical datum invalid returning nan")
return np.nan
zvec = np.sort(zvec)[::-1]
ftop_invalid = check_if_var_is_invalid(ftop)
fbot_invalid = check_if_var_is_invalid(fbot)
if ftop_invalid and fbot_invalid:
logger.error("- values for ftop and fbot are invalid!")
return np.nan
elif fbot_invalid:
lay_ftop = get_model_layer_z(ftop, zvec, left=left, right=right)
logger.error(f"- fbot invalid. selected based on ftop: {lay_ftop}")
return lay_ftop
elif ftop_invalid:
lay_fbot = get_model_layer_z(fbot, zvec, left=left, right=right)
logger.error(f"- ftop invalid. selected based on fbot: {lay_fbot}")
return lay_fbot
if ftop < fbot:
logger.warning("- tube screen top below tube screen bot, switching top and bot")
fbot, ftop = ftop, fbot
lay_ftop = get_model_layer_z(ftop, zvec, left=left, right=right)
lay_fbot = get_model_layer_z(fbot, zvec, left=left, right=right)
# Piezometer in layer in which majority of screen is located
if lay_fbot == lay_ftop:
logger.debug(f"- selected layer {lay_fbot}")
return lay_fbot
else:
if lay_fbot == left and lay_ftop == right:
logger.debug("- tube screen spans all layers. return nan")
return np.nan
elif lay_ftop == right:
logger.debug(
f"- tube screen top higher than top layer. selected layer {lay_fbot}"
)
return lay_fbot
elif lay_fbot == left:
logger.debug(
f"- tube screen bot lower than bottom layer. selected layer {lay_ftop}"
)
return lay_ftop
logger.debug(
f"- tube screen crosses layer boundary:\n - layers: {lay_ftop}, {lay_fbot}"
)
logger.debug(
f"- tube screen spans {lay_fbot - lay_ftop + 1} layers."
" checking length per layer\n"
" - length per layer:"
)
# check which layer has the biggest length of the tube screen
length_layers = np.zeros(lay_fbot - lay_ftop + 1)
for i in range(len(length_layers)):
if i == 0:
length_layers[i] = ftop - zvec[lay_ftop + 1]
elif (i + 1) == len(length_layers):
length_layers[i] = zvec[lay_ftop + i] - fbot
else:
length_layers[i] = zvec[lay_ftop + i] - zvec[lay_ftop + 1 + i]
logger.debug(f" - lay {lay_ftop + i}: {length_layers[i]:.2f}")
# choose layer with biggest length
rel_layer = np.argmax(length_layers)
lay_out = lay_ftop + rel_layer
logger.debug(f" - selected layer: {lay_out}")
return lay_out
[docs]def get_zvec(x, y, gwf=None, ds=None):
"""get a list with the vertical layer boundaries at a point in the model.
Parameters
----------
x : int or float
x coordinate.
y : int or float
y coordinate.
gwf : flopy.mf6.modflow.mfgwf.ModflowGwf
modflow model with top and bottoms
ds : xarray.Dataset
xarray Dataset typically created in nlmod. Must have
variables 'top' and 'botm'.
Raises
------
NotImplementedError
not all grid types are supported yet.
Returns
-------
zvec : list
list of vertical layer boundaries. length is nlay + 1.
"""
import flopy
from shapely.geometry import Point
if gwf and not ds:
ix = flopy.utils.GridIntersect(gwf.modelgrid)
if gwf.modelgrid.grid_type == "structured":
res = ix.intersect(
Point(x, y),
)
if len(res) > 0:
r, c = res["cellids"][0]
zvec = np.array(
[gwf.modelgrid.top[r, c]]
+ [gwf.modelgrid.botm[i, r, c] for i in range(gwf.modelgrid.nlay)]
)
else:
print("Point is not within model extent! Returning NaN.")
zvec = np.nan
elif gwf.modelgrid.grid_type == "vertex":
res = ix.intersect([(x, y)], shapetype="point")
idx = res["cellids"].astype(int)[0]
if len(res) > 0:
zvec = [gwf.modelgrid.top[idx]] + [
gwf.modelgrid.botm[i, idx] for i in range(gwf.modelgrid.nlay)
]
else:
print("Point is not within model extent! Returning NaN.")
zvec = np.nan
else:
raise NotImplementedError(
f"gridtype {gwf.modelgrid.grid_type} not (yet) implemented"
)
elif ds and not gwf:
# assuming modflow type definition with 1 top and N botms
# first check if point is within the extent
if "angrot" in ds.attrs and ds.attrs["angrot"] != 0.0:
import nlmod
pol = nlmod.dims.get_extent_polygon(ds)
if not Point(x, y).within(pol):
return np.nan
else:
xmin, xmax, ymin, ymax = ds.attrs["extent"]
if (x < xmin) or (x > xmax) or (y < ymin) or (y > ymax):
return np.nan
if ds.gridtype == "vertex":
import nlmod
cid = nlmod.dims.xy_to_icell2d((x, y), ds)
sel = ds.sel(icell2d=cid)
zvec = np.concatenate(([sel["top"].values], sel["botm"].values))
mask = np.isnan(zvec)
idx = np.where(~mask, np.arange(mask.size), 0)
np.maximum.accumulate(idx, axis=0, out=idx)
zvec[mask] = zvec[idx[mask]]
else:
sel = ds.sel(x=x, y=y, method="nearest")
first_notna = np.nonzero(np.isfinite(np.atleast_1d(sel["top"].values)))[0][
0
]
if sel["top"].values.shape == ():
top = np.atleast_1d(sel["top"].values)
else:
top = np.atleast_1d(sel["top"].values[[first_notna]])
zvec = np.concatenate([top, sel["botm"].values])
mask = np.isnan(zvec)
idx = np.where(~mask, np.arange(mask.size), 0)
np.maximum.accumulate(idx, axis=0, out=idx)
zvec[mask] = zvec[idx[mask]]
else:
raise ValueError("Pass one of 'gwf' or 'ds'!")
return zvec
[docs]@accessor.register_obscollection_accessor("gwobs")
class GwObsAccessor:
def __init__(self, oc_obj):
self._obj = oc_obj
[docs] def set_tube_nr(
self, radius=1, xcol="x", ycol="y", if_exists="error", add_to_meta=False
):
"""Set the tube numbers based on the location of the observations.
The value of the tube number is set:
- In the ObsCollection dataframe
- As the attribute of an Obs object
- optionally: In the meta dictionary of the Obs object (only if add_to_meta is
True)
This method is useful for groundwater observations. If two or more
observation points are close to each other they will be seen as one
monitoring_well with multiple tubes. The tube_nr is based on the
'screen_bottom' attribute of the observations in such a way that
the deepest tube has the highest tube number.
Parameters
----------
radius : int, optional
max distance between two observations to be seen as one location,
by default 1
xcol : str, optional
column name with x coordinates, by default 'x'
ycol : str, optional
column name with y coordinates, by default 'y'
if_exists : str, optional
what to do if an observation point already has a tube_nr, options:
'error', 'replace' or 'keep', by default 'error'
add_to_meta : bool, optional
if True the tube_nr is added to the meta dictionary
of an observation. The default is False.
Raises
------
RuntimeError
if the column tube_nr exists and if_exists='error' an error is
raised
"""
# check if column exists in obscollection
if "tube_nr" in self._obj.columns:
# set type to numeric
if not is_numeric_dtype(self._obj["tube_nr"]):
self._obj["tube_nr"] = pd.to_numeric(
self._obj["tube_nr"], errors="coerce"
)
# check if name should be replaced
if if_exists == "error":
raise RuntimeError(
"the column 'tube_nr' already exist, set"
"if_exists='replace' to replace the current values"
)
elif if_exists == "replace":
self._obj["tube_nr"] = np.nan
else:
self._obj["tube_nr"] = np.nan
# apply tube numbers to tubes that are close to eachother
for name in self._obj.index:
if np.isnan(self._obj.loc[name, "tube_nr"]):
x = self._obj.loc[name, xcol]
y = self._obj.loc[name, ycol]
distance_to_other_tubes = np.sqrt(
(self._obj[xcol] - x) ** 2 + (self._obj[ycol] - y) ** 2
)
dup_x = self._obj.loc[distance_to_other_tubes < radius]
if dup_x.shape[0] == 1:
self._obj.set_metadata_value(
name, "tube_nr", 1, add_to_meta=add_to_meta
)
else:
dup_x2 = dup_x.sort_values("screen_bottom", ascending=False)
for i, pb_dub in enumerate(dup_x2.index):
self._obj.set_metadata_value(
pb_dub, "tube_nr", i + 1, add_to_meta=add_to_meta
)
[docs] def set_tube_nr_location(
self,
loc_col,
radius=1,
xcol="x",
ycol="y",
if_exists="error",
add_to_meta=False,
):
"""This method sets the tube_nr and location attributes of a GroundwaterObs
based on the location and screen depth of the observations.
The value of the tube_nr and the location are set:
- in the ObsCollection dataframe
- as the attribute of an Obs object
- optional: in the meta dictionary of the Obs object (only if add_to_meta is
True)
If two or more observations are close to each other they will be seen
as one location with multiple tubes. The tube_nr is based on the
'screen_bottom' attribute of the observations in such a way that
the deepest tube has the highest tube number. The location name is
based on the name in the loc_col of the screen with the lowest
tube_nr.
Parameters
----------
loc_col : str
the column name with the names to use for the location
radius : int, optional
max distance between two observations to be seen as one location,
by default 1
xcol : str, optional
column name with x coordinates, by default 'x'
ycol : str, optional
column name with y coordinates, by default 'y'
if_exists : str, optional
what to do if an observation point already has a tube_nr, options:
'error', 'replace' or 'keep', by default 'error'
add_to_meta : bool, optional
if True the tube_nr and location are added to the meta dictionary
of an observation. The default is False.
Raises
------
RuntimeError
if the column tube_nr exists and if_exists='error' an error
is raised
"""
# check if columns exists in obscollection
if "tube_nr" in self._obj.columns or "location" in self._obj.columns:
if if_exists == "error":
raise RuntimeError(
"the column 'tube_nr or location' already exist, set"
"if_exists='replace' to replace the current values"
)
elif if_exists == "replace":
self._obj["tube_nr"] = np.nan
self._obj["location"] = np.nan
else:
self._obj["tube_nr"] = np.nan
self._obj["location"] = np.nan
# apply tube numbers to tubes that are close to eachother
for name in self._obj.index:
if np.isnan(self._obj.loc[name, "tube_nr"]):
x = self._obj.loc[name, xcol]
y = self._obj.loc[name, ycol]
distance_to_other_tubes = np.sqrt(
(self._obj[xcol] - x) ** 2 + (self._obj[ycol] - y) ** 2
)
dup_x = self._obj.loc[distance_to_other_tubes < radius]
if dup_x.shape[0] == 1:
self._obj.set_metadata_value(
name, "tube_nr", 1, add_to_meta=add_to_meta
)
location = self._obj.loc[name, loc_col]
self._obj.set_metadata_value(
name,
"location",
location,
add_to_meta=add_to_meta,
)
else:
dup_x2 = dup_x.sort_values("screen_bottom", ascending=False)
for i, pb_dub in enumerate(dup_x2.index):
if i == 0:
location = self._obj.loc[pb_dub, loc_col]
self._obj.set_metadata_value(
pb_dub, "tube_nr", i + 1, add_to_meta=add_to_meta
)
self._obj.set_metadata_value(
pb_dub,
"location",
location,
add_to_meta=add_to_meta,
)
[docs] def get_modellayers(self, gwf=None, ds=None):
"""Get the modellayer per observation. The layers can be obtained from
the modflow model or can be defined in zgr.
Parameters
----------
gwf : flopy.mf6.modflow.mfgwf.ModflowGwf
modflow model
ds : xarray.Dataset
xarray Dataset with with top and bottoms, must have
dimensions 'x' and 'y' and variables 'top' and 'bot'
Returns
-------
pd.Series with the modellayers of each observation
"""
msg = (
"the get_modellayers method is deprecated, please use the nlmod "
"function: nlmod.layer.get_modellayers_screens"
)
logger.warning(msg)
modellayers = []
for o in self._obj.obs.values:
logger.debug("-" * 10 + f"\n {o.name}:")
modellayers.append(o.gwobs.get_modellayer_modflow(gwf=gwf, ds=ds))
return pd.Series(index=self._obj.index, data=modellayers, name="modellayer")
[docs] def get_regis_layers(self):
"""Get the regis layer per observation.
Parameters
----------
Returns
-------
pd.Series with the names of the regis layer of each observation
"""
if tqdm is not None:
tqdm.pandas()
return self._obj.obs.progress_apply(lambda o: o.gwobs.get_regis_layer())
else:
return self._obj.obs.apply(lambda o: o.gwobs.get_regis_layer())
[docs]@accessor.register_obs_accessor("gwobs")
class GeoAccessorObs:
def __init__(self, obs):
self._obj = obs
[docs] def get_modellayer_modflow(self, gwf=None, ds=None, left=-999, right=999):
"""Add modellayer to meta dictionary.
Parameters
----------
gwf : flopy.mf6.modflow.mfgwf.ModflowGwf
modflow model
ds : xarray.Dataset
xarray Dataset with with top and bottoms, must have
dimensions 'x' and 'y' and variables 'top' and 'bot'
Returns
-------
int
modellayer
"""
if np.isnan(self._obj.x) or np.isnan(self._obj.y):
logger.warning("x or y coordinate is NaN, returning NaN")
return np.nan
zvec = get_zvec(self._obj.x, self._obj.y, gwf=gwf, ds=ds)
if np.all(np.isnan(zvec)):
return np.nan
else:
return get_modellayer_from_screen_depth(
self._obj.screen_top,
self._obj.screen_bottom,
zvec,
left=left,
right=right,
)
[docs] def get_regis_layer(self):
"""find the name of the REGIS layer based on the tube screen depth.
Parameters
----------
Returns
-------
str
name of REGIS layer
"""
import xarray as xr
if np.isnan(self._obj.screen_top) or np.isnan(self._obj.screen_bottom):
return "nan"
# connect to regis netcdf
regis_url = "https://www.dinodata.nl/opendap/REGIS/REGIS.nc"
regis_ds = xr.open_dataset(regis_url, decode_times=False, decode_coords="all")
# rename layer in regis netcdf
regis_ds = regis_ds.rename({"layer": "layer_old"})
regis_ds.coords["layer"] = regis_ds.layer_old.astype(str)
regis_ds = regis_ds.swap_dims({"layer_old": "layer"})
# get zvec regis netcdf
z = (
regis_ds["bottom"]
.sel(x=self._obj.x, y=self._obj.y, method="nearest")
.dropna(dim="layer")
)
ztop = regis_ds["top"].sel(x=self._obj.x, y=self._obj.y, method="nearest").max()
zvec = np.concatenate([[ztop.values], z.values])
# get index of regis model layer
layer_i = get_modellayer_from_screen_depth(
self._obj.screen_top,
self._obj.screen_bottom,
zvec,
left=-999,
right=999,
)
if layer_i == 999:
return "above"
elif layer_i == -999:
return "below"
return str(z.layer[layer_i].values)