"""Module with ObsCollection class for a collection of observations.
The ObsCollection class is a subclass of a pandas DataFrame with
additional attributes and methods.
More information about subclassing pandas DataFrames can be found here:
http://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas
"""
import logging
import numbers
from typing import List, Optional
import numpy as np
import pandas as pd
from . import observation as obs
from . import util
logger = logging.getLogger(__name__)
def read_lizard(
    extent=None,
    codes=None,
    name="",
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries="merge",
    only_metadata=False,
):
    """Get observations for monitoring wells from Lizard.

    Wells are selected by extent and/or by a list of well codes, optionally
    restricted to specific tube numbers.

    Parameters
    ----------
    extent : list, shapefile path or None
        get groundwater monitoring wells within this extent
        [xmin, ymin, xmax, ymax] or within a predefined Polygon from a
        shapefile
    codes : list of str or None
        codes of the monitoring wells
    name : str, optional
        name of the observation collection
    tube_nr : list of str
        list of tube numbers of the monitoring wells that should be
        selected. By default 'all' available tubes are selected.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire time series is
        returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire time series is
        returned
    type_timeseries : str, optional
        hand: returns only hand measurements
        diver: returns only diver measurements
        merge: the hand and diver measurements into one time series
        (default)
        combine: keeps hand and diver measurements separated
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_lizard(
        extent=extent,
        codes=codes,
        name=name,
        tube_nr=tube_nr,
        tmin=tmin,
        tmax=tmax,
        type_timeseries=type_timeseries,
        only_metadata=only_metadata,
    )
def read_bro(
    extent=None,
    bro_id=None,
    name="",
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=True,
    epsg=28992,
    ignore_max_obs=False,
):
    """Get all observations within an extent or a groundwater monitoring net.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get groundwater monitoring wells within this extent
        [xmin, xmax, ymin, ymax]
    bro_id : str or None, optional
        id of a groundwater monitoring net, starts with 'GMN'.
    name : str, optional
        name of the observation collection
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without
        measurements
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    ignore_max_obs : bool, optional
        by default you get a prompt if you want to download over a 1000
        observations at once. if ignore_max_obs is True you won't get the
        prompt. The default is False

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_bro(
        extent=extent,
        bro_id=bro_id,
        name=name,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        epsg=epsg,
        ignore_max_obs=ignore_max_obs,
    )
def read_bronhouderportaal_bro(dirname, full_meta=False, add_to_df=False):
    """Get all the metadata from files in a directory.

    The files are GMW files of well construction, as submitted to
    https://www.bronhouderportaal-bro.nl .

    Parameters
    ----------
    dirname : str
        name of directory that holds XML files
    full_meta : bool, optional
        process not only the standard metadata to ObsCollection
    add_to_df : bool, optional
        add all the metadata to the ObsCollection DataFrame

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame without the 'obs' column
    """
    oc = ObsCollection.from_bronhouderportaal_bro(dirname=dirname, full_meta=full_meta)
    if add_to_df:
        # expose every metadata field as a column of the DataFrame
        oc.add_meta_to_df(key="all")
    return oc
