import logging
import os
import warnings
import numpy as np
import xarray as xr
from scipy.interpolate import griddata
from scipy.spatial import Delaunay
from ..observation import ModelObs
logger = logging.getLogger(__name__)
[docs]def read_imod_results(
obs_collection,
ml,
runfile,
mtime,
model_ws,
modelname="",
nlay=None,
exclude_layers=0,
):
"""Read imod model results at point locations.
Parameters
----------
obs_collection : ObsCollection
collection of observations at which points imod results will be read
ml : flopy.modflow.mf.model
modflow model
runfile : Runfile
imod runfile object
mtime : list of datetimes
datetimes corresponding to the model periods
model_ws : str
model workspace with imod model
nlay : int, optional
number of layers if None the number of layers from ml is used.
modelname : str
modelname
exclude_layers : int
exclude modellayers from being read from imod
"""
import imod
if ml.modelgrid.xoffset == 0 or ml.modelgrid.yoffset == 0:
warnings.warn(
"you probably want to set the xll and/or yll " "attributes of ml.modelgrid"
)
if nlay is None:
nlay = ml.modelgrid.nlay
xmid, ymid, _ = ml.modelgrid.xyzcellcenters
xy = np.array([xmid.ravel(), ymid.ravel()]).T
uv = obs_collection.loc[:, ("x", "y")].dropna(how="any", axis=0).values
vtx, wts = interp_weights(xy, uv)
hm_ts = np.zeros((obs_collection.shape[0], len(mtime)))
# loop over layers
for m in range(nlay):
if m < exclude_layers:
continue
mask = obs_collection.modellayer.values == m
# loop over timesteps
for t, date in enumerate(mtime):
head_idf = "head_{}_l{}.idf".format(date.strftime("%Y%m%d"), m + 1)
fname = os.path.join(
model_ws, runfile.data["OUTPUTDIRECTORY"], "head", head_idf
)
logger.info(f"read {fname}")
ihds, _attrs = imod.idf.read(fname)
hm = interpolate(ihds, vtx, wts)
hm_ts[mask, t] = hm[mask]
mo_list = []
for i, name in enumerate(obs_collection.index):
mo = ModelObs(
index=mtime,
data=hm_ts[i],
name=name,
model=modelname,
x=obs_collection.loc[name, "x"],
y=obs_collection.loc[name, "y"],
source="modflow",
meta=obs_collection.loc[name, "obs"].meta,
)
mo_list.append(mo)
return mo_list
[docs]def read_modflow_results(
obs_collection,
ml,
hds_arr,
mtime,
modelname="",
nlay=None,
exclude_layers=None,
method="linear",
):
"""Read modflow groundwater heads at points in obs_collection.
Parameters
----------
obs_collection : ObsCollection
locations of model observation
ml : flopy.modflow.mf.model
modflow model
hds_arr : numpy array
heads with shape (ntimesteps, nlayers, nrow, ncol)
mtime : list of datetimes
dates for each model timestep
modelname : str, optional
modelname
nlay : int, optional
number of layers if None the number of layers from ml is used.
exclude_layers : list of int, optional
exclude the observations in these modellayers
method : str, optional
interpolation method, either 'linear' or 'nearest',
default is linear.
"""
if ml.modelgrid.grid_type == "structured":
if ml.modelgrid.xoffset == 0 or ml.modelgrid.yoffset == 0:
warnings.warn(
"you probably want to set the xll " "and/or yll attributes in DIS!"
)
if isinstance(hds_arr, xr.DataArray):
hds_arr = hds_arr.values
if nlay is None:
nlay = ml.modelgrid.nlay
if modelname == "":
modelname = ml.name
xmid, ymid, _ = ml.modelgrid.xyzcellcenters
if ml.modelgrid.grid_type == "structured":
xy = np.array([xmid.ravel(), ymid.ravel()]).T
elif ml.modelgrid.grid_type == "vertex":
xy = np.array([xmid, ymid]).T
uv = obs_collection.loc[:, ("x", "y")].dropna(how="any", axis=0).values
if method == "linear":
vtx, wts = interp_weights(xy, uv)
# get interpolated timeseries from hds_arr
hm_ts = np.nan * np.ones((obs_collection.shape[0], hds_arr.shape[0]))
if exclude_layers is None:
exclude_layers = []
# loop over layers
for m in range(nlay):
if m in exclude_layers:
continue
mask = obs_collection["modellayer"].values == m
# loop over timesteps
for t in range(hds_arr.shape[0]):
ihds = hds_arr[t, m]
ihds[ihds <= -999.0] = np.nan
if method == "linear":
hm = interpolate(ihds, vtx, wts)
hm_ts[mask, t] = hm[mask]
elif method == "nearest":
hm_ts[mask, t] = griddata(xy, ihds.ravel(), uv, method=method)[mask]
else:
raise ValueError(f"Unknown method: '{method}'")
mo_list = []
for i, name in enumerate(obs_collection.index):
mo = ModelObs(
index=mtime,
data=hm_ts[i],
name=name,
model=modelname,
x=obs_collection.loc[name, "x"],
y=obs_collection.loc[name, "y"],
meta=obs_collection.loc[name, "obs"].meta,
source="modflow",
)
mo_list.append(mo)
return mo_list
[docs]def interp_weights(xy, uv, d=2):
"""Calculate interpolation weights [1]_.
Parameters
----------
xy : np.array
array containing x-coordinates in first column and y-coordinates
in second column
uv : np.array
array containing coordinates at which interpolation weights should
be calculated, x-data in first column and y-data in second column
d : int, optional
dimension of data? (the default is 2, which works for 2D data)
Returns
-------
vertices: np.array
array containing interpolation vertices
weights: np.array
array containing interpolation weights per point
References
----------
.. [1] https://stackoverflow.com/questions/20915502/
speedup-scipy-griddata-for-multiple-interpolations-between-two-irregular-grids
"""
tri = Delaunay(xy)
simplex = tri.find_simplex(uv)
vertices = np.take(tri.simplices, simplex, axis=0)
temp = np.take(tri.transform, simplex, axis=0)
delta = uv - temp[:, d]
bary = np.einsum("njk,nk->nj", temp[:, :d, :], delta)
return vertices, np.hstack((bary, 1 - bary.sum(axis=1, keepdims=True)))
[docs]def interpolate(values, vtx, wts, fill_value=np.nan):
"""Interpolate values at locations defined by vertices and points [2]_, as
calculated by interp_weights function.
Parameters
----------
values : np.array
array containing values to interpolate
vtx : np.array
array containing interpolation vertices, see interp_weights()
wts : np.array
array containing interpolation weights, see interp_weights()
fill_value : float
fill value for points that have to be extrapolated (e.g. at or
beyond edges of the known points)
Returns
-------
arr: np.array
array containing interpolated values at locations as given by
vtx and wts
References
----------
.. [2] https://stackoverflow.com/questions/20915502/
speedup-scipy-griddata-for-multiple-interpolations-between-two-irregular-grids
"""
ret = np.einsum("nj,nj->n", np.take(values, vtx), wts)
ret[np.any(wts < 0, axis=1)] = fill_value
return ret