def read_dino(
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="Grondwaterstanden_Put",
    suffix="1.csv",
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read dino observations from a directory with downloaded files.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory
        of subdir
    ObsClass : type
        class of the observations, so far only GroundwaterObs is supported
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read
    keep_all_obs : boolean, optional
        add all observation points to the collection, even the points
        without measurements or metadata
    name : str, optional
        the name of the observation collection
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_dino_dir()
        function

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_dino(
        dirname=dirname,
        ObsClass=ObsClass,
        subdir=subdir,
        suffix=suffix,
        keep_all_obs=keep_all_obs,
        name=name,
        **kwargs,
    )
def read_excel(path, meta_sheet_name="metadata"):
    """Create an observation collection from an excel file.

    The excel file should have the same format as excel files created with
    the `to_excel` method of an ObsCollection.

    Parameters
    ----------
    path : str
        full file path (including extension) of the excel file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".

    Returns
    -------
    ObsCollection

    Notes
    -----
    if you write an excel file using the 'to_excel' method and read an
    excel with the 'read_excel' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute
    If you don't want to lose this data consider using the `to_pickle` and
    `read_pickle` function.
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_excel(path, meta_sheet_name=meta_sheet_name)
def read_fews(
    file_or_dir=None,
    xmlstring=None,
    ObsClass=obs.GroundwaterObs,
    name="fews",
    translate_dic=None,
    filterdict=None,
    locations=None,
    remove_nan=True,
    low_memory=True,
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    **kwargs,
):
    """Read one or several FEWS PI-XML files.

    Parameters
    ----------
    file_or_dir : str
        zip, xml or directory with zips or xml files to read
    xmlstring : str or None
        string with xml data, only used if file_or_dir is None. Default is
        None
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    name : str, optional
        name of the observation collection, 'fews' by default
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is
        used: {'locationId': 'locatie'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    locations : list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read. Only supported by
        low_memory=True method!
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    unpackdir : str
        destination directory to unzip file if file_or_dir is a .zip
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        whether to preserve datetime from zip archive

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_fews_xml(
        file_or_dir=file_or_dir,
        xmlstring=xmlstring,
        ObsClass=ObsClass,
        name=name,
        translate_dic=translate_dic,
        filterdict=filterdict,
        locations=locations,
        remove_nan=remove_nan,
        low_memory=low_memory,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        **kwargs,
    )
def read_imod(
    obs_collection,
    ml,
    runfile,
    mtime,
    model_ws,
    modelname="",
    nlay=None,
    exclude_layers=0,
):
    """Read imod model results at point locations.

    Parameters
    ----------
    obs_collection : ObsCollection
        collection of observations at which points imod results will be
        read
    ml : flopy.modflow.mf.model
        modflow model
    runfile : Runfile
        imod runfile object
    mtime : list of datetimes
        datetimes corresponding to the model periods
    model_ws : str
        model workspace with imod model
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    modelname : str
        modelname
    exclude_layers : int
        exclude modellayers from being read from imod

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_imod(
        obs_collection=obs_collection,
        ml=ml,
        runfile=runfile,
        mtime=mtime,
        model_ws=model_ws,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
    )
def read_knmi(
    locations=None,
    stns=None,
    xy=None,
    meteo_vars=("RH",),
    name="",
    starts=None,
    ends=None,
    ObsClasses=None,
    fill_missing_obs=False,
    interval="daily",
    use_api=True,
    raise_exceptions=True,
):
    """Get knmi observations from a list of locations or a list of stations.

    Parameters
    ----------
    locations : pandas DataFrame or None
        dataframe with columns 'x' and 'y' as coordinates. The
        default is None
    stns : list of str or None
        list of knmi stations. The default is None
    xy : list or numpy array, optional
        xy coordinates of the locations. e.g. [[10,25], [5,25]]
    meteo_vars : list or tuple of str
        meteo variables e.g. ["RH", "EV24"]. The default is ("RH",).
        See list of all possible variables below
    name : str, optional
        name of the obscollection. The default is ''
    starts : None, str, datetime or list, optional
        start date of observations per meteo variable. The start date is
        included in the time series.
        If start is None the start date will be January 1st of the
        previous year.
        If start is str it will be converted to datetime.
        If start is a list it should be the same length as meteo_vars and
        the start time for each variable. The default is None
    ends : list of str, datetime or None
        end date of observations per meteo variable. The end date is
        included in the time series.
        If end is a str it will be converted to datetime.
        If end is a list it should be the same length as meteo_vars and
        the end time for each meteo variable. The default is None
    ObsClasses : list of type or None
        class of the observations, can be PrecipitationObs, EvaporationObs
        or MeteoObs. If None the type of observations is derived from the
        meteo_vars.
    fill_missing_obs : bool, optional
        if True missing observations are filled (presumably with data from
        nearby stations, see hydropandas.io.knmi). The default is False.
    interval : str, optional
        desired time interval of the observations. The default is "daily".
    use_api : bool, optional
        if True the data is obtained via the KNMI api. The default is
        True.
    raise_exceptions : bool, optional
        if True exceptions during the download are raised. The default is
        True.

    List of possible variables:
        neerslagstations:
        RD    = 24-hour precipitation sum, measured from 0800 UTC on the
                preceding day until 0800 UTC on the given date
        meteostations:
        DDVEC = vector mean wind direction in degrees (360=north, 90=east,
                180=south, 270=west, 0=calm/variable). See
                http://www.knmi.nl/kennis-en-datacentrum/achtergrond/klimatologische-brochures-en-boeken
        FHVEC = vector mean windspeed (in 0.1 m/s)
        FG    = daily mean windspeed (in 0.1 m/s)
        FHX   = maximum hourly mean windspeed (in 0.1 m/s)
        FHXH  = hourly division in which FHX was measured
        FHN   = minimum hourly mean windspeed (in 0.1 m/s)
        FHNH  = hourly division in which FHN was measured
        FXX   = maximum wind gust (in 0.1 m/s)
        FXXH  = hourly division in which FXX was measured
        TG    = daily mean temperature (in 0.1 degrees Celsius)
        TN    = minimum temperature (in 0.1 degrees Celsius)
        TNH   = hourly division in which TN was measured
        TX    = maximum temperature (in 0.1 degrees Celsius)
        TXH   = hourly division in which TX was measured
        T10N  = minimum temperature at 10 cm above surface (in 0.1 degrees
                Celsius)
        T10NH = 6-hourly division in which T10N was measured; 6=0-6 UT,
                12=6-12 UT, 18=12-18 UT, 24=18-24 UT
        SQ    = sunshine duration (in 0.1 hour) calculated from global
                radiation (-1 for <0.05 hour)
        SP    = percentage of maximum potential sunshine duration
        Q     = global radiation (in J/cm2)
        DR    = precipitation duration (in 0.1 hour)
        RH    = daily precipitation amount (in 0.1 mm) (-1 for <0.05 mm)
        RHX   = maximum hourly precipitation amount (in 0.1 mm) (-1 for
                <0.05 mm)
        RHXH  = hourly division in which RHX was measured
        PG    = daily mean sea level pressure (in 0.1 hPa) calculated from
                24 hourly values
        PX    = maximum hourly sea level pressure (in 0.1 hPa)
        PXH   = hourly division in which PX was measured
        PN    = minimum hourly sea level pressure (in 0.1 hPa)
        PNH   = hourly division in which PN was measured
        P     = air pressure (in 0.1 hPa) reduced to mean sea level, at
                the time of observation
        VVN   = minimum visibility; 0: <100 m, 1:100-200 m, 2:200-300 m,
                ..., 49:4900-5000 m, 50:5-6 km, 56:6-7 km, 57:7-8 km, ...,
                79:29-30 km, 80:30-35 km, 81:35-40 km, ..., 89: >70 km
        VVNH  = hourly division in which VVN was measured
        VVX   = maximum visibility; same coding as VVN
        VVXH  = hourly division in which VVX was measured
        NG    = mean daily cloud cover (in octants, 9=sky invisible)
        UG    = daily mean relative atmospheric humidity (in percents)
        UX    = maximum relative atmospheric humidity (in percents)
        UXH   = hourly division in which UX was measured
        UN    = minimum relative atmospheric humidity (in percents)
        UNH   = hourly division in which UN was measured
        EV24  = potential evapotranspiration (Makkink) (in 0.1 mm)

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # NOTE: the original docstring documented **kwargs, but this wrapper
    # does not accept or forward extra keyword arguments; the docstring is
    # corrected here instead of widening the signature.
    oc = ObsCollection.from_knmi(
        locations=locations,
        stns=stns,
        xy=xy,
        meteo_vars=meteo_vars,
        name=name,
        starts=starts,
        ends=ends,
        ObsClasses=ObsClasses,
        fill_missing_obs=fill_missing_obs,
        interval=interval,
        use_api=use_api,
        raise_exceptions=raise_exceptions,
    )
    return oc
def read_menyanthes(
    path, name="", ObsClass=obs.Obs, load_oseries=True, load_stresses=True
):
    """Read a Menyanthes file.

    Parameters
    ----------
    path : str
        full path of the .men file.
    name : str, optional
        name of the observation collection. The default is "".
    ObsClass : type, optional
        class of the observations, e.g. GroundwaterObs. The default is
        obs.Obs.
    load_oseries : bool, optional
        if True the observations are read. The default is True.
    load_stresses : bool, optional
        if True the stresses are read. The default is True.

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_menyanthes(
        path=path,
        name=name,
        ObsClass=ObsClass,
        load_oseries=load_oseries,
        load_stresses=load_stresses,
    )
def read_modflow(
    obs_collection,
    ml,
    hds_arr,
    mtime,
    modelname="",
    nlay=None,
    exclude_layers=None,
    method="linear",
):
    """Read modflow groundwater heads at locations in obs_collection.

    Parameters
    ----------
    obs_collection : ObsCollection
        locations of model observation
    ml : flopy.modflow.mf.model
        modflow model
    hds_arr : numpy array
        heads with shape (ntimesteps, nlayers, nrow, ncol)
    mtime : list of datetimes
        dates for each model timestep
    modelname : str, optional
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : list of int, optional
        exclude the observations in these model layers
    method : str, optional
        interpolation method, either 'linear' or 'nearest',
        default is linear

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_modflow(
        obs_collection=obs_collection,
        ml=ml,
        hds_arr=hds_arr,
        mtime=mtime,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
        method=method,
    )
def read_pickle(
    filepath_or_buffer,
    compression="infer",
    storage_options=None,
):
    """Wrapper around pd.read_pickle.

    Parameters
    ----------
    filepath_or_buffer : str, path object, or file-like object
        String, path object (implementing ``os.PathLike[str]``), or
        file-like object implementing a binary ``readlines()`` function.
    compression : str or dict, default 'infer'
        For on-the-fly decompression of on-disk data. If 'infer' and
        'filepath_or_buffer' is path-like, then detect compression from
        the following extensions: '.gz', '.bz2', '.zip', '.xz', or '.zst'
        (otherwise no compression). If using 'zip', the ZIP file must
        contain only one data file to be read in. Set to ``None`` for no
        decompression. Can also be a dict with key ``'method'`` set to one
        of {``'zip'``, ``'gzip'``, ``'bz2'``, ``'zstd'``} and other
        key-value pairs forwarded to ``zipfile.ZipFile``,
        ``gzip.GzipFile``, ``bz2.BZ2File``, or
        ``zstandard.ZstdDecompressor``, respectively.
    storage_options : dict, optional
        Extra options that make sense for a particular storage connection,
        e.g. host, port, username, password, etc. For HTTP(S) URLs the
        key-value pairs are forwarded to ``urllib`` as header options. For
        other URLs (e.g. starting with "s3://", and "gcs://") the
        key-value pairs are forwarded to ``fsspec``.

    Returns
    -------
    ObsCollection : same type as object stored in file
    """
    # pass arguments by keyword so this stays robust against changes in
    # the positional order of pandas' signature
    return pd.read_pickle(
        filepath_or_buffer,
        compression=compression,
        storage_options=storage_options,
    )
def read_waterinfo(
    file_or_dir, name="", ObsClass=obs.WaterlvlObs, progressbar=True, **kwargs
):
    """Read waterinfo file or directory.

    Parameters
    ----------
    file_or_dir : str
        path to file or directory. Files can be .csv or .zip
    name : str, optional
        name of the collection, by default ""
    ObsClass : Obs, optional
        type of Obs to read data as, by default obs.WaterlvlObs
    progressbar : bool, optional
        show progressbar, by default True

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_waterinfo(
        file_or_dir=file_or_dir,
        name=name,
        ObsClass=ObsClass,
        progressbar=progressbar,
        **kwargs,
    )
def read_wiski(
    dirname,
    ObsClass=obs.GroundwaterObs,
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    **kwargs,
):
    """Read wiski data from a zipfile or directory.

    Parameters
    ----------
    dirname : str
        path of the zipfile with wiski data.
    ObsClass : type, optional
        type of Obs. The default is obs.GroundwaterObs.
    suffix : str, optional
        extension of filenames to read. The default is ".csv".
    unpackdir : str or None, optional
        directory to unpack zipped directory. The default is None.
    force_unpack : bool, optional
        force unzip, by default False.
    preserve_datetime : bool, optional
        preserve datetime of unzipped files, by default False
        (useful for checking whether data has changed)
    keep_all_obs : bool, optional
        If True keep all observations even those without metadata. The
        default is True.
    **kwargs

    Returns
    -------
    ObsCollection
        ObsCollection containing observation data
    """
    # module-level convenience wrapper around the classmethod
    return ObsCollection.from_wiski(
        dirname=dirname,
        ObsClass=ObsClass,
        suffix=suffix,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )
def read_pastastore(
    pstore,
    libname,
    ObsClass=obs.GroundwaterObs,
    metadata_mapping=None,
):
    """Read pastastore library.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        PastaStore object
    libname : str
        name of library (e.g. oseries or stresses)
    ObsClass : Obs, optional
        type of Obs to read data as, by default obs.GroundwaterObs
    metadata_mapping : dict, optional
        dictionary containing map between metadata field names in
        pastastore and metadata field names expected by hydropandas, by
        default None.

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    # module-level convenience wrapper around the classmethod
    oc = ObsCollection.from_pastastore(
        pstore=pstore,
        libname=libname,
        ObsClass=ObsClass,
        metadata_mapping=metadata_mapping,
    )
    return oc
class ObsCollection(pd.DataFrame):
"""Class for a collection of point observations.
An ObsCollection object is a subclass of a pandas.DataFrame and allows for
additional attributes and methods. Additional attributes are
defined in the '_metadata' attribute.
Parameters
----------
*args observations, list of observations or a pandas DataFrame,
**kwargs can be one of these:
name : str
name of the observation collection
meta : dic
metadata of the observation collection
"""
# temporary properties
_internal_names = pd.DataFrame._internal_names + ["none"]
_internal_names_set = set(_internal_names)
# normal properties
_metadata = [
"name",
"meta",
]
    def __init__(self, *args, **kwargs):
        """Create an ObsCollection from observations, a DataFrame or nothing.

        Accepts no positional arguments (empty collection), a list/tuple
        of observations, one or more Obs objects, or a pandas DataFrame.
        """
        # 'name' and 'meta' are ObsCollection-specific attributes (listed
        # in _metadata); pop them before the DataFrame init sees kwargs
        self.name = kwargs.pop("name", "")
        self.meta = kwargs.pop("meta", {})
        if len(args) == 0:
            logger.debug("Create empty ObsCollection")
            super().__init__(**kwargs)
        elif isinstance(args[0], (list, tuple)):
            logger.debug("Convert list of observations to ObsCollection")
            obs_df = util._obslist_to_frame(args[0])
            super().__init__(obs_df, *args[1:], **kwargs)
        elif isinstance(args[0], obs.Obs):
            logger.debug("Convert observation(s) to ObsCollection")
            # split positional args into Obs objects and the remainder;
            # the remainder is forwarded to the DataFrame constructor
            obs_list = [o for o in args if isinstance(o, obs.Obs)]
            remaining_args = [o for o in args if not isinstance(o, obs.Obs)]
            obs_df = util._obslist_to_frame(obs_list)
            super().__init__(obs_df, *remaining_args, **kwargs)
        elif isinstance(args[0], pd.DataFrame):
            if "obs" not in args[0].columns:
                # DataFrame without an 'obs' column: convert it via
                # from_dataframe first (presumably building Obs objects
                # per row — see from_dataframe)
                df = self.from_dataframe(*args)
                super().__init__(df, **kwargs)
            else:
                super().__init__(*args, **kwargs)
        else:
            super().__init__(*args, **kwargs)
    @property
    def _constructor(self):
        # ensure that pandas operations returning a new frame produce an
        # ObsCollection instead of a plain DataFrame
        return ObsCollection
def _infer_otype(self):
"""Infer observation type from the obs column.
Parameters
----------
Returns
-------
otype, otypes
type or list of types of the observation objects
"""
otypes = self.obs.apply(lambda x: type(x)).unique()
if otypes.shape[0] == 1:
logger.debug("inferred observation type: {}".format(otypes[0]))
return otypes[0]
elif otypes.shape[0] > 1:
logger.debug("inferred multiple otypes, types: {}".format(otypes))
return otypes
else:
raise TypeError("could not infer observation type")
def _set_metadata_value(self, iname, att_name, value, add_to_meta=False):
"""Set a value on three different levels at once:
1. the value in an ObsCollection DataFrame
2. the attribute of the observation
3. the value in the meta dictionary of an observation (optional)
Parameters
----------
iname : str, int, float, ...
observation name. Must be same type as self.index.
e.g. B52D0111_3
att_name : str, int, float, ...
name of the column in self.columns and attribute
of the observation. e.g. 'x'
value : str, int, float, ...
value of the the att_name. e.g. 116234
add_to_meta : bool, optional
if True the att_name, value pair is added to the meta dictionary
of an observation. The default is False.
Raises
------
ValueError
if the iname is not in self.index the value cannot be set.
Returns
-------
None.
"""
if iname not in self.index:
raise ValueError(f"{iname} not in index")
o = self.loc[iname, "obs"]
if att_name in o._metadata:
setattr(o, att_name, value)
logger.debug(f"set attribute {att_name} of {iname} to {value}")
if att_name == "name":
# name is the index of the ObsCollection dataframe
self.rename(index={iname: value}, inplace=True)
else:
self.loc[iname, att_name] = value
logger.debug(f"set {iname}, {att_name} to {value} in obscollection")
if add_to_meta:
o.meta.update({att_name: value})
logger.debug(f"add {att_name} of {iname} with value {value} to meta")
def _is_consistent(self, check_individual_obs=True):
"""check if an observation collection is consistent. An observation
collection is consistent if:
1. all observations have a unique name
2. there are no nan values in the obs column
3. (optional) the metadata of each observation has the same type
and value as the corresponding row in the observation collection
dataframe. Only checked if check_individual_obs is True.
Parameters
----------
check_individual_obs : bool, optional
If True the third condition in the list above is checked. The
default is True.
Returns
-------
bool
True -> consistent
False -> inconsistent.
"""
# check unique index
if not self.index.is_unique:
logger.warning(
f""""index of observation collection -> {self.name}
not unique. non unique indices are:"""
)
logger.warning(" ".join(self.index[self.index.duplicated()]))
return False
# check nan values in observations
if self.obs.isna().any():
logger.warning(f"missing observation object in collection -> {self.name} ")
return False
# check oc data with individual object attributes
if check_individual_obs:
for o in self.obs.values:
for att in o._metadata:
if att not in ["name", "meta"]:
v1 = self.loc[o.name, att]
v2 = getattr(o, att)
# check if values are equal
try:
if v1 != v2:
# check if both are nan
if isinstance(v1, numbers.Number) and isinstance(
v2, numbers.Number
):
if np.isnan(v1) and np.isnan(v2):
continue
# otherwise return Nan
logger.warning(
f"observation collection -> {self.name} not"
f"consistent with observation -> {o.name}"
f"{att} value"
)
return False
except TypeError:
logger.warning(
f"observation collection -> {self.name} not"
f"consistent with observation -> {o.name} {att}"
"value"
)
return False
elif att == "name":
if o.name not in self.index:
logger.warning(
f"observation collection -> {self.name} not"
f"consistent with observation -> {o.name} name"
)
return False
return True
[docs] def add_observation(self, o, check_consistency=True, **kwargs):
"""Add an observation to an existing observation collection. If the observation
exists the two observations are merged.
Parameters
----------
o : hpd.observation.Obs
Observation object.
check_consistency : bool, optional
If True the consistency of the collection is first checked. The
default is True.
**kwargs passed to Obs.merge_observation:
merge_metadata : bool, optional
If True and observations are merged the metadata of the two
objects are merged. If there are any differences the overlap
parameter is used to determine which metadata is used. If
merge_metadata is False, the metadata of the original
observation is always used for the merged observation. The
default is True.
overlap : str, optional
How to deal with overlapping timeseries with different values.
Options are:
- error : Raise a ValueError
- use_left : use the overlapping part from the existing
observations
- use_right : use the overlapping part from the new observation
Default is 'error'.
Raises
------
RuntimeError
when the observation collection is inconsistent.
TypeError
when the observation type is wrong.
Returns
-------
None.
"""
if check_consistency:
if not self._is_consistent():
raise RuntimeError("inconsistent observation collection")
if not isinstance(o, obs.Obs):
raise TypeError("Observation should be of type hydropandas.observation.Obs")
# add new observation to collection
if o.name not in self.index:
logger.info(f"adding {o.name} to collection")
self.loc[o.name] = o.to_collection_dict()
else:
logger.info(
f"observation name {o.name} already in collection, merging observations"
)
o1 = self.loc[o.name, "obs"]
omerged = o1.merge_observation(o, **kwargs)
# overwrite observation in collection
self.loc[o.name] = omerged.to_collection_dict()
[docs] def add_obs_collection(
self, obs_collection, check_consistency=True, inplace=False, **kwargs
):
"""Add one observation collection to another observation collection. See
add_observation method for more details.
Parameters
----------
obs_collection : hpd.ObsCollection
ObsCollection object.
check_consistency : bool, optional
If True the consistency of both collections is first checked. The
default is True.
inplace : bool, optional
If True, modifies the ObsCollection in place (do not create a new
object). The default is False.
**kwargs passed to Obs.merge_observation:
merge_metadata : bool, optional
If True and observations are merged the metadata of the two
objects are merged. If there are any differences the overlap
parameter is used to determine which metadata is used. If
merge_metadata is False, the metadata of the original
observation is always used for the merged observation. The
default is True.
overlap : str, optional
How to deal with overlapping timeseries with different values.
Options are:
- error : Raise a ValueError
- use_left : use the overlapping part from the existing
observations
- use_right : use the overlapping part from the new observation
Default is 'error'.
Raises
------
RuntimeError
when the observation collection is inconsistent.
Returns
-------
ObsCollection or None
merged ObsCollection if ``inplace=True``.
"""
if check_consistency:
if not self._is_consistent():
raise RuntimeError(
f"inconsistent observation collection -> {self.name}"
)
if not obs_collection._is_consistent():
raise RuntimeError(
f"inconsistent observation collection -> {obs_collection.name}"
)
if inplace:
for o in obs_collection.obs.values:
self.add_observation(o, check_consistency=False, **kwargs)
else:
oc = self.copy()
for o in obs_collection.obs.values:
oc.add_observation(o, check_consistency=False, **kwargs)
return oc
[docs] def copy(self, deep=False):
"""Make a copy of this object's indices and data.
Parameters
----------
deep : bool, default True
Make a deep copy, including a deep copy of the observation objects.
With ``deep=False`` neither the indices nor the data are copied.
Returns
-------
ObsCollection
"""
if deep:
oc = super().copy(deep=deep)
# manually make a deep copy of the observations
oc["obs"] = [o.copy(deep=deep) for o in oc.obs.values]
return oc
return super().copy(deep=deep)
@classmethod
def from_bro(
    cls,
    extent=None,
    bro_id=None,
    name="",
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=True,
    epsg=28992,
    ignore_max_obs=False,
):
    """Get all the observations within an extent or within a groundwater
    monitoring net.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get groundwater monitoring wells within this extent
        [xmin, xmax, ymin, ymax]
    bro_id : str or None, optional
        starts with 'GMN'.
    name : str, optional
        name of the observation collection
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without
        measurements
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    ignore_max_obs : bool, optional
        by default you get a prompt if you want to download over a 1000
        observations at once. if ignore_max_obs is True you won't get the
        prompt. The default is False

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    from .io.bro import get_obs_list_from_extent, get_obs_list_from_gmn

    if bro_id is not None:
        # a groundwater monitoring net id takes precedence over an extent
        obs_list, meta = get_obs_list_from_gmn(
            bro_id,
            obs.GroundwaterObs,
            only_metadata=only_metadata,
            keep_all_obs=keep_all_obs,
        )
        name = meta.pop("name")
    elif extent is not None:
        obs_list = get_obs_list_from_extent(
            extent,
            obs.GroundwaterObs,
            tmin=tmin,
            tmax=tmax,
            only_metadata=only_metadata,
            keep_all_obs=keep_all_obs,
            epsg=epsg,
            ignore_max_obs=ignore_max_obs,
        )
        meta = {}
    else:
        raise ValueError("specify bro_id or extent")

    return cls(util._obslist_to_frame(obs_list), name=name, meta=meta)
@classmethod
def from_lizard(
    cls,
    extent=None,
    codes=None,
    name="",
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries="merge",
    only_metadata=False,
):
    """Get all observations within a specified extent.

    Parameters
    ----------
    extent : list, shapefile path or None
        get groundwater monitoring wells within this extent
        [xmin, ymin, xmax, ymax] or within a predefined Polygon from a
        shapefile
    codes : lst of str or None
        codes of the monitoring wells
    name : str, optional
        name of the observation collection
    tube_nr : lst of str
        list of tube numbers of the monitoring wells that should be
        selected. By default 'all' available tubes are selected.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire time series is
        returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire time series is
        returned
    type_timeseries : str, optional
        hand: returns only hand measurements
        diver: returns only diver measurements
        merge: the hand and diver measurements into one time series (default)
        combine: keeps hand and diver measurements separated
        The default is merge.
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column

    Raises
    ------
    ValueError
        if neither codes nor extent is specified
    """
    from .io.lizard import get_obs_list_from_codes, get_obs_list_from_extent

    if extent is not None:
        obs_list = get_obs_list_from_extent(
            extent,
            obs.GroundwaterObs,
            tube_nr,
            tmin,
            tmax,
            type_timeseries,
            only_metadata=only_metadata,
        )
    elif codes is not None:
        obs_list = get_obs_list_from_codes(
            codes,
            obs.GroundwaterObs,
            tube_nr,
            tmin,
            tmax,
            type_timeseries,
            only_metadata=only_metadata,
        )
    else:
        raise ValueError("specify codes or extent")

    # convert the list of observations to a DataFrame first, so the
    # result is consistent with the other from_* constructors
    obs_df = util._obslist_to_frame(obs_list)
    return cls(obs_df, name=name)
@classmethod
def from_bronhouderportaal_bro(cls, dirname, full_meta=False):
    """Get all the metadata from the XML files in a directory.

    Parameters
    ----------
    dirname : str
        name of dirname that holds XML files
    full_meta : bool, optional
        process all metadata. The default is False.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame without the 'obs' column
    """
    from .io.bronhouderportaal_bro import get_obs_list_from_dir

    obs_list = get_obs_list_from_dir(
        dirname, obs.GroundwaterObs, full_meta=full_meta
    )
    return cls(util._obslist_to_frame(obs_list))
@classmethod
def from_dataframe(cls, df, obs_list=None, ObsClass=obs.GroundwaterObs):
    """Create an observation collection from a DataFrame by adding a
    column with (empty) observations.

    Parameters
    ----------
    df : pandas DataFrame
        input dataframe. If this dataframe has a column named 'obs' the
        column is replaced with empty observation objects.
    obs_list : list of observation.Obs, optional
        list of observations. Default is None
    ObsClass : class, optional
        observation class used to create empty obs object, by
        default obs.GroundwaterObs

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column

    Raises
    ------
    TypeError
        if df is not a pandas DataFrame
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"df should be type pandas.DataFrame not {type(df)}")

    # fall back to one empty observation per row when no list is given
    if obs_list is None:
        obs_list = [ObsClass() for _ in range(len(df))]
    df["obs"] = obs_list
    return cls(df)
@classmethod
def from_excel(cls, path, meta_sheet_name="metadata"):
    """Create an observation collection from an excel file.

    The excel file should have the same format as excel files created
    with the `to_excel` method of an ObsCollection.

    Parameters
    ----------
    path : str
        full file path (including extension) of the excel file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".

    Returns
    -------
    ObsCollection

    Notes
    -----
    if you write an excel file using the 'to_excel' method and read an
    excel with the 'read_excel' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute
    If you don't want to lose this data consider using the `to_pickle`
    and `read_pickle` function.
    """
    df = pd.read_excel(path, meta_sheet_name, index_col=0)
    for obs_name, metadata_row in df.iterrows():
        # every observation has its own sheet with measurements
        measurements = pd.read_excel(path, obs_name, index_col=0)
        # the 'obs' column stores the observation class name as a string
        obsclass = getattr(obs, metadata_row["obs"])
        # keep only the metadata fields the observation class knows about
        metadata = {
            key: value
            for key, value in metadata_row.to_dict().items()
            if key in obsclass._metadata
        }
        metadata["name"] = obs_name
        df.at[obs_name, "obs"] = obsclass(measurements, **metadata)
    return cls(df)
@classmethod
def from_dino(
    cls,
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="Grondwaterstanden_Put",
    suffix="1.csv",
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read dino data from a directory with downloaded files.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory
        of subdir
    ObsClass : type
        class of the observations, so far only GroundwaterObs is supported
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read
    keep_all_obs : boolean, optional
        add all observation points to the collection, even the points
        without measurements or metadata
    name : str, optional
        the name of the observation collection
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_dino_dir() function

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    from .io.dino import read_dino_dir

    obs_list = read_dino_dir(
        dirname,
        ObsClass,
        subdir,
        suffix,
        keep_all_obs,
        **kwargs,
    )
    obs_df = util._obslist_to_frame(obs_list)
    meta = {
        "dirname": dirname,
        "type": ObsClass,
        "suffix": suffix,
        "keep_all_obs": keep_all_obs,
    }
    # fall back to the subdirectory name when no name is given
    return cls(obs_df, name=subdir if name is None else name, meta=meta)
@classmethod
def from_artdino_dir(
    cls,
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="csv",
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read an art-dino directory.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory of subdir
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read
    unpackdir : str
        destination directory of the unzipped file
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        use date of the zipfile for the destination file
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without data or
        metadata
    name : str, optional
        the name of the observation collection
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_artdino_dir()
        function

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    from .io.dino import read_artdino_dir

    obs_list = read_artdino_dir(
        dirname,
        ObsClass,
        subdir,
        suffix,
        unpackdir,
        force_unpack,
        preserve_datetime,
        keep_all_obs,
        **kwargs,
    )
    obs_df = util._obslist_to_frame(obs_list)
    meta = {
        "dirname": dirname,
        "type": ObsClass,
        "suffix": suffix,
        "unpackdir": unpackdir,
        "force_unpack": force_unpack,
        "preserve_datetime": preserve_datetime,
        "keep_all_obs": keep_all_obs,
    }
    # fall back to the subdirectory name when no name is given
    return cls(obs_df, name=subdir if name is None else name, meta=meta)
@classmethod
def from_fews_xml(
    cls,
    file_or_dir=None,
    xmlstring=None,
    ObsClass=obs.GroundwaterObs,
    name="fews",
    translate_dic=None,
    filterdict=None,
    locations=None,
    remove_nan=True,
    low_memory=True,
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    **kwargs,
):
    """Read one or several FEWS PI-XML files.

    Parameters
    ----------
    file_or_dir : str
        zip, xml or directory with zips or xml files to read
    xmlstring : str or None
        string with xml data, only used if file_or_dir is None. Default is
        None
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    name : str, optional
        name of the observation collection, 'fews' by default
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    locations : list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read. Only supported by
        low_memory=True method!
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    unpackdir : str
        destination directory to unzip file if path is a .zip
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        whether to preserve datetime from zip archive

    Returns
    -------
    ObsCollection
        collection of multiple point observations

    Raises
    ------
    ValueError
        if neither file_or_dir nor xmlstring is specified
    """
    from .io.fews import read_xml_filelist, read_xmlstring

    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    meta = {"type": ObsClass}

    if file_or_dir is not None:
        # collect (and if necessary unzip) the xml files
        dirname, unzip_fnames = util.get_files(
            file_or_dir,
            ext=".xml",
            unpackdir=unpackdir,
            force_unpack=force_unpack,
            preserve_datetime=preserve_datetime,
        )
        meta["filename"] = dirname
        obs_list = read_xml_filelist(
            unzip_fnames,
            ObsClass,
            directory=dirname,
            translate_dic=translate_dic,
            filterdict=filterdict,
            locations=locations,
            remove_nan=remove_nan,
            low_memory=low_memory,
            **kwargs,
        )
    elif xmlstring is not None:
        obs_list = read_xmlstring(
            xmlstring,
            ObsClass,
            translate_dic=translate_dic,
            filterdict=filterdict,
            locationIds=locations,
            low_memory=low_memory,
            remove_nan=remove_nan,
            **kwargs,
        )
    else:
        raise ValueError("either specify variables file_or_dir or xmlstring")

    return cls(util._obslist_to_frame(obs_list), name=name, meta=meta)
@classmethod
def from_imod(
    cls,
    obs_collection,
    ml,
    runfile,
    mtime,
    model_ws,
    modelname="",
    nlay=None,
    exclude_layers=0,
):
    """Read imod model results at point locations.

    Parameters
    ----------
    obs_collection : ObsCollection
        collection of observations at which points imod results will be read
    ml : flopy.modflow.mf.model
        modflow model
    runfile : Runfile
        imod runfile object
    mtime : list of datetimes
        datetimes corresponding to the model periods
    model_ws : str
        model workspace with imod model
    modelname : str
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : int
        exclude modellayers from being read from imod

    Returns
    -------
    ObsCollection
        collection of model observations
    """
    from .io.modflow import read_imod_results

    model_obs = read_imod_results(
        obs_collection,
        ml,
        runfile,
        mtime,
        model_ws,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
    )
    return cls(util._obslist_to_frame(model_obs), name=modelname)
@classmethod
def from_knmi(
    cls,
    locations=None,
    stns=None,
    xy=None,
    meteo_vars=("RH",),
    name="",
    starts=None,
    ends=None,
    ObsClasses=None,
    fill_missing_obs=False,
    interval="daily",
    use_api=True,
    raise_exceptions=True,
):
    """Get knmi observations from a list of locations or a list of stations.

    Parameters
    ----------
    locations : pandas DataFrame or None
        dataframe with columns 'x' and 'y' as coordinates. The
        default is None
    stns : list of str or None
        list of knmi stations. The default is None
    xy : list or numpy array, optional
        xy coordinates of the locations. e.g. [[10,25], [5,25]]
    meteo_vars : list or tuple of str
        meteo variables e.g. ["RH", "EV24"]. The default is ("RH").
        See list of all possible variables in the hpd.read_knmi docstring.
    name : str, optional
        name of the obscollection. The default is ''
    starts : None, str, datetime or list, optional
        start date of observations per meteo variable. The start date is
        included in the time series.
        If start is None the start date will be January 1st of the
        previous year.
        If start is str it will be converted to datetime.
        If start is a list it should be the same length as meteo_vars and
        the start time for each variable. The default is None
    ends : list of str, datetime or None
        end date of observations per meteo variable. The end date is
        included in the time series.
        If end is None the start date will be January 1st of the
        previous year.
        If end is a str it will be converted to datetime.
        If end is a list it should be the same length as meteo_vars and
        the end time for each meteo variable. The default is None
    ObsClasses : list of type or None
        class of the observations, can be PrecipitationObs, EvaporationObs
        or MeteoObs. If None the type of observations is derived from the
        meteo_vars.
    fill_missing_obs : bool, optional
        if True nan values in time series are filled with nearby time series.
        The default is False.
    interval : str, optional
        desired time interval for observations. Options are 'daily' and
        'hourly'. The default is 'daily'.
    use_api : bool, optional
        if True the api is used to obtain the data, API documentation is here:
        https://www.knmi.nl/kennis-en-datacentrum/achtergrond/data-ophalen-vanuit-een-script
        if False a text file is downloaded into a temporary folder and the
        data is read from there. Default is True since the api is back
        online (July 2021).
    raise_exceptions : bool, optional
        if True you get errors when no data is returned. The default is
        True.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column

    Raises
    ------
    TypeError
        if ObsClasses is not None, a supported observation class, a list
        or a tuple.
    """
    from .io.knmi import get_knmi_obslist

    # obtain ObsClass
    if ObsClasses is None:
        # derive the observation class from each meteo variable
        ObsClasses = []
        for meteo_var in meteo_vars:
            if meteo_var in ("RH", "RD"):
                ObsClasses.append(obs.PrecipitationObs)
            elif meteo_var == "EV24":
                ObsClasses.append(obs.EvaporationObs)
            else:
                ObsClasses.append(obs.MeteoObs)
    elif isinstance(ObsClasses, type):
        if issubclass(
            ObsClasses, (obs.PrecipitationObs, obs.EvaporationObs, obs.MeteoObs)
        ):
            # use the same class for every meteo variable
            ObsClasses = [ObsClasses] * len(meteo_vars)
        else:
            # bug fix: the TypeError was previously created but not raised
            raise TypeError(
                "must be None, PrecipitationObs, EvaporationObs, MeteoObs, "
                "list or tuple"
            )
    elif not isinstance(ObsClasses, (list, tuple)):
        # bug fix: the TypeError was previously created but not raised
        raise TypeError(
            "must be None, PrecipitationObs, EvaporationObs, MeteoObs, "
            "list or tuple"
        )

    meta = {}
    meta["starts"] = starts
    meta["ends"] = ends
    meta["name"] = name
    meta["ObsClasses"] = ObsClasses
    meta["meteo_vars"] = meteo_vars

    obs_list = get_knmi_obslist(
        locations,
        stns,
        xy,
        meteo_vars,
        starts=starts,
        ends=ends,
        ObsClasses=ObsClasses,
        fill_missing_obs=fill_missing_obs,
        interval=interval,
        use_api=use_api,
        raise_exceptions=raise_exceptions,
    )
    obs_df = util._obslist_to_frame(obs_list)
    return cls(obs_df, name=name, meta=meta)
@classmethod
def from_list(cls, obs_list, name=""):
    """Read observations from a list of obs objects.

    Parameters
    ----------
    obs_list : list of observation.Obs
        list of observations
    name : str, optional
        name of the observation collection

    Returns
    -------
    ObsCollection
    """
    return cls(util._obslist_to_frame(obs_list), name=name)
@classmethod
def from_menyanthes(
    cls, path, name="", ObsClass=obs.Obs, load_oseries=True, load_stresses=True
):
    """Read observations from a Menyanthes file.

    Parameters
    ----------
    path : str
        full path of the menyanthes file
    name : str, optional
        name of the observation collection
    ObsClass : type, optional
        class of the observations, by default obs.Obs
    load_oseries : bool, optional
        read the observation series, default True
    load_stresses : bool, optional
        read the stresses, default True

    Returns
    -------
    ObsCollection
    """
    from .io.menyanthes import read_file

    obs_list = read_file(
        path, ObsClass, load_oseries=load_oseries, load_stresses=load_stresses
    )
    obs_df = util._obslist_to_frame(obs_list)
    menyanthes_meta = {"path": path, "type": ObsClass}
    return cls(obs_df, meta=menyanthes_meta, name=name)
@classmethod
def from_modflow(
    cls,
    obs_collection,
    ml,
    hds_arr,
    mtime,
    modelname="",
    nlay=None,
    exclude_layers=None,
    method="linear",
):
    """Read modflow groundwater heads at points in obs_collection.

    Parameters
    ----------
    obs_collection : ObsCollection
        locations of model observation
    ml : flopy.modflow.mf.model
        modflow model
    hds_arr : numpy array
        heads with shape (ntimesteps, nlayers, nrow, ncol)
    mtime : list of datetimes
        dates for each model timestep
    modelname : str, optional
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : list of int, optional
        exclude the observations in these model layers
    method : str, optional
        interpolation method, either 'linear' or 'nearest',
        default is linear

    Returns
    -------
    ObsCollection
        collection of model observations
    """
    from .io.modflow import read_modflow_results

    model_obs = read_modflow_results(
        obs_collection,
        ml,
        hds_arr,
        mtime,
        modelname=modelname,
        nlay=nlay,
        method=method,
        exclude_layers=exclude_layers,
    )
    return cls(util._obslist_to_frame(model_obs))
@classmethod
def from_waterinfo(
    cls, file_or_dir, name="", ObsClass=obs.WaterlvlObs, progressbar=True, **kwargs
):
    """Read waterinfo file or directory.

    Parameters
    ----------
    file_or_dir : str
        path to file or directory. Files can be .csv or .zip
    name : str, optional
        name of the collection, by default ""
    ObsClass : Obs, optional
        type of Obs to read data as, by default obs.WaterlvlObs
    progressbar : bool, optional
        show progressbar, by default True

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    from .io import waterinfo

    obs_list = waterinfo.read_waterinfo_obs(
        file_or_dir, ObsClass, progressbar=progressbar, **kwargs
    )
    meta = {"name": name, "type": ObsClass, "filename": file_or_dir}
    return cls(util._obslist_to_frame(obs_list), name=name, meta=meta)
@classmethod
def from_wiski(
    cls,
    dirname,
    ObsClass=obs.GroundwaterObs,
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    **kwargs,
):
    """Read wiski files from a directory.

    Parameters
    ----------
    dirname : str
        directory name, can be a .zip file or the parent directory
    ObsClass : type, optional
        class of the observations, by default obs.GroundwaterObs
    suffix : str, optional
        suffix of files in dirname that will be read, default ".csv"
    unpackdir : str, optional
        destination directory of the unzipped file
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        use date of the zipfile for the destination file
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without
        measurements
    kwargs:
        kwargs are passed to the hydropandas.io.wiski.read_wiski_dir()
        function

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    from .io.wiski import read_wiski_dir

    meta = {
        "dirname": dirname,
        "type": ObsClass,
        "suffix": suffix,
        "unpackdir": unpackdir,
        "force_unpack": force_unpack,
        # bug fix: meta key was misspelled as "preserver_datetime"
        "preserve_datetime": preserve_datetime,
        "keep_all_obs": keep_all_obs,
    }
    name = "wiski_import"
    obs_list = read_wiski_dir(
        dirname,
        ObsClass=ObsClass,
        suffix=suffix,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )
    obs_df = util._obslist_to_frame(obs_list)
    return cls(obs_df, name=name, meta=meta)
@classmethod
def from_pastastore(
    cls, pstore, libname, ObsClass=obs.GroundwaterObs, metadata_mapping=None
):
    """Read pastastore library.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        PastaStore object
    libname : str
        name of library (e.g. oseries or stresses)
    ObsClass : Obs, optional
        type of Obs to read data as, by default obs.GroundwaterObs
    metadata_mapping : dict, optional
        dictionary containing map between metadata field names in
        pastastore and metadata field names expected by hydropandas, by
        default None.

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    from .io import pastas

    obs_list = pastas.read_pastastore_library(
        pstore, libname, ObsClass=ObsClass, metadata_mapping=metadata_mapping
    )
    meta = {
        "name": pstore.name,
        "conntype": pstore.conn.conn_type,
        "library": libname,
    }
    return cls(util._obslist_to_frame(obs_list), name=pstore.name, meta=meta)
def get_obs(self, name=None, **kwargs):
    """get an observation object from a collection

    Parameters
    ----------
    name : str or None, optional
        name of the observation you want to select, by default None
    **kwargs : any metadata, value pair e.g. for a collection of
        GroundwaterObs: tube_nr = 1 or source = 'BRO'

    Returns
    -------
    hpd.Obs
        Observation object from the collection.

    Raises
    ------
    ValueError
        If multiple observations in the collection match the given
        attribute values.
    ValueError
        If no observation in the collection matches the given attribute
        values.
    """
    # start from the full collection or from the row(s) with this name
    selection = self if name is None else self.loc[[name]]

    # narrow the selection down with each metadata condition
    for attribute, value in kwargs.items():
        selection = selection.loc[selection[attribute] == value]

    # exactly one match is required to return an Obs object
    if len(selection) == 0:
        raise ValueError("no observations for given conditions")
    if len(selection) > 1:
        raise ValueError(
            f"multiple observations for given conditions {selection.index}"
        )
    return selection["obs"].values[0]
def to_excel(self, path, meta_sheet_name="metadata"):
    """Write an ObsCollection to an excel file.

    The first sheet in the excel contains the metadata, the other tabs
    are the time series of each observation. The excel can be read using
    the read_excel function of hydropandas.

    Parameters
    ----------
    path : str
        full path of xlsx file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".

    Raises
    ------
    RuntimeError
        If the ObsCollection is inconsistent.

    Returns
    -------
    None.

    Notes
    -----
    The following data is NOT written to the excel file:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' dictionary
    If you don't want this consider using the `to_pickle` method.
    """
    if not self._is_consistent():
        raise RuntimeError("inconsistent observation collection")

    # work on a deep copy so the original collection stays untouched
    oc = self.copy(deep=True)

    with pd.ExcelWriter(path) as writer:
        # replace obs column by observation type
        obseries = oc.pop("obs")
        oc["obs"] = [type(o).__name__ for o in obseries]
        # write ObsCollection dataframe to first sheet
        super(ObsCollection, oc).to_excel(writer, sheet_name=meta_sheet_name)
        # one sheet per observation; replace characters excel forbids in
        # sheet names
        forbidden_chars = ["[", "]", ":", "*", "?", "/", "\\"]
        for o in obseries:
            sheetname = o.name
            for ch in forbidden_chars:
                sheetname = sheetname.replace(ch, "_")
            o.to_excel(writer, sheet_name=sheetname)
def to_pi_xml(self, fname, timezone="", version="1.24"):
    """Write the collection to a FEWS PI-XML file.

    Parameters
    ----------
    fname : str
        path of the output xml file
    timezone : str, optional
        timezone string written to the file. The default is "".
    version : str, optional
        version of the PI-XML format. The default is "1.24".
    """
    from .io import fews

    fews.write_pi_xml(self, fname, timezone=timezone, version=version)
def to_gdf(self, xcol="x", ycol="y", crs=28992, drop_obs=True):
    """Convert ObsCollection to GeoDataFrame.

    Parameters
    ----------
    xcol : str
        column name with x values
    ycol : str
        column name with y values
    crs : int, optional
        coordinate reference system, by default 28992 (RD new).
    drop_obs : bool, optional
        drop the column with observations. Useful for basic geodataframe
        manipulations that require JSON serializable columns. The default
        is True.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
    """
    gdf = util.df2gdf(self, xcol=xcol, ycol=ycol, crs=crs)
    # the obs column is not JSON serializable, drop it unless asked not to
    return gdf.drop(columns="obs") if drop_obs else gdf
def to_pastastore(
    self,
    pstore=None,
    pstore_name="",
    col=None,
    kind="oseries",
    add_metadata=True,
    conn=None,
    overwrite=False,
):
    """Add observations to a new or existing pastastore.

    Parameters
    ----------
    pstore : pastastore.PastaStore, optional
        Existing pastastore, if None a new pastastore is created
    pstore_name : str, optional
        Name of the pastastore only used if pstore is None
    col : str, optional
        Name of the column in the Obs dataframe to be used. If None the
        first numeric column in the Obs Dataframe is used.
    kind : str, optional
        The kind of series that is added to the pastastore. Use 'oseries'
        for observations and anything else for stresses.
    add_metadata : boolean, optional
        If True metadata from the observations added to the pastastore
    conn : pastastore.connectors or None, optional
        type of connector, if None the DictConnector is used. Default is
        None.
    overwrite : boolean, optional
        if True, overwrite existing series in pastastore, default is False

    Returns
    -------
    pstore : pastastore.PastaStore
        the pastastore with the series from the ObsCollection
    """
    from .io.pastas import create_pastastore

    return create_pastastore(
        self,
        pstore,
        pstore_name,
        add_metadata=add_metadata,
        kind=kind,
        col=col,
        conn=conn,
        overwrite=overwrite,
    )
def to_shapefile(self, path, xcol="x", ycol="y"):
    """Save ObsCollection as shapefile.

    Parameters
    ----------
    path : str
        filename of shapefile (.shp) or geopackage (.gpkg). A geopackage
        has the advantage that column names will not be truncated.
    xcol : str
        column name with x values
    ycol : str
        column name with y values
    """
    from geopandas.array import GeometryDtype

    gdf = util.df2gdf(self, xcol, ycol)

    # the obs column cannot be serialized, remove it
    if "obs" in gdf.columns:
        gdf.drop(columns="obs", inplace=True)

    # change dtypes that are not accepted for shapefiles
    for colname, coltype in gdf.dtypes.items():
        if isinstance(coltype, GeometryDtype):
            # leave the geometry column untouched
            continue
        if coltype == bool:
            # cast boolean columns to int
            gdf[colname] = gdf[colname].astype(int)
        elif np.issubdtype(coltype, np.datetime64):
            # cast datetime columns to str
            gdf[colname] = gdf[colname].astype(str)

    gdf.to_file(path)
def get_series(self, tmin=None, tmax=None, col=None):
    """Get a series of measurement series within a time frame.

    Parameters
    ----------
    tmin : datetime, optional
        start time for series. The default is None.
    tmax : datetime, optional
        end time for series. The default is None.
    col : str or None, optional
        the column of the obs dataframe to get measurements from. The
        first numeric column is used if col is None, by default None.

    Returns
    -------
    series of Series
        series of a series of observations within a time frame.
    """
    if tmin is None:
        tmin = self.stats.dates_first_obs.min()
    if tmax is None:
        tmax = self.stats.dates_last_obs.max()

    def get_s(o, tmin=tmin, tmax=tmax, col=col):
        # resolve col=None to the first numeric column per observation
        if col is None:
            col = o._get_first_numeric_col_name()
        return o.loc[tmin:tmax, col]

    # bug fix: previously the helper was defined but not used, so
    # col=None resulted in indexing with None instead of the first
    # numeric column of each observation
    return self.obs.apply(get_s)
def interpolate(
    self,
    xy: List[List[float]],
    kernel: str = "thin_plate_spline",
    kernel2: str = "linear",
    epsilon: Optional[int] = None,
    col: Optional[str] = None,
):
    """Interpolation method for ObsCollections using the Scipy radial
    basis function (RBF).

    Parameters
    ----------
    xy : List[List[float]]
        xy coordinates of locations of interest e.g. [[10,25], [5,25]]
    kernel : str, optional
        Type of radial basis function, by default thin_plate_spline.
        Other options are linear, gaussian, inverse_quadratic,
        multiquadric, inverse_multiquadric, cubic or quintic.
    kernel2 : str, optional
        Kernel in case there are not enough observations (3 or 6) for
        time step, by default linear. Other options are gaussian,
        inverse_quadratic, multiquadric, or inverse_multiquadric.
    epsilon : float, optional
        Shape parameter that scales the input to the RBF. If kernel is
        linear, thin_plate_spline, cubic, or quintic, this defaults to 1.
        Otherwise this must be specified.
    col : str, optional
        Name of the column in the Obs dataframe to be used. If None the
        first numeric column in the Obs Dataframe is used.

    Returns
    -------
    ObsCollection

    Raises
    ------
    TypeError
        if the collection contains observations of more than one type
    """
    otype = self._infer_otype()
    # interpolation only makes sense for a homogeneous collection
    if isinstance(otype, (list, np.ndarray)):
        raise TypeError(
            "Please make sure that all Obs are of the same type. Currently"
            f" found {', '.join([x.__name__ for x in otype])}."
        )

    obsdf = util.oc_to_df(self, col=col)
    xy_oc = self.loc[:, ["x", "y"]]
    fill_df = util.interpolate(
        xy, obsdf, xy_oc, kernel=kernel, kernel2=kernel2, epsilon=epsilon
    )

    # add all metadata that is equal for all observations
    shared_meta = {}
    candidate_atts = set(otype._metadata) - set(["x", "y", "name", "source", "meta"])
    first_row = self.iloc[0]
    for att in candidate_atts:
        if (self.loc[:, att] == first_row.loc[att]).all():
            shared_meta[att] = first_row.loc[att]

    # build one observation per interpolation location
    obs_list = []
    for i, colname in enumerate(fill_df.columns):
        interpolated = otype(
            fill_df.loc[:, [colname]].copy(),
            x=xy[i][0],
            y=xy[i][1],
            name=colname,
            source=f"interpolation {self.name}",
            meta={"interpolation_kernel": kernel, "interpolation_epsilon": epsilon},
            **shared_meta,
        )
        obs_list.append(interpolated)

    return self.from_list(obs_list)