"""Module with ObsCollection class for a collection of observations.
The ObsCollection class is a subclass of a pandas DataFrame with
additional attributes and methods.
More information about subclassing pandas DataFrames can be found here:
http://pandas.pydata.org/pandas-docs/stable/development/extending.html#extending-subclassing-pandas
"""
import json
import logging
import numbers
import os
import warnings
from io import StringIO, TextIOWrapper
from pathlib import Path
from typing import Iterable, Literal
import numpy as np
import pandas as pd
from . import observation as obs
from . import util
from .serialization import HydropandasEncoder
logger = logging.getLogger(__name__)
def read_bro(
    extent=None,
    bro_id=None,
    name="",
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=True,
    epsg=28992,
    ignore_max_obs=False,
    engine="hydropandas",
):
    """Collect BRO observations inside an extent or a groundwater monitoring net.

    All arguments are handed over unchanged to ``ObsCollection.from_bro``.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        only groundwater monitoring wells within this extent are
        retrieved, given as [xmin, xmax, ymin, ymax]
    bro_id : str or None, optional
        starts with 'GMN'.
    name : str, optional
        name of the observation collection
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without
        measurements. The default is True.
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    ignore_max_obs : bool, optional
        by default you get a prompt if you want to download over a 1000
        observations at once. If ignore_max_obs is True you won't get the
        prompt. The default is False.
    engine : str, optional
        selects how data from the bro-database is obtained, options are
        'hydropandas' or 'brodata'. The default is 'hydropandas'.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    return ObsCollection.from_bro(
        extent=extent,
        bro_id=bro_id,
        name=name,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        epsg=epsg,
        ignore_max_obs=ignore_max_obs,
        engine=engine,
    )
def read_bronhouderportaal_bro(dirname, full_meta=False, add_to_df=False):
    """Read the metadata of all files in a directory.

    The files are GMW files of well construction, as submitted to
    https://www.bronhouderportaal-bro.nl .

    Parameters
    ----------
    dirname : str
        name of directory that holds XML files
    full_meta : bool, optional
        process not only the standard metadata to ObsCollection
    add_to_df : bool, optional
        add all the metadata to the ObsCollection DataFrame

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame without the 'obs' column
    """
    collection = ObsCollection.from_bronhouderportaal_bro(
        dirname=dirname, full_meta=full_meta
    )
    if not add_to_df:
        return collection

    # copy every metadata entry into the DataFrame columns
    collection.add_meta_to_df(key="all")
    return collection
def read_csv(path, parse_dates=True, index_col=0, **kwargs):
    """Build an observation collection from one or more csv files.

    The csv file(s) should have the same format as csv files created with
    the `to_csv` method of an ObsCollection.

    Parameters
    ----------
    path : str
        directory with csv files, a .zip file with csv files or a single
        csv file.
    parse_dates : bool, optional
        whether to parse the dates when reading the csv files. The default
        is True.
    index_col : int, optional
        column to use as index, by default 0
    kwargs:
        kwargs are passed to the pandas.read_csv function

    Returns
    -------
    ObsCollection

    Raises
    ------
    ValueError
        if path is neither a directory, a .zip file nor a .csv file.

    Notes
    -----
    if you write a csv file using the 'to_csv' method and read a csv
    with the 'read_csv' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute
    - integer dtypes may become floats

    If you don't want to lose this data consider using the `to_json` and
    `read_json` function.
    """
    path = Path(path)

    def _load(csv_file):
        # every csv file is parsed with the same reader settings
        return obs.read_csv_obs(
            csv_file, parse_dates=parse_dates, index_col=index_col, **kwargs
        )

    if path.is_dir():
        # all csv files directly inside the directory
        observations = [_load(csv_file) for csv_file in path.glob("*.csv")]
        if not observations:
            logger.warning(f"No csv files found in directory {path}")
    elif path.suffix == ".zip":
        # the helper returns a directory and the csv file names inside it
        unpacked_dir, csv_names = util.get_files(path, ".csv")
        if not csv_names:
            logger.warning(f"No csv files found in directory {path}")
        observations = [
            _load(Path(unpacked_dir) / csv_name) for csv_name in csv_names
        ]
    elif path.suffix == ".csv":
        # a single csv file
        observations = [_load(path)]
    else:
        raise ValueError(f"Path {path} is not a directory, a zipfile or a .csv file")

    return ObsCollection(observations)
def read_dino(
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="DINO_Grondwaterstanden",
    suffix=None,
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read dino observations from a directory with downloaded files.

    All arguments are handed over unchanged to ``ObsCollection.from_dino``.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory
        of subdir
    ObsClass : type
        class of the observations, so far only obs.GroundwaterObs is
        supported
    subdir : str
        subdirectory of dirname with data files. For old school dino zip
        files this should be "Grondwaterstanden_Put". For new style the
        default value DINO_Grondwaterstanden is sufficient. The default is
        DINO_Grondwaterstanden.
    suffix : str or None, optional
        suffix of files in subdir that will be read. For old school dino
        zip files this should be '1.csv'. For new style the default value
        None is sufficient. The default is None
    keep_all_obs : boolean, optional
        add all observation points to the collection, even the points
        without measurements or metadata
    name : str, optional
        the name of the observation collection
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_dino_dir()
        function

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    return ObsCollection.from_dino(
        dirname=dirname,
        ObsClass=ObsClass,
        subdir=subdir,
        suffix=suffix,
        keep_all_obs=keep_all_obs,
        name=name,
        **kwargs,
    )
def read_excel(path, meta_sheet_name="metadata"):
    """Build an observation collection from an excel file.

    The excel file should have the same format as excel files created with
    the `to_excel` method of an ObsCollection.

    Parameters
    ----------
    path : str
        full file path (including extension) of the excel file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".

    Returns
    -------
    ObsCollection

    Notes
    -----
    if you write an excel file using the 'to_excel' method and read an
    excel with the 'read_excel' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute

    If you don't want to lose this data consider using the `to_json` and
    `read_json` function.
    """
    return ObsCollection.from_excel(path, meta_sheet_name=meta_sheet_name)
def read_fews(
    file_or_dir=None,
    xmlstring=None,
    ObsClass=obs.GroundwaterObs,
    name="fews",
    translate_dic=None,
    filterdict=None,
    locations=None,
    remove_nan=True,
    low_memory=True,
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    **kwargs,
):
    """Read one or several FEWS PI-XML files.

    All arguments are handed over unchanged to
    ``ObsCollection.from_fews_xml``.

    Parameters
    ----------
    file_or_dir : str
        zip, xml or directory with zips or xml files to read
    xmlstring : str or None
        string with xml data, only used if file_or_dir is None. Default is
        None
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    name : str, optional
        name of the observation collection, 'fews' by default
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'locatie'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    locations : list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read. Only supported by
        low_memory=True method!
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    unpackdir : str
        destination directory to unzip file if file_or_dir is a .zip
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        whether to preserve datetime from zip archive
    kwargs:
        additional keyword arguments are passed to
        ObsCollection.from_fews_xml

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    return ObsCollection.from_fews_xml(
        file_or_dir=file_or_dir,
        xmlstring=xmlstring,
        ObsClass=ObsClass,
        name=name,
        translate_dic=translate_dic,
        filterdict=filterdict,
        locations=locations,
        remove_nan=remove_nan,
        low_memory=low_memory,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        **kwargs,
    )
def read_imod(
    obs_collection,
    ml,
    runfile,
    mtime,
    model_ws,
    modelname="",
    nlay=None,
    exclude_layers=0,
):
    """Read imod model results at point locations.

    All arguments are handed over unchanged to ``ObsCollection.from_imod``.

    Parameters
    ----------
    obs_collection : ObsCollection
        collection of observations at which points imod results will be
        read
    ml : flopy.modflow.mf.model
        modflow model
    runfile : Runfile
        imod runfile object
    mtime : list of datetimes
        datetimes corresponding to the model periods
    model_ws : str
        model workspace with imod model
    modelname : str
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : int
        exclude modellayers from being read from imod

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    return ObsCollection.from_imod(
        obs_collection=obs_collection,
        ml=ml,
        runfile=runfile,
        mtime=mtime,
        model_ws=model_ws,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
    )
def read_json(path, **kwargs):
    """Read an observation collection or an observation from a json file.

    The json content is parsed once to find its 'obstype'. Depending on
    that value the original `path` is then passed on to
    ``ObsCollection.from_json`` or to the ``from_json`` method of the
    matching class in the observation module.

    Parameters
    ----------
    path : str, path object or file object
        full file path (including extension) of the json file, or an open
        text stream (TextIOWrapper or StringIO) with json content.
    kwargs:
        kwargs are passed to the from_json method that does the actual
        reading

    Returns
    -------
    ObsCollection or observation object
        an ObsCollection if the obstype is "ObsCollection", otherwise the
        result of the from_json method of the matching observation class.

    Raises
    ------
    TypeError
        if path is not a string, a path object or a supported file object.
    AssertionError
        if the json content has no 'obstype' entry.
    ValueError
        if the obstype is not a class of the observation module.
    """
    if isinstance(path, (TextIOWrapper, StringIO)):
        # The stream belongs to the caller: leave it open and put it back at
        # the position it had on entry. Without this the from_json call below
        # would receive a stream that is already at its end.
        start = path.tell() if path.seekable() else None
        d = json.load(path)
        if start is not None:
            path.seek(start)
    elif isinstance(path, (str, os.PathLike)):
        # the context manager also closes the file when parsing fails
        with open(path, "r") as fo:
            d = json.load(fo)
    else:
        raise TypeError("path should be a string or a file object")

    if not isinstance(d, dict) or "obstype" not in d:
        msg = (
            "cannot read json file because no obstype is present, this is probably "
            "because the json file was not created with hydropandas. Try "
            "parsing using pandas.read_json"
        )
        # Raised explicitly instead of with an assert statement, so the check
        # is not stripped when Python runs with -O. The exception type is kept
        # for backward compatibility.
        raise AssertionError(msg)

    obstype = d["obstype"]
    if obstype == "ObsCollection":
        return ObsCollection.from_json(path, **kwargs)

    if hasattr(obs, obstype):
        return getattr(obs, obstype).from_json(path, **kwargs)

    msg = (
        f"cannot read a json file from {obstype=}, this is probably "
        "because the json file was not created with hydropandas. Try "
        "parsing using pandas.read_json"
    )
    raise ValueError(msg)
def read_knmi(
    locations=None,
    stns=None,
    xy=None,
    meteo_vars=("RH",),
    name="",
    starts=None,
    ends=None,
    ObsClasses=None,
    fill_missing_obs=False,
    interval="daily",
    use_api=True,
    raise_exceptions=True,
    progress_callback=None,
    fill_missing_obs_with_factor=False,
):
    """Get knmi observations from a list of locations or a list of stations.

    All arguments are handed over unchanged to ``ObsCollection.from_knmi``.

    Parameters
    ----------
    locations : pandas DataFrame or None
        dataframe with columns 'x' and 'y' as coordinates. The
        default is None
    stns : list of str or None
        list of knmi stations. The default is None
    xy : list or numpy array, optional
        xy coordinates of the locations. e.g. [[10,25], [5,25]]
    meteo_vars : list or tuple of str
        meteo variables e.g. ["RH", "EV24"]. The default is ("RH",).
        See list of all possible variables below
    name : str, optional
        name of the obscollection. The default is ''
    starts : None, str, datetime or list, optional
        start date of observations per meteo variable. The start date is
        included in the time series.
        If start is None the start date will be January 1st of the
        previous year.
        If start is str it will be converted to datetime.
        If start is a list it should be the same length as meteo_vars and
        the start time for each variable. The default is None
    ends : list of str, datetime or None
        end date of observations per meteo variable. The end date is
        included in the time series.
        If end is a str it will be converted to datetime.
        If end is a list it should be the same length as meteo_vars and
        the end time for each meteo variable. The default is None
    ObsClasses : list of type or None
        class of the observations, can be PrecipitationObs, EvaporationObs
        or MeteoObs. If None the type of observations is derived from the
        meteo_vars.
    fill_missing_obs : bool, optional
        if True nan values in time series are filled with nearby time
        series. The default is False.
    interval : str, optional
        passed on to ObsCollection.from_knmi. The default is "daily".
    use_api : bool, optional
        passed on to ObsCollection.from_knmi. The default is True.
    raise_exceptions : bool, optional
        passed on to ObsCollection.from_knmi. The default is True.
    progress_callback : callable or None, optional
        callback function called with (i, total) for each station
        processed. The default is None.
    fill_missing_obs_with_factor : bool, optional
        if True, donor-station values are scaled with an overlap-based
        factor before filling missing values. This automatically enables
        fill_missing_obs.
        The default is False.

    Returns
    -------
    ObsCollection
        collection of multiple point observations

    Notes
    -----
    List of possible variables:

    precipitation stations:
        RD    = 24-hour precipitation sum, measured from 0800 UTC on the
                preceding day until 0800 UTC on the given date

    meteo stations:
        DDVEC = Vector mean wind direction in degrees (360=north, 90=east,
                180=south, 270=west, 0=calm/variable). See
                http://www.knmi.nl/kennis-en-datacentrum/achtergrond/klimatologische-brochures-en-boeken
        FHVEC = Vector mean windspeed (in 0.1 m/s). See
                http://www.knmi.nl/kennis-en-datacentrum/achtergrond/klimatologische-brochures-en-boeken
        FG    = Daily mean windspeed (in 0.1 m/s)
        FHX   = Maximum hourly mean windspeed (in 0.1 m/s)
        FHXH  = Hourly division in which FHX was measured
        FHN   = Minimum hourly mean windspeed (in 0.1 m/s)
        FHNH  = Hourly division in which FHN was measured
        FXX   = Maximum wind gust (in 0.1 m/s)
        FXXH  = Hourly division in which FXX was measured
        TG    = Daily mean temperature in (0.1 degrees Celsius)
        TN    = Minimum temperature (in 0.1 degrees Celsius)
        TNH   = Hourly division in which TN was measured
        TX    = Maximum temperature (in 0.1 degrees Celsius)
        TXH   = Hourly division in which TX was measured
        T10N  = Minimum temperature at 10 cm above surface (in 0.1
                degrees Celsius)
        T10NH = 6-hourly division in which T10N was measured; 6=0-6 UT,
                12=6-12 UT, 18=12-18 UT, 24=18-24 UT
        SQ    = Sunshine duration (in 0.1 hour) calculated from global
                radiation (-1 for <0.05 hour)
        SP    = Percentage of maximum potential sunshine duration
        Q     = Global radiation (in J/cm2)
        DR    = Precipitation duration (in 0.1 hour)
        RH    = Daily precipitation amount (in 0.1 mm) (-1 for <0.05 mm)
        RHX   = Maximum hourly precipitation amount (in 0.1 mm) (-1 for
                <0.05 mm)
        RHXH  = Hourly division in which RHX was measured
        PG    = Daily mean sea level pressure (in 0.1 hPa) calculated
                from 24 hourly values
        PX    = Maximum hourly sea level pressure (in 0.1 hPa)
        PXH   = Hourly division in which PX was measured
        PN    = Minimum hourly sea level pressure (in 0.1 hPa)
        PNH   = Hourly division in which PN was measured
        P     = Air pressure (in 0.1 hPa) reduced to mean sea level, at
                the time of observation
        VVN   = Minimum visibility; 0: <100 m, 1:100-200 m, 2:200-300 m,
                ..., 49:4900-5000 m, 50:5-6 km, 56:6-7 km, 57:7-8 km,
                ..., 79:29-30 km, 80:30-35 km, 81:35-40 km, ...,
                89: >70 km
        VVNH  = Hourly division in which VVN was measured
        VVX   = Maximum visibility; 0: <100 m, 1:100-200 m, 2:200-300 m,
                ..., 49:4900-5000 m, 50:5-6 km, 56:6-7 km, 57:7-8 km,
                ..., 79:29-30 km, 80:30-35 km, 81:35-40 km, ...,
                89: >70 km
        VVXH  = Hourly division in which VVX was measured
        NG    = Mean daily cloud cover (in octants, 9=sky invisible)
        UG    = Daily mean relative atmospheric humidity (in percents)
        UX    = Maximum relative atmospheric humidity (in percents)
        UXH   = Hourly division in which UX was measured
        UN    = Minimum relative atmospheric humidity (in percents)
        UNH   = Hourly division in which UN was measured
        EV24  = Potential evapotranspiration (Makkink) (in 0.1 mm)
    """
    return ObsCollection.from_knmi(
        locations=locations,
        stns=stns,
        xy=xy,
        meteo_vars=meteo_vars,
        name=name,
        starts=starts,
        ends=ends,
        ObsClasses=ObsClasses,
        fill_missing_obs=fill_missing_obs,
        interval=interval,
        use_api=use_api,
        raise_exceptions=raise_exceptions,
        progress_callback=progress_callback,
        fill_missing_obs_with_factor=fill_missing_obs_with_factor,
    )
def read_knmi_scenarios(
    stn: int | str,
    years: Iterable[Literal["2033", "2050", "2100", "2150"]] = (
        "2033",
        "2050",
        "2100",
        "2150",
    ),
    scenarios: Iterable[Literal["Ld", "Ln", "Md", "Mn", "Hd", "Hn"]] = (
        "Ld",
        "Ln",
        "Md",
        "Mn",
        "Hd",
        "Hn",
    ),
    evap: Literal["EV24", "makkink", "penman", "hargreaves"] = "EV24",
    meteo_vars: Iterable[Literal["TG", "RD", "Q", "TX", "TN", "UG", "FG", "EV24"]]
    | None = None,
    name: str = "",
):
    """Get KNMI climate scenario observations for a station.

    Retrieves climate scenario data from KNMI and returns an ObsCollection
    with temperature, precipitation, and evaporation observations for
    different climate scenarios. All arguments are handed over unchanged to
    ``ObsCollection.from_knmi_scenarios``.

    Parameters
    ----------
    stn : int or str
        Station number (e.g., 550 or "550").
    years : tuple, optional
        Years of climate scenario. The default is
        ('2033','2050','2100','2150').
    scenarios : tuple, optional
        Names of climate scenario. The default is
        ('Ld','Ln','Md','Mn','Hd','Hn').
        This includes all scenarios including the original measurements.
    evap : str, optional
        Method for calculating evaporation. Options are 'EV24', 'makkink',
        'penman', or 'hargreaves'. The default is 'EV24'.
    meteo_vars : iterable of str or None, optional
        Meteorological variables to include in the ObsCollection. Possible
        variables include 'TG', 'RD', 'Q', 'TX', 'TN', 'UG', 'FG', and
        'EV24'. If None (default), all available variables are included.
    name : str, optional
        Name of the observation collection. The default is "".

    Returns
    -------
    ObsCollection
        Collection of climate scenario observations with temperature,
        precipitation, and evaporation data for different scenarios.

    Examples
    --------
    >>> oc = hpd.read_knmi_scenarios("550")
    >>> oc = hpd.read_knmi_scenarios(
    ...     "550",
    ...     years=["2050", "2100"],
    ...     scenarios=["Md", "Hd"],
    ...     evap="makkink"
    ... )
    """
    return ObsCollection.from_knmi_scenarios(
        stn=stn,
        years=years,
        scenarios=scenarios,
        evap=evap,
        meteo_vars=meteo_vars,
        name=name,
    )
def read_lizard(
    extent=None,
    codes=None,
    name="",
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    only_metadata=False,
    organisation="vitens",
    auth=None,
):
    """Get observations for a list of monitoring well codes and tube numbers.

    All arguments are handed over unchanged to
    ``ObsCollection.from_lizard``.

    Parameters
    ----------
    extent : list, shapefile path or None
        get groundwater monitoring wells within this extent
        [xmin, xmax, ymin, ymax] or within a predefined Polygon from a
        shapefile
    codes : lst of str or None
        codes of the monitoring wells
    name : str, optional
        passed on as the name of the collection. The default is "".
    tube_nr : lst of str
        list of tube numbers of the monitoring wells that should be
        selected. By default 'all' available tubes are selected.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire time series is
        returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire time series is
        returned
    type_timeseries : str, optional (deprecated)
        Old keyword, use which_timeseries instead.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver",
        "diver_validated". Defaults to ("hand", "diver") (which should be
        correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available":
        Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements
        during periods where diver or diver_validated measurements are
        available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns"
        (side-by-side columns). If None, defaults to "merge".
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.
    organisation : str, optional
        organisation of the data, by default "vitens".
    auth : tuple, optional
        authentication credentials for the API request, e.g.:
        ("__key__", your_api_key)

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    return ObsCollection.from_lizard(
        extent=extent,
        codes=codes,
        name=name,
        tube_nr=tube_nr,
        tmin=tmin,
        tmax=tmax,
        type_timeseries=type_timeseries,
        which_timeseries=which_timeseries,
        datafilters=datafilters,
        combine_method=combine_method,
        only_metadata=only_metadata,
        organisation=organisation,
        auth=auth,
    )
def read_matroos(
    extent=None,
    name="",
    ObsClass=obs.WaterlvlObs,
    locations=None,
    units=None,
    sources=None,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    **kwargs,
):
    """Read measurements within an extent using the Matroos API.

    All arguments are handed over unchanged to
    ``ObsCollection.from_matroos``.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get measurements within this extent
        [xmin, xmax, ymin, ymax]
    name : str, optional
        name of the collection, by default ""
    ObsClass : type
        class of the observations, e.g. WaterlvlObs
    locations : list, tuple or None, optional
        locations to select, if None all locations are selected, by
        default None
    units : list, tuple or None, optional
        units to select, if None all units are selected, by default None
    sources : list, tuple or None, optional
        sources to select, if None all sources are selected, by default
        None
    tmin : pd.Timestamp, str or None, optional
        start time of observations. The default is None.
    tmax : pd.Timestamp, str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    **kwargs
        additional keyword arguments are passed to the
        ObsClass.from_matroos() method

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    return ObsCollection.from_matroos(
        extent=extent,
        name=name,
        ObsClass=ObsClass,
        locations=locations,
        units=units,
        sources=sources,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )
def read_menyanthes(
    path, name="", ObsClass=obs.Obs, load_oseries=True, load_stresses=True
):
    """Read a Menyanthes file.

    All arguments are handed over unchanged to
    ``ObsCollection.from_menyanthes``.

    Parameters
    ----------
    path : str
        full path of the .men file.
    name : str, optional
        name of the observation collection. The default is "".
    ObsClass : type, optional
        class of the observations, e.g. GroundwaterObs. The default is
        Obs.
    load_oseries : bool, optional
        if True the observations are read. The default is True.
    load_stresses : bool, optional
        if True the stresses are read. The default is True.

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    return ObsCollection.from_menyanthes(
        path=path,
        name=name,
        ObsClass=ObsClass,
        load_oseries=load_oseries,
        load_stresses=load_stresses,
    )
def read_modflow(
    obs_collection,
    ml,
    hds_arr,
    mtime,
    modelname="",
    nlay=None,
    exclude_layers=None,
    method="linear",
):
    """Read modflow groundwater heads at locations in obs_collection.

    All arguments are handed over unchanged to
    ``ObsCollection.from_modflow``.

    Parameters
    ----------
    obs_collection : ObsCollection
        locations of model observation
    ml : flopy.modflow.mf.model
        modflow model
    hds_arr : numpy array
        heads with shape (ntimesteps, nlayers, nrow, ncol)
    mtime : list of datetimes
        dates for each model timestep
    modelname : str, optional
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : list of int, optional
        exclude the observations in these model layers
    method : str, optional
        interpolation method, either 'linear' or 'nearest',
        default is linear

    Returns
    -------
    ObsCollection
        collection of multiple point observations
    """
    return ObsCollection.from_modflow(
        obs_collection=obs_collection,
        ml=ml,
        hds_arr=hds_arr,
        mtime=mtime,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
        method=method,
    )
def read_pickle(
    filepath_or_buffer,
    compression="infer",
    storage_options=None,
):
    """Load a pickled object by delegating to pd.read_pickle.

    Parameters
    ----------
    filepath_or_buffer : str, path object, or file-like object
        String, path object (implementing ``os.PathLike[str]``), or
        file-like object implementing a binary ``readlines()`` function.

        .. versionchanged:: 1.0.0
            Accept URL. URL is not limited to S3 and GCS.
    compression : str or dict, default 'infer'
        For on-the-fly decompression of on-disk data. If 'infer' and
        'filepath_or_buffer' is path-like, then detect compression from the
        following extensions: '.gz', '.bz2', '.zip', '.xz', or '.zst'
        (otherwise no compression). If using 'zip', the ZIP file must
        contain only one data file to be read in. Set to ``None`` for no
        decompression. Can also be a dict with key ``'method'`` set to one
        of {``'zip'``, ``'gzip'``, ``'bz2'``, ``'zstd'``} and other
        key-value pairs are forwarded to ``zipfile.ZipFile``,
        ``gzip.GzipFile``, ``bz2.BZ2File``, or
        ``zstandard.ZstdDecompressor``, respectively. As an example, the
        following could be passed for Zstandard decompression using a
        custom compression dictionary:
        ``compression={'method': 'zstd', 'dict_data': my_compression_dict}``.

        .. versionchanged:: 1.4.0 Zstandard support.
    storage_options : dict, optional
        Extra options that make sense for a particular storage connection,
        e.g. host, port, username, password, etc. For HTTP(S) URLs the
        key-value pairs are forwarded to ``urllib`` as header options. For
        other URLs (e.g. starting with "s3://", and "gcs://") the key-value
        pairs are forwarded to ``fsspec``. Please see ``fsspec`` and
        ``urllib`` for more details.

        .. versionadded:: 1.2.0

    Returns
    -------
    ObsCollection : same type as object stored in file
    """
    return pd.read_pickle(
        filepath_or_buffer,
        compression=compression,
        storage_options=storage_options,
    )
def read_waterconnect(
    extent=None,
    name="",
    ObsClass=obs.GroundwaterObs,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    location_gdf=None,
    update=False,
    **kwargs,
):
    """Read waterconnect measurements within an extent.

    All arguments are handed over unchanged to
    ``ObsCollection.from_waterconnect``.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get water connect measurements within this extent
        [xmin, xmax, ymin, ymax], coordinates are in lat (y) lon (x).
    name : str, optional
        name of the collection, by default ""
    ObsClass : type
        class of the observations, e.g. GroundwaterObs
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    location_gdf : GeoDataFrame, optional
        geodataframe with the locations of the water drill holes you want
        to include.
    update : bool, optional
        if True new locations are downloaded and stored locally (slow)
        otherwise a cached version of the locations is used. By default
        False
    **kwargs
        additional keyword arguments are passed to the
        ObsClass.from_waterconnect() method

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    return ObsCollection.from_waterconnect(
        extent=extent,
        name=name,
        ObsClass=ObsClass,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        location_gdf=location_gdf,
        update=update,
        **kwargs,
    )
def read_waterinfo(
    file_or_dir=None,
    extent=None,
    name="",
    ObsClass=obs.WaterlvlObs,
    locatie=None,
    grootheid_code=None,
    groepering_code=None,
    parameter_code=None,
    proces_type=None,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    epsg=28992,
    progressbar=True,
    location_gdf=None,
    **kwargs,
):
    """Read waterinfo measurements within an extent or from a file or directory.

    All arguments are handed over unchanged to
    ``ObsCollection.from_waterinfo``.

    Parameters
    ----------
    file_or_dir : str or None, optional
        path to file or directory. Files can be .csv or .zip
    extent : list, tuple, numpy-array or None, optional
        get waterinfo measurements within this extent
        [xmin, xmax, ymin, ymax]
    name : str, optional
        name of the collection, by default ""
    ObsClass : Obs, optional
        type of Obs to read data as, by default WaterlvlObs
    locatie : str or list of str, optional
        select only measurement with this location(s), e.g. 'schoonhoven',
        default is None
    grootheid_code : str or list of str, optional
        select only measurement with this grootheid_code, e.g. 'WATHTE',
        default is None
    groepering_code : str or list of str, optional
        select only measurement with this groepering_code, e.g.
        'GETETBRKD2', default is None
    parameter_code : str or list of str, optional
        select only measurement with this parameter_code, e.g. 'Cl',
        default is None
    proces_type : str or list of str, optional
        select only measurement with this proces_type, e.g. 'meting',
        default is None
    tmin : pd.Timestamp, str or None, optional
        start time of observations. The default is None.
    tmax : pd.Timestamp, str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    progressbar : bool, optional
        show progressbar, by default True
    location_gdf : GeoDataFrame, optional
        geodataframe with the locations of the measurements you want to
        include. If location_gdf is provided the provided extent and epsg
        will be ignored.
    **kwargs
        additional keyword arguments are passed to
        ObsCollection.from_waterinfo

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    return ObsCollection.from_waterinfo(
        extent=extent,
        file_or_dir=file_or_dir,
        name=name,
        ObsClass=ObsClass,
        locatie=locatie,
        grootheid_code=grootheid_code,
        groepering_code=groepering_code,
        parameter_code=parameter_code,
        proces_type=proces_type,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        epsg=epsg,
        progressbar=progressbar,
        location_gdf=location_gdf,
        **kwargs,
    )
def read_wiski(
    dirname,
    ObsClass=obs.GroundwaterObs,
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    **kwargs,
):
    """Read wiski data into an ObsCollection.

    All arguments are forwarded unchanged to ``ObsCollection.from_wiski``.

    Parameters
    ----------
    dirname : str
        path of the zipfile with wiski data.
    ObsClass : type, optional
        type of Obs. The default is GroundwaterObs.
    suffix : str, optional
        extension of filenames to read. The default is ".csv".
    unpackdir : str or None, optional
        directory to unpack zipped directory. The default is None.
    force_unpack : bool, optional
        force unzip, by default False.
    preserve_datetime : bool, optional
        preserve datetime of unzipped files, by default False
        (useful for checking whether data has changed)
    keep_all_obs : bool, optional
        If True keep all observations even those without metadata. The default
        is True.
    **kwargs
        additional keyword arguments are passed to ObsCollection.from_wiski()

    Returns
    -------
    ObsCollection
        ObsCollection containing observation data
    """
    return ObsCollection.from_wiski(
        dirname=dirname,
        ObsClass=ObsClass,
        suffix=suffix,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )
def read_pastastore(
    pstore,
    libname,
    ObsClass=obs.GroundwaterObs,
    metadata_mapping=None,
):
    """Read a pastastore library into an ObsCollection.

    All arguments are forwarded unchanged to ``ObsCollection.from_pastastore``.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        PastaStore object
    libname : str
        name of library (e.g. oseries or stresses)
    ObsClass : Obs, optional
        type of Obs to read data as, by default GroundwaterObs
    metadata_mapping : dict, optional
        dictionary containing map between metadata field names in pastastore and
        metadata field names expected by hydropandas, by default None.

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    collection = ObsCollection.from_pastastore(
        pstore=pstore,
        libname=libname,
        ObsClass=ObsClass,
        metadata_mapping=metadata_mapping,
    )
    return collection
def _obscollection_constructor_with_fallback(*args, **kwargs):
    """Build an ObsCollection, or a plain DataFrame when 'obs' is missing.

    Used as ``ObsCollection._constructor``. When the constructed object has no
    'obs' column (i.e. an operation did not preserve it) a regular
    pandas DataFrame is returned instead. Copied from geopandas.
    """
    candidate = ObsCollection(*args, **kwargs)
    if "obs" in candidate.columns:
        return candidate
    # without an 'obs' column the result is no longer an observation collection
    return pd.DataFrame(candidate)
[docs]class ObsCollection(pd.DataFrame):
"""Class for a collection of point observations.
An ObsCollection object is a subclass of a pandas.DataFrame and allows for
additional attributes and methods. Additional attributes are
defined in the '_metadata' attribute.
Parameters
----------
*args observations, list of observations or a pandas DataFrame,
**kwargs can be one of these:
name : str
name of the observation collection
meta : dic
metadata of the observation collection
"""
# temporary properties
_internal_names = pd.DataFrame._internal_names + ["none"]
_internal_names_set = set(_internal_names)
# normal properties
_metadata = [
"name",
"meta",
]
def __init__(self, *args, **kwargs):
    """Create an ObsCollection from observations or from DataFrame-like input.

    Parameters
    ----------
    *args
        the first positional argument determines how the collection is built:

        - no arguments: an empty collection is created
        - ObsCollection: passed on to the DataFrame constructor
        - list or tuple: converted with ``util._obslist_to_frame``
        - one or more Obs objects: converted with ``util._obslist_to_frame``;
          positional arguments that are not Obs objects are passed on to the
          DataFrame constructor
        - DataFrame together with the keyword ``obs_list`` and/or
          ``ObsClass``: built with ``from_dataframe``
        - anything else: passed on to the DataFrame constructor
    **kwargs
        ``name`` (str) and ``meta`` (dict) are stored as attributes of the
        collection. ``obs_list`` and ``ObsClass`` are only used in
        combination with a DataFrame. The remaining keyword arguments are
        passed to the DataFrame constructor.
    """
    # collection attributes are not valid DataFrame keywords, remove them first
    self.name = kwargs.pop("name", "")
    self.meta = kwargs.pop("meta", {})

    if not args:
        logger.debug("Create empty ObsCollection")
        super().__init__(**kwargs)
    elif isinstance(args[0], ObsCollection):
        super().__init__(*args, **kwargs)
    elif isinstance(args[0], (list, tuple)):
        logger.debug("Convert list of observations to ObsCollection")
        obs_df = util._obslist_to_frame(args[0])
        super().__init__(obs_df, *args[1:], **kwargs)
    elif isinstance(args[0], obs.Obs):
        logger.debug("Convert observation(s) to ObsCollection")
        obs_list = [o for o in args if isinstance(o, obs.Obs)]
        remaining_args = [o for o in args if not isinstance(o, obs.Obs)]
        obs_df = util._obslist_to_frame(obs_list)
        super().__init__(obs_df, *remaining_args, **kwargs)
    elif isinstance(args[0], pd.DataFrame) and (
        "obs_list" in kwargs or "ObsClass" in kwargs
    ):
        # Remove both keywords from kwargs: neither is accepted by the
        # DataFrame constructor, so leaving 'ObsClass' behind when 'obs_list'
        # is also given would raise a TypeError in super().__init__.
        frame_kwargs = {
            key: kwargs.pop(key) for key in ("obs_list", "ObsClass") if key in kwargs
        }
        obs_df = self.from_dataframe(*args, **frame_kwargs, **kwargs)
        super().__init__(obs_df, **kwargs)
    else:
        super().__init__(*args, **kwargs)
@property
def _constructor(self):
    """Return the constructor function for this DataFrame subclass.

    The returned function builds an ObsCollection and falls back to a plain
    DataFrame when the result has no 'obs' column.
    """
    return _obscollection_constructor_with_fallback
def _infer_otype(self):
    """Infer the observation type(s) from the obs column.

    Returns
    -------
    otypes
        array with the unique types of the observation objects

    Raises
    ------
    TypeError
        when no observation type is found.
    """
    found_types = self.obs.apply(type).unique()
    n_types = found_types.shape[0]

    if n_types == 0:
        raise TypeError("could not infer observation type")

    if n_types == 1:
        logger.debug(f"inferred observation type: {found_types[0]}")
    else:
        logger.debug(f"inferred multiple otypes, types: {found_types}")
    return found_types
def _set_metadata_value(self, *args, **kwargs):
    """Deprecated alias of ``set_metadata_value``.

    Emits a DeprecationWarning and forwards all arguments to
    ``set_metadata_value``, returning its result.
    """
    warnings.warn(
        # trailing space keeps the two string parts from running together
        "the private method '_set_metadata_value' is deprecated and will eventually "
        "be removed, please use the non-private 'set_metadata_value' method.",
        DeprecationWarning,
        # attribute the warning to the caller instead of this wrapper
        stacklevel=2,
    )
    return self.set_metadata_value(*args, **kwargs)
def _is_consistent(self, check_individual_obs=True):
    """Check if an observation collection is consistent.

    An observation collection is consistent if:

    1. all observations have a unique name
    2. there are no nan values in the obs column
    3. (optional) the metadata of each observation has the same type
       and value as the corresponding row in the observation collection
       dataframe. Only checked if check_individual_obs is True.

    A warning is logged for the first inconsistency that is found.

    Parameters
    ----------
    check_individual_obs : bool, optional
        If True the third condition in the list above is checked. The
        default is True.

    Returns
    -------
    bool
        True -> consistent
        False -> inconsistent.
    """
    # check unique index
    if not self.index.is_unique:
        duplicates = self.index[self.index.duplicated()]
        msg = (
            f"index of observation collection '{self.name}' is "
            "not unique. non unique indices are: "
        )
        # index labels are not necessarily strings, convert before joining
        msg += " ".join(map(str, duplicates))
        logger.warning(msg)
        return False

    # check nan values in observations
    if self.obs.isna().any():
        logger.warning(
            f"missing observation(s) object in collection '{self.name}' "
        )
        return False

    if not check_individual_obs:
        return True

    # check oc data with individual object attributes
    for o in self.obs.values:
        meta_attrs = list(o._get_meta_attr())

        # Check the name first: the row lookups below use o.name as label and
        # would raise a KeyError when the observation is not in the index.
        if "name" in meta_attrs and o.name not in self.index:
            logger.warning(
                f"observation collection '{self.name}' not "
                f"consistent because observation '{o.name}' not "
                "in collection index"
            )
            return False

        for att in meta_attrs:
            if att in ("name", "meta"):
                continue
            v1 = self.loc[o.name, att]
            v2 = getattr(o, att)
            # check if values are equal
            try:
                if v1 != v2:
                    # nan != nan, so two nan values still count as equal
                    if (
                        isinstance(v1, numbers.Number)
                        and isinstance(v2, numbers.Number)
                        and np.isnan(v1)
                        and np.isnan(v2)
                    ):
                        continue
                    # otherwise the collection is inconsistent
                    logger.warning(
                        f"observation collection '{self.name}' not "
                        f"consistent because of metadata value '{att}' "
                        f"of observation '{o.name}'"
                    )
                    return False
            except TypeError:
                logger.warning(
                    f"observation collection '{self.name}' not "
                    f"consistent because of metadata type '{att}' "
                    f"of observation '{o.name}'"
                )
                return False
    return True
def add_observation(self, o, check_consistency=True, inplace=False, **kwargs):
    """Add an observation to an existing observation collection.

    If an observation with the same name already exists in the collection the
    two observations are merged.

    Parameters
    ----------
    o : hpd.observation.Obs
        Observation object.
    check_consistency : bool, optional
        If True the consistency of the collection is first checked. The
        default is True.
    inplace : bool, optional
        If True, modifies the ObsCollection in place (do not create a new
        object). The default is False.
    **kwargs passed to Obs.merge_observation:
        merge_metadata : bool, optional
            If True and observations are merged the metadata of the two
            objects are merged. If there are any differences the overlap
            parameter is used to determine which metadata is used. If
            merge_metadata is False, the metadata of the original
            observation is always used for the merged observation. The
            default is True.
        overlap : str, optional
            How to deal with overlapping timeseries with different values.
            Options are:
            - error : Raise a ValueError
            - use_left : use the overlapping part from the existing
            observations
            - use_right : use the overlapping part from the new observation
            Default is 'error'.

    Raises
    ------
    RuntimeError
        when the observation collection is inconsistent.
    TypeError
        when the observation type is wrong.

    Returns
    -------
    ObsCollection or None
        a deep copy of the collection with the observation added if
        ``inplace=False``, None otherwise.
    """
    if check_consistency and not self._is_consistent():
        raise RuntimeError("inconsistent observation collection")

    if not isinstance(o, obs.Obs):
        raise TypeError("Observation should be of type hydropandas.observation.Obs")

    target = self if inplace else self.copy(deep=True)

    if o.name in target.index:
        logger.info(
            f"observation name {o.name} already in collection, merging observations"
        )
        existing = target.loc[o.name, "obs"]
        merged = existing.merge_observation(o, **kwargs)
        # overwrite observation in collection
        target.loc[o.name] = merged.to_collection_dict()
    else:
        # add new observation to collection
        logger.info(f"adding {o.name} to collection")
        target.loc[o.name] = o.to_collection_dict()

    return None if inplace else target
def add_obs_collection(
    self, obs_collection, check_consistency=True, inplace=False, **kwargs
):
    """Add one observation collection to another observation collection.

    Every observation of ``obs_collection`` is added with the
    ``add_observation`` method, see that method for more details.

    Parameters
    ----------
    obs_collection : hpd.ObsCollection
        ObsCollection object.
    check_consistency : bool, optional
        If True the consistency of both collections is first checked. The
        default is True.
    inplace : bool, optional
        If True, modifies the ObsCollection in place (do not create a new
        object). The default is False.
    **kwargs passed to Obs.merge_observation:
        merge_metadata : bool, optional
            If True and observations are merged the metadata of the two
            objects are merged. If there are any differences the overlap
            parameter is used to determine which metadata is used. If
            merge_metadata is False, the metadata of the original
            observation is always used for the merged observation. The
            default is True.
        overlap : str, optional
            How to deal with overlapping timeseries with different values.
            Options are:
            - error : Raise a ValueError
            - use_left : use the overlapping part from the existing
            observations
            - use_right : use the overlapping part from the new observation
            Default is 'error'.

    Raises
    ------
    RuntimeError
        when one of the observation collections is inconsistent.

    Returns
    -------
    ObsCollection or None
        merged ObsCollection if ``inplace=False``, None otherwise.
    """
    if check_consistency:
        for collection in (self, obs_collection):
            if not collection._is_consistent():
                raise RuntimeError(
                    f"inconsistent observation collection -> {collection.name}"
                )

    # Use a deep copy (as add_observation does) when not working in place: a
    # shallow copy shares its data with this collection, so overwriting rows
    # of merged observations could modify the original collection as well.
    target = self if inplace else self.copy(deep=True)

    for o in obs_collection.obs.values:
        # consistency was already checked above, skip it for every single add
        target.add_observation(o, check_consistency=False, inplace=True, **kwargs)

    if not inplace:
        return target
    return None
def copy(self, deep=False):
    """Make a copy of this object's indices and data.

    Parameters
    ----------
    deep : bool, default False
        Make a deep copy, including a deep copy of the observation objects.
        With ``deep=False`` neither the indices nor the data are copied.

    Returns
    -------
    ObsCollection
    """
    duplicate = super().copy(deep=deep)
    if deep:
        # pandas does not copy the objects stored in the 'obs' column, so
        # manually make a deep copy of the observations
        duplicate["obs"] = [o.copy(deep=deep) for o in duplicate.obs.values]
    return duplicate
@classmethod
def from_bro(
    cls,
    extent=None,
    bro_id=None,
    name="",
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=True,
    epsg=28992,
    ignore_max_obs=False,
    engine="hydropandas",
):
    """Get all the observations within an extent or within a groundwatermonitoring
    net.

    When ``bro_id`` is given the observations of that monitoring net are read,
    ``extent`` is only used when ``bro_id`` is None.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get groundwater monitoring wells within this extent
        [xmin, xmax, ymin, ymax]
    bro_id : str or None, optional
        starts with 'GMN'.
    name : str, optional
        name of the observation collection. When ``bro_id`` is given the name
        from the metadata of the monitoring net is used instead.
    tmin : str or None, optional
        start time of observations, only used in combination with ``extent``.
        The default is None.
    tmax : str or None, optional
        end time of observations, only used in combination with ``extent``.
        The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without
        measurements
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    ignore_max_obs : bool, optional
        by default you get a prompt if you want to download over a 1000
        observations at once. if ignore_max_obs is True you won't get the
        prompt. The default is False
    engine : str, optional
        Select how data from the bro-database is obtained, options are 'hydropandas',
        'brodata' or 'brodata_gm'. 'brodata_gm' is only available when extent is not
        None. When engine='brodata_gm' use the dataset Grondwatermonitoring (GM) in
        samenhang - karakteristieken, hosted by PDOK. This
        up-to-date dataset combines well- and tube-properties. So users do not have to
        download each individual Groundwater Monitoring Well (GMW), which speeds up the
        request. The gm-dataset does not contain the attributes `tube_top` and
        `ground_level`, so you need to use engine='brodata' or 'hydropandas' if you
        need those. The Groundwater Level Dossiers (GLD) are still downloaded
        individually. The default is 'hydropandas'.

    Raises
    ------
    ValueError
        when both ``bro_id`` and ``extent`` are None.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    from .io.bro import get_obs_list_from_extent, get_obs_list_from_gmn

    if bro_id is not None:
        observations, meta = get_obs_list_from_gmn(
            bro_id,
            obs.GroundwaterObs,
            only_metadata=only_metadata,
            keep_all_obs=keep_all_obs,
            engine=engine,
        )
        # the name from the monitoring net metadata replaces the name argument
        name = meta.pop("name")
    elif extent is not None:
        observations = get_obs_list_from_extent(
            extent,
            obs.GroundwaterObs,
            tmin=tmin,
            tmax=tmax,
            only_metadata=only_metadata,
            keep_all_obs=keep_all_obs,
            epsg=epsg,
            ignore_max_obs=ignore_max_obs,
            engine=engine,
        )
        meta = {}
    else:
        raise ValueError("specify bro_id or extent")

    return cls(util._obslist_to_frame(observations), name=name, meta=meta)
@classmethod
def from_lizard(
    cls,
    extent=None,
    codes=None,
    name="",
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    only_metadata=False,
    organisation="vitens",
    auth=None,
):
    """Get all observations within a specified extent or for a list of codes.

    When ``extent`` is given it is used to select the observations, ``codes``
    is only used when ``extent`` is None.

    Parameters
    ----------
    extent : list, shapefile path or None
        get groundwater monitoring wells within this extent [xmin, xmax, ymin, ymax]
        or within a predefined Polygon from a shapefile
    codes : list of str or None
        codes of the monitoring wells
    name : str, optional
        name of the observation collection. The default is ''.
    tube_nr : list of str
        list of tube numbers of the monitoring wells that should be selected.
        By default 'all' available tubes are selected.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire series is returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire series is returned
    type_timeseries : str, optional (deprecated)
        Old keyword, use which_timeseries instead.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver", "diver_validated".
        Defaults to ("hand", "diver") (which should be correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available": Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements during periods where diver or diver_validated measurements are available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns" (side-by-side columns).
        If None, defaults to "merge".
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.
    organisation : str, optional
        organisation of the data. The default is "vitens".
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Raises
    ------
    ValueError
        when both ``extent`` and ``codes`` are None.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    from .io.lizard import get_obs_list_from_codes, get_obs_list_from_extent

    # both readers are called with the same arguments, only the selection differs
    if extent is not None:
        reader, selection = get_obs_list_from_extent, extent
    elif codes is not None:
        reader, selection = get_obs_list_from_codes, codes
    else:
        raise ValueError("specify codes or extent")

    observations = reader(
        selection,
        obs.GroundwaterObs,
        tube_nr,
        tmin,
        tmax,
        type_timeseries=type_timeseries,
        which_timeseries=which_timeseries,
        datafilters=datafilters,
        combine_method=combine_method,
        only_metadata=only_metadata,
        organisation=organisation,
        auth=auth,
    )
    return cls(observations, name=name)
@classmethod
def from_bronhouderportaal_bro(
    cls,
    dirname,
    full_meta=False,
):
    """Get all the metadata from dirname.

    The observations are read as GroundwaterObs objects.

    Parameters
    ----------
    dirname : str
        name of dirname that holds XML files
    full_meta : bool , optional
        process all metadata. The default is False.

    Returns
    -------
    ObsCollection
        ObsCollection created from the observations read from dirname
    """
    from .io.bronhouderportaal_bro import get_obs_list_from_dir

    return cls(
        get_obs_list_from_dir(
            dirname,
            obs.GroundwaterObs,
            full_meta=full_meta,
        )
    )
@classmethod
def from_dataframe(cls, df, obs_list=None, ObsClass=obs.GroundwaterObs):
    """Create an observation collection from a DataFrame and optionally a list of
    observations. If no list of observations is given empty observations are added
    of the type specified by ObsClass.

    The input dataframe itself is not modified.

    Parameters
    ----------
    df : pandas DataFrame
        input dataframe. If this dataframe has a column named 'obs' the
        column is replaced with new observation objects in the returned
        collection.
    obs_list : list of observation.Obs, optional
        list of observations. Default is None
    ObsClass : class, optional
        observation class used to create empty obs object, by
        default GroundwaterObs

    Raises
    ------
    TypeError
        when df is not a pandas DataFrame.

    Returns
    -------
    ObsCollection
        ObsCollection DataFrame with the 'obs' column
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"df should be type pandas.DataFrame not {type(df)}")

    if obs_list is None:
        obs_list = [ObsClass() for _ in range(len(df))]

    # work on a copy so adding or replacing the 'obs' column does not change
    # the dataframe of the caller
    df = pd.DataFrame(df, copy=True)
    df["obs"] = obs_list
    return cls(df)
@classmethod
def from_dict(cls, d, **kwargs):
    """Create an observation collection from a dictionary. The dictionary should
    have the same format as dictionaries created with the `to_dict` method of an
    ObsCollection.

    Parameters
    ----------
    d : dict
        dictionary with data. The keys 'obstype' and 'obs_list' are required,
        'df' is optional. All other items are passed as keyword arguments to
        the constructor of the collection.
    **kwargs
        passed to the pandas DataFrame constructor used for the 'df' item.

    Returns
    -------
    ObsCollection
    """
    # shallow copy, so popping keys does not change the caller's dictionary
    d = d.copy()

    assert "obstype" in d, (
        "dictionary should contain an 'obstype' key with the observation type"
    )
    obstype = d.pop("obstype")
    assert cls.__name__ == obstype, (
        f"{obstype=} not an observation type supported by hydropandas"
    )

    frame = pd.DataFrame(d.pop("df", None), **kwargs)

    observations = []
    for obs_dict in d.pop("obs_list"):
        # every observation dictionary names its own observation class
        obs_class = getattr(obs, obs_dict["obstype"])
        observations.append(obs_class.from_dict(obs_dict))

    return cls(frame, obs_list=observations, **d)
@classmethod
def from_excel(cls, path, meta_sheet_name="metadata"):
    """Create an observation collection from an excel file. The excel file should
    have the same format as excel files created with the `to_excel` method of an
    ObsCollection.

    The metadata sheet is read first. For every row in that sheet the sheet
    named after the row index is read as the measurements of one observation.

    Parameters
    ----------
    path : str
        full file path (including extension) of the excel file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".

    Returns
    -------
    ObsCollection

    Notes
    -----
    if you write an excel file using the 'to_excel' method and read an excel
    with the 'read_excel' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute

    If you don't want to lose this data consider using the `to_json` and
    `read_json` function.
    """
    # open the workbook once instead of re-opening the file for every sheet
    with pd.ExcelFile(path) as workbook:
        df = pd.read_excel(workbook, sheet_name=meta_sheet_name, index_col=0)
        for oname, row in df.iterrows():
            measurements = pd.read_excel(workbook, sheet_name=oname, index_col=0)
            all_metadata = row.to_dict()
            # the 'obs' column of the metadata sheet holds the class name
            obsclass = getattr(obs, all_metadata.pop("obs"))

            # split into observation specific metadata and everything else
            metadata = {
                k: v for k, v in all_metadata.items() if k in obsclass._metadata
            }
            metadata["name"] = oname
            extra_meta = {
                k: v for k, v in all_metadata.items() if k not in obsclass._metadata
            }

            # replace the class name in the 'obs' column by the observation
            df.at[oname, "obs"] = obsclass(measurements, meta=extra_meta, **metadata)

    return cls(df)
@classmethod
def from_dino(
    cls,
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="DINO_Grondwaterstanden",
    suffix=None,
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read dino data from a directory with downloaded files.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory
        of subdir
    ObsClass : type
        class of the observations, so far only GroundwaterObs is supported
    subdir : str
        subdirectory of dirname with data files. For old school dino zip files this
        should be "Grondwaterstanden_Put". For new style the default value
        DINO_Grondwaterstanden is sufficient. The default is DINO_Grondwaterstanden.
    suffix : str or None, optional
        suffix of files in subdir that will be read. For old school dino zip files
        this should be '1.csv'. For new style the default value None is sufficient.
        The default is None.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even the points
        without measurements or metadata
    name : str, optional
        the name of the observation collection. If None the value of subdir
        is used.
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_dino_dir() function

    Returns
    -------
    cls(obs_df) : ObsCollection
        collection of multiple point observations
    """
    from .io.dino import read_dino_dir

    # read dino directory
    observations = read_dino_dir(
        dirname,
        ObsClass,
        subdir,
        suffix,
        keep_all_obs,
        **kwargs,
    )

    # store the settings that were used to read the data
    meta = {
        "dirname": dirname,
        "type": ObsClass,
        "suffix": suffix,
        "keep_all_obs": keep_all_obs,
    }
    collection_name = subdir if name is None else name

    return cls(util._obslist_to_frame(observations), name=collection_name, meta=meta)
@classmethod
def from_artdino_dir(
    cls,
    dirname=None,
    ObsClass=obs.GroundwaterObs,
    subdir="csv",
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    name=None,
    **kwargs,
):
    """Read an artdino directory.

    Parameters
    ----------
    dirname : str, optional
        directory name, can be a .zip file or the parent directory of subdir
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read
    unpackdir : str
        destination directory of the unzipped file
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        use date of the zipfile for the destination file
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without data or
        metadata
    name : str, optional
        the name of the observation collection. If None the value of subdir
        is used.
    kwargs:
        kwargs are passed to the hydropandas.io.dino.read_artdino_dir() function

    Returns
    -------
    cls(obs_df) : ObsCollection
        collection of multiple point observations
    """
    from .io.dino import read_artdino_dir

    observations = read_artdino_dir(
        dirname,
        ObsClass,
        subdir,
        suffix,
        unpackdir,
        force_unpack,
        preserve_datetime,
        keep_all_obs,
        **kwargs,
    )

    # store the settings that were used to read the data
    meta = {
        "dirname": dirname,
        "type": ObsClass,
        "suffix": suffix,
        "unpackdir": unpackdir,
        "force_unpack": force_unpack,
        "preserve_datetime": preserve_datetime,
        "keep_all_obs": keep_all_obs,
    }
    collection_name = subdir if name is None else name

    return cls(util._obslist_to_frame(observations), name=collection_name, meta=meta)
@classmethod
def from_fews_xml(
    cls,
    file_or_dir=None,
    xmlstring=None,
    ObsClass=obs.GroundwaterObs,
    name="fews",
    translate_dic=None,
    filterdict=None,
    locations=None,
    remove_nan=True,
    low_memory=True,
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    **kwargs,
):
    """Read one or several FEWS PI-XML files.

    Parameters
    ----------
    file_or_dir : str
        zip, xml or directory with zips or xml files to read
    xmlstring : str or None
        string with xml data, only used if file_or_dir is None. Default is
        None
    ObsClass : type or dict
        class of the observations, e.g. GroundwaterObs or WaterlvlObs or a
        dictionary with a class for each locationId
    name : str, optional
        name of the observation collection, 'fews' by default
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'location'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    locations : list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read. Only supported by
        low_memory=True method!
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    unpackdir : str
        destination directory to unzip file if path is a .zip
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        whether to preserve datetime from zip archive
    **kwargs
        passed to the function that reads the xml files or the xml string.

    Raises
    ------
    ValueError
        when both file_or_dir and xmlstring are None.

    Returns
    -------
    cls(obs_df) : ObsCollection
        collection of multiple point observations
    """
    from .io.fews import read_xml_filelist, read_xmlstring

    if translate_dic is None:
        translate_dic = {"locationId": "location"}

    if file_or_dir is None and xmlstring is None:
        raise ValueError("either specify variables file_or_dir or xmlstring")

    meta = {}
    if file_or_dir is not None:
        # get files
        dirname, unzip_fnames = util.get_files(
            file_or_dir,
            ext=".xml",
            unpackdir=unpackdir,
            force_unpack=force_unpack,
            preserve_datetime=preserve_datetime,
        )
        meta["filename"] = dirname
        observations = read_xml_filelist(
            unzip_fnames,
            ObsClass,
            directory=dirname,
            translate_dic=translate_dic,
            filterdict=filterdict,
            locations=locations,
            remove_nan=remove_nan,
            low_memory=low_memory,
            **kwargs,
        )
    else:
        # no file or directory given, read the data from the xml string
        observations = read_xmlstring(
            xmlstring,
            ObsClass,
            translate_dic=translate_dic,
            filterdict=filterdict,
            locationIds=locations,
            low_memory=low_memory,
            remove_nan=remove_nan,
            **kwargs,
        )

    return cls(util._obslist_to_frame(observations), name=name, meta=meta)
@classmethod
def from_imod(
    cls,
    obs_collection,
    ml,
    runfile,
    mtime,
    model_ws,
    modelname="",
    nlay=None,
    exclude_layers=0,
):
    """Read imod model results at point locations.

    Parameters
    ----------
    obs_collection : ObsCollection
        collection of observations at which points imod results will be read
    ml : flopy.modflow.mf.model
        modflow model
    runfile : Runfile
        imod runfile object
    mtime : list of datetimes
        datetimes corresponding to the model periods
    model_ws : str
        model workspace with imod model
    modelname : str
        modelname, also used as the name of the returned collection
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : int
        exclude modellayers from being read from imod

    Returns
    -------
    ObsCollection
        collection created from the model results read at the point locations
    """
    from .io.modflow import read_imod_results

    model_obs = read_imod_results(
        obs_collection,
        ml,
        runfile,
        mtime,
        model_ws,
        modelname=modelname,
        nlay=nlay,
        exclude_layers=exclude_layers,
    )
    return cls(util._obslist_to_frame(model_obs), name=modelname)
@classmethod
def from_json(cls, path, **kwargs):
    """Create an observation collection from a json file. The json file should
    have the same format as json files created with the `to_json` method of an
    ObsCollection.

    Parameters
    ----------
    path : str, path-like or file-like object
        full file path (including extension) of the json file, or an open
        text file object (TextIOWrapper or StringIO). A file object passed by
        the caller is not closed.
    **kwargs
        accepted but not used by this method.

    Raises
    ------
    TypeError
        when path is not a string, path-like or supported file-like object.

    Returns
    -------
    ObsCollection
    """
    if isinstance(path, (TextIOWrapper, StringIO)):
        # the file object is owned by the caller, leave it open
        d = json.load(path)
    elif isinstance(path, (str, os.PathLike)):
        # the context manager closes the file, also when parsing fails
        with open(path, "r") as fo:
            d = json.load(fo)
    else:
        raise TypeError("path should be a string or a file-like object")

    assert "obstype" in d, (
        "json file should contain an 'obstype' key with the observation type"
    )
    obstype = d.pop("obstype")
    assert cls.__name__ == obstype, (
        f"{obstype=} not an observation type supported by hydropandas"
    )

    df = pd.read_json(StringIO(d.pop("df")))

    obs_list = []
    for od in d.pop("obs_list"):
        # every observation is stored as a json string that names its own class
        obs_class = getattr(obs, json.loads(od)["obstype"])
        obs_list.append(obs_class.from_json(StringIO(od)))

    # the remaining items are passed to the constructor as keyword arguments
    return cls(df, obs_list=obs_list, **d)
@classmethod
def from_knmi(
    cls,
    locations=None,
    stns=None,
    xy=None,
    meteo_vars=("RH",),
    name="",
    starts=None,
    ends=None,
    ObsClasses=None,
    fill_missing_obs=False,
    interval="daily",
    use_api=True,
    raise_exceptions=True,
    progress_callback=None,
    fill_missing_obs_with_factor=False,
):
    """Build a collection of KNMI observations for locations or stations.

    Parameters
    ----------
    locations : pandas DataFrame or None
        dataframe with columns 'x' and 'y' as coordinates. The default is
        None.
    stns : list of str or None
        list of knmi stations. The default is None.
    xy : list or numpy array, optional
        xy coordinates of the locations, e.g. [[10,25], [5,25]].
    meteo_vars : list or tuple of str
        meteo variables e.g. ["RH", "EV24"]. The default is ("RH",).
        See the hpd.read_knmi docstring for all possible variables.
    name : str, optional
        name of the obscollection. The default is ''.
    starts : None, str, datetime or list, optional
        start date of observations per meteo variable. The start date is
        included in the time series. If None the start date will be
        January 1st of the previous year. A str is converted to datetime.
        A list should have the same length as meteo_vars and holds the
        start time for each variable. The default is None.
    ends : None, str, datetime or list, optional
        end date of observations per meteo variable. The end date is
        included in the time series. A str is converted to datetime. A
        list should have the same length as meteo_vars and holds the end
        time for each variable. None is passed on unchanged to the KNMI
        reader, which picks the end date. The default is None.
    ObsClasses : type, list of type or None
        class of the observations, can be PrecipitationObs, EvaporationObs
        or MeteoObs. A single class is repeated for every meteo variable;
        a list or tuple is used as given. If None the classes are derived
        from meteo_vars: 'RH' and 'RD' give PrecipitationObs, 'EV24'
        gives EvaporationObs and anything else gives MeteoObs.
    fill_missing_obs : bool, optional
        if True nan values in time series are filled with nearby time
        series. The default is False.
    interval : str, optional
        desired time interval for observations. Options are 'daily' and
        'hourly'. The default is 'daily'.
    use_api : bool, optional
        if True the api is used to obtain the data, API documentation is
        here:
        https://www.knmi.nl/kennis-en-datacentrum/achtergrond/data-ophalen-vanuit-een-script
        if False a text file is downloaded into a temporary folder and the
        data is read from there. The default is True.
    raise_exceptions : bool, optional
        if True you get errors when no data is returned. The default is
        True.
    progress_callback : callable or None, optional
        callback function called with (i, total) for each station
        processed. The default is None.
    fill_missing_obs_with_factor : bool, optional
        if True, donor-station values are scaled with an overlap-based
        factor before filling missing values. The flag is forwarded to the
        KNMI reader, which is documented to enable fill_missing_obs as
        well. The default is False.

    Returns
    -------
    ObsCollection
        collection with the KNMI observations; the request settings are
        stored in its `meta` attribute.

    Raises
    ------
    TypeError
        If ObsClasses is not None, a supported observation class, a list
        or a tuple.
    """
    from .io.knmi import get_knmi_obslist

    def _default_class(meteo_var):
        # map a meteo variable onto the observation class used for it
        if meteo_var in ("RH", "RD"):
            return obs.PrecipitationObs
        if meteo_var == "EV24":
            return obs.EvaporationObs
        return obs.MeteoObs

    if ObsClasses is None:
        ObsClasses = [_default_class(meteo_var) for meteo_var in meteo_vars]
    elif isinstance(ObsClasses, type):
        supported = (obs.PrecipitationObs, obs.EvaporationObs, obs.MeteoObs)
        if not issubclass(ObsClasses, supported):
            raise TypeError(
                "must be None, PrecipitationObs, EvaporationObs, MeteoObs, "
                "list or tuple"
            )
        # one class was given: use it for every meteo variable
        ObsClasses = [ObsClasses] * len(meteo_vars)
    elif not isinstance(ObsClasses, (list, tuple)):
        raise TypeError(
            "must be None, PrecipitationObs, EvaporationObs, MeteoObs, "
            "list or tuple"
        )

    # remember how the collection was requested
    meta = {
        "starts": starts,
        "ends": ends,
        "name": name,
        "ObsClasses": ObsClasses,
        "meteo_vars": meteo_vars,
    }

    knmi_obs = get_knmi_obslist(
        locations,
        stns,
        xy,
        meteo_vars,
        starts=starts,
        ends=ends,
        ObsClasses=ObsClasses,
        fill_missing_obs=fill_missing_obs,
        interval=interval,
        use_api=use_api,
        raise_exceptions=raise_exceptions,
        progress_callback=progress_callback,
        fill_missing_obs_with_factor=fill_missing_obs_with_factor,
    )

    return cls(util._obslist_to_frame(knmi_obs), name=name, meta=meta)
@classmethod
def from_knmi_scenarios(
    cls,
    stn: int | str,
    years: Iterable[Literal["2033", "2050", "2100", "2150"]] = (
        "2033",
        "2050",
        "2100",
        "2150",
    ),
    scenarios: Iterable[Literal["Ld", "Ln", "Md", "Mn", "Hd", "Hn"]] = (
        "Ld",
        "Ln",
        "Md",
        "Mn",
        "Hd",
        "Hn",
    ),
    evap: Literal["EV24", "makkink", "penman", "hargreaves"] = "EV24",
    meteo_vars: Iterable[Literal["TG", "RD", "Q", "TX", "TN", "UG", "FG", "EV24"]]
    | None = None,
    name: str = "",
):
    """Create an ObsCollection from KNMI climate scenario data.

    The observations are obtained from
    `hydropandas.io.knmi.get_knmi_scenarios_obs_list`. 'RD' is read as
    PrecipitationObs, 'EV24' as EvaporationObs and all other variables as
    MeteoObs.

    Parameters
    ----------
    stn : int or str
        Station number (e.g., 550 or "550"). It is stored as a string in
        the metadata of the collection.
    years : tuple, optional
        Years of climate scenario. The default is
        ('2033','2050','2100','2150').
    scenarios : tuple, optional
        Names of climate scenario. The default is
        ('Ld','Ln','Md','Mn','Hd','Hn').
    evap : str, optional
        Method for calculating evaporation. Options are 'EV24', 'makkink',
        'penman', or 'hargreaves'. The default is 'EV24'.
    meteo_vars : iterable of str or None, optional
        Meteorological variables to include in the ObsCollection. Possible
        variables include 'TG', 'RD', 'Q', 'TX', 'TN', 'UG', 'FG', and
        'EV24'. If None (default), all available variables are included.
    name : str, optional
        Name of the observation collection. The default is "".

    Returns
    -------
    ObsCollection
        Collection with climate scenario observations.
    """
    from .io.knmi import get_knmi_scenarios_obs_list
    from .observation import EvaporationObs, MeteoObs, PrecipitationObs

    # observation class per meteo variable
    class_per_var = {
        "RD": PrecipitationObs,
        **dict.fromkeys(("TG", "Q", "TX", "TN", "UG", "FG"), MeteoObs),
        "EV24": EvaporationObs,
    }

    scenario_obs = get_knmi_scenarios_obs_list(
        stn=stn,
        years=years,
        scenarios=scenarios,
        evap=evap,
        meteo_vars=meteo_vars,
        ObsClass=class_per_var,
    )

    meta = dict(
        stn=str(stn),
        years=years,
        scenarios=scenarios,
        evaporation_method=evap,
    )
    # the variable selection is only recorded when the user made one
    if meteo_vars is not None:
        meta["meteo_vars"] = meteo_vars

    return cls(scenario_obs, name=name, meta=meta)
@classmethod
def from_list(cls, obs_list, name=""):
    """Create a collection from a list of observation objects.

    Parameters
    ----------
    obs_list : list of observation.Obs
        observations to put in the collection
    name : str, optional
        name of the observation collection. The default is "".

    Returns
    -------
    ObsCollection
        collection built from the frame returned by
        `util._obslist_to_frame`.
    """
    return cls(util._obslist_to_frame(obs_list), name=name)
@classmethod
def from_matroos(
    cls,
    extent=None,
    name="",
    ObsClass=obs.WaterlvlObs,
    locations=None,
    units=None,
    sources=None,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    **kwargs,
):
    """Read measurements within an extent using the Matroos API.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get measurements within this extent
        [xmin, xmax, ymin, ymax]
    name : str, optional
        name of the collection, by default ""
    ObsClass : type
        class of the observations, e.g. WaterlvlObs
    locations : list, tuple or None, optional
        locations to select, if None all locations are selected, by
        default None
    units : list, tuple or None, optional
        units to select, if None all units are selected, by default None
    sources : list, tuple or None, optional
        sources to select, if None all sources are selected, by default
        None
    tmin : pd.Timestamp, str or None, optional
        start time of observations. The default is None.
    tmax : pd.Timestamp, str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    **kwargs
        additional keyword arguments are passed on to
        `hydropandas.io.matroos.get_obs_list_from_extent`.

    Returns
    -------
    ObsCollection
        ObsCollection containing data
    """
    from .io import matroos

    matroos_obs = matroos.get_obs_list_from_extent(
        ObsClass,
        extent,
        locations=locations,
        units=units,
        sources=sources,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )

    return cls(matroos_obs, name=name, meta=dict(name=name, type=ObsClass))
@classmethod
def from_menyanthes(
    cls, path, name="", ObsClass=obs.Obs, load_oseries=True, load_stresses=True
):
    """Read a Menyanthes file into an ObsCollection.

    Parameters
    ----------
    path : str
        path of the file, passed to `hydropandas.io.menyanthes.read_file`
        and stored in the metadata of the collection.
    name : str, optional
        name of the collection. The default is "".
    ObsClass : type, optional
        class of the observations. The default is Obs.
    load_oseries : bool, optional
        forwarded to `read_file`. The default is True.
    load_stresses : bool, optional
        forwarded to `read_file`. The default is True.

    Returns
    -------
    ObsCollection
        collection with the observations that were read.
    """
    from .io.menyanthes import read_file

    meta = dict(path=path, type=ObsClass)
    frame = util._obslist_to_frame(
        read_file(
            path, ObsClass, load_oseries=load_oseries, load_stresses=load_stresses
        )
    )
    return cls(frame, meta=meta, name=name)
@classmethod
def from_modflow(
    cls,
    obs_collection,
    ml,
    hds_arr,
    mtime,
    modelname="",
    nlay=None,
    exclude_layers=None,
    method="linear",
):
    """Read modflow groundwater heads at the points of an ObsCollection.

    Parameters
    ----------
    obs_collection : ObsCollection
        locations of model observation
    ml : flopy.modflow.mf.model
        modflow model
    hds_arr : numpy array
        heads with shape (ntimesteps, nlayers, nrow, ncol)
    mtime : list of datetimes
        dates for each model timestep
    modelname : str, optional
        modelname
    nlay : int, optional
        number of layers if None the number of layers from ml is used.
    exclude_layers : list of int, optional
        exclude the observations in these model layers
    method : str, optional
        interpolation method, either 'linear' or 'nearest',
        default is linear

    Returns
    -------
    ObsCollection
        collection with the model results, created without a name or
        metadata.
    """
    from .io.modflow import read_modflow_results

    model_obs = read_modflow_results(
        obs_collection,
        ml,
        hds_arr,
        mtime,
        modelname=modelname,
        nlay=nlay,
        method=method,
        exclude_layers=exclude_layers,
    )
    return cls(util._obslist_to_frame(model_obs))
@classmethod
def from_waterconnect(
    cls,
    extent=None,
    name="",
    ObsClass=obs.GroundwaterObs,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    location_gdf=None,
    update=False,
    **kwargs,
):
    """Read waterconnect measurements within an extent or for given locations.

    Either `extent` or `location_gdf` has to be provided.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None, optional
        get water connect measurements within this extent
        [xmin, xmax, ymin, ymax]
    name : str, optional
        name of the collection, by default ""
    ObsClass : type
        class of the observations, e.g. GroundwaterObs
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    location_gdf : GeoDataFrame, optional
        geodataframe with the locations of the water drill holes you want
        to include.
    update : bool, optional
        if True new locations are downloaded and stored locally (slow)
        otherwise a cached version of the locations is used. By default
        False
    **kwargs
        additional keyword arguments are passed on to
        `hydropandas.io.water_connect.get_obs_list_from_extent`.

    Returns
    -------
    ObsCollection
        ObsCollection containing data

    Raises
    ------
    ValueError
        If both extent and location_gdf are None.
    """
    from .io import water_connect

    # without an extent or a set of locations there is nothing to select
    if extent is None and location_gdf is None:
        raise ValueError("specify extent for water connect data")

    collection_meta = {"name": name, "type": ObsClass}
    wc_obs = water_connect.get_obs_list_from_extent(
        extent,
        ObsClass,
        tmin=tmin,
        tmax=tmax,
        only_metadata=only_metadata,
        keep_all_obs=keep_all_obs,
        location_gdf=location_gdf,
        update=update,
        **kwargs,
    )
    return cls(wc_obs, name=name, meta=collection_meta)
@classmethod
def from_waterinfo(
    cls,
    file_or_dir=None,
    extent=None,
    name="",
    ObsClass=obs.WaterlvlObs,
    locatie=None,
    grootheid_code=None,
    groepering_code=None,
    parameter_code=None,
    proces_type=None,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    epsg=28992,
    progressbar=True,
    location_gdf=None,
    **kwargs,
):
    """Read waterinfo measurements within an extent or from a file or directory.

    When `extent` or `location_gdf` is given the data is requested for
    those locations, even if `file_or_dir` is given as well. Otherwise
    the data is read from `file_or_dir`.

    Parameters
    ----------
    file_or_dir : str
        path to file or directory. Files can be .csv or .zip
    extent : list, tuple, numpy-array or None, optional
        get waterinfo measurements within this extent
        [xmin, xmax, ymin, ymax]
    name : str, optional
        name of the collection, by default ""
    ObsClass : Obs, optional
        type of Obs to read data as, by default WaterlvlObs
    locatie : str or list of str, optional
        select only measurement with this location(s), e.g.
        'schoonhoven', default is None
    grootheid_code : str or list of str, optional
        select only measurement with this grootheid_code, e.g. 'WATHTE',
        default is None
    groepering_code : str or list of str, optional
        select only measurement with this groepering_code, e.g.
        'GETETBRKD2', default is None
    parameter_code : str or list of str, optional
        select only measurement with this parameter_code, e.g. 'Cl',
        default is None
    proces_type : str or list of str, optional
        select only measurement with this proces_type, e.g. 'meting',
        default is None
    tmin : pd.Timestamp, str or None, optional
        start time of observations. The default is None.
    tmax : pd.Timestamp, str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default
        is False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The
        default is False.
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    progressbar : bool, optional
        show progressbar, by default True. Only used when reading from
        file_or_dir.
    location_gdf : GeoDataFrame, optional
        geodataframe with the locations of the measurements you want to
        include. If location_gdf is provided the provided extent and epsg
        will be ignored.
    **kwargs
        additional keyword arguments, only passed on to
        `hydropandas.io.waterinfo.read_waterinfo_obs` when reading from
        file_or_dir.

    Returns
    -------
    ObsCollection
        ObsCollection containing data

    Raises
    ------
    ValueError
        If extent, location_gdf and file_or_dir are all None.
    """
    from .io import waterinfo

    collection_meta = {"name": name, "type": ObsClass}

    if extent is None and location_gdf is None:
        if file_or_dir is None:
            raise ValueError("specify extent or file_or_dir")
        # read from local file(s)
        waterinfo_obs = waterinfo.read_waterinfo_obs(
            file_or_dir, ObsClass, progressbar=progressbar, **kwargs
        )
        collection_meta["filename"] = file_or_dir
    else:
        # request the data for an extent or a set of locations
        waterinfo_obs = waterinfo.get_obs_list_from_extent(
            extent,
            ObsClass,
            locatie=locatie,
            grootheid_code=grootheid_code,
            groepering_code=groepering_code,
            parameter_code=parameter_code,
            proces_type=proces_type,
            tmin=tmin,
            tmax=tmax,
            only_metadata=only_metadata,
            keep_all_obs=keep_all_obs,
            epsg=epsg,
            location_gdf=location_gdf,
        )

    return cls(waterinfo_obs, name=name, meta=collection_meta)
@classmethod
def from_wiski(
    cls,
    dirname,
    ObsClass=obs.GroundwaterObs,
    suffix=".csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    **kwargs,
):
    """Read the wiski files in a directory into an ObsCollection.

    All arguments are forwarded to `hydropandas.io.wiski.read_wiski_dir`
    and, except for `**kwargs`, stored in the metadata of the collection.
    The collection is always named "wiski_import".

    Parameters
    ----------
    dirname : str
        directory with the wiski data.
    ObsClass : type, optional
        class of the observations. The default is GroundwaterObs.
    suffix : str, optional
        forwarded to `read_wiski_dir`. The default is ".csv".
    unpackdir : str or None, optional
        forwarded to `read_wiski_dir`. The default is None.
    force_unpack : bool, optional
        forwarded to `read_wiski_dir`. The default is False.
    preserve_datetime : bool, optional
        forwarded to `read_wiski_dir`. The default is False.
    keep_all_obs : bool, optional
        forwarded to `read_wiski_dir`. The default is True.
    **kwargs
        additional keyword arguments are passed to `read_wiski_dir`.

    Returns
    -------
    ObsCollection
        collection with the observations that were read.
    """
    from .io.wiski import read_wiski_dir

    # NOTE: the "preserver_datetime" key is spelled as in earlier versions
    meta = dict(
        dirname=dirname,
        type=ObsClass,
        suffix=suffix,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserver_datetime=preserve_datetime,
        keep_all_obs=keep_all_obs,
    )

    wiski_obs = read_wiski_dir(
        dirname,
        ObsClass=ObsClass,
        suffix=suffix,
        unpackdir=unpackdir,
        force_unpack=force_unpack,
        preserve_datetime=preserve_datetime,
        keep_all_obs=keep_all_obs,
        **kwargs,
    )

    return cls(util._obslist_to_frame(wiski_obs), name="wiski_import", meta=meta)
@classmethod
def from_pastastore(
    cls, pstore, libname, ObsClass=obs.GroundwaterObs, metadata_mapping=None
):
    """Read a library of a pastastore into an ObsCollection.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        PastaStore object
    libname : str
        name of library (e.g. oseries or stresses)
    ObsClass : Obs, optional
        type of Obs to read data as, by default GroundwaterObs
    metadata_mapping : dict, optional
        dictionary containing map between metadata field names in
        pastastore and metadata field names expected by hydropandas, by
        default None.

    Returns
    -------
    ObsCollection
        ObsCollection containing data. The collection is named after the
        pastastore; the store name, connector type and library name are
        kept in its metadata.
    """
    from .io import pastas

    frame = util._obslist_to_frame(
        pastas.read_pastastore_library(
            pstore, libname, ObsClass=ObsClass, metadata_mapping=metadata_mapping
        )
    )

    store_meta = dict(
        name=pstore.name,
        conntype=pstore.conn.conn_type,
        library=libname,
    )
    return cls(frame, name=pstore.name, meta=store_meta)
def get_obs(self, name=None, **kwargs):
    """Return the single observation that matches a name and/or metadata.

    Parameters
    ----------
    name : str or None, optional
        name (index label) of the observation you want to select, by
        default None
    **kwargs : any metadata, value pair e.g. for a collection of GroundwaterObs:
        tube_nr = 1 or source = 'BRO'. Only rows whose column equals the
        given value are kept.

    Returns
    -------
    hpd.Obs
        Observation object from the collection.

    Raises
    ------
    ValueError
        If multiple observations in the collection match the given
        attribute values.
    ValueError
        If no observation in the collection matches the given attribute
        values.
    """
    # start from the full collection or from the row(s) with this name
    candidates = self if name is None else self.loc[[name]]

    # narrow the selection down, one metadata column at a time
    for column, wanted in kwargs.items():
        candidates = candidates.loc[candidates[column] == wanted]

    n_found = len(candidates)
    if n_found == 0:
        raise ValueError("no observations for given conditions")
    if n_found != 1:
        raise ValueError(
            f"multiple observations for given conditions {candidates.index}"
        )
    return candidates["obs"].values[0]
def to_csv(self, path, check_consistency=True, **kwargs):
    """Write every observation in the ObsCollection to its own csv file.

    The directory is created when it does not exist yet. Each file is
    named after the observation: '<name>.csv'.

    Parameters
    ----------
    path : str
        directory to which the csv files will be written.
    check_consistency : bool, optional
        If True the consistency of the collection is checked. If set to
        False the csv file(s) may be unreadable by hydropandas. The
        default is True.
    **kwargs : keyword arguments
        kwargs are passed to the to_csv method of each observation.

    Raises
    ------
    RuntimeError
        If check_consistency is True and the collection is inconsistent.

    Notes
    -----
    if you write a csv file using the 'to_csv' method and read a csv
    with the 'read_csv' method you lose this information:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' attribute
    - integer dtypes may become floats

    If you don't want to lose this data consider using the `to_json` and
    `read_json` function.

    If you want to write the metadata to a single csv file consider using:
    'pd.DataFrame(oc).to_csv()'.
    """
    if check_consistency:
        if not self._is_consistent():
            raise RuntimeError("inconsistent observation collection")

    out_dir = Path(path)
    out_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"writing {len(self)} observations to {out_dir}")
    for observation in self.obs:
        csv_file = out_dir / f"{observation.name}.csv"
        observation.to_csv(csv_file, **kwargs)
def to_dict(self):
    """Convert the ObsCollection to a dictionary.

    Returns
    -------
    dict
        dictionary with the collection attributes listed in `_metadata`,
        the collection table without the 'obs' column ('df'), the class
        name of the collection ('obstype') and the dictionary of every
        observation ('obs_list').
    """
    result = {}
    for attr in self._metadata:
        result[attr] = getattr(self, attr)

    # the observation objects are exported separately from the table
    result["df"] = super().drop(columns="obs").to_dict()
    result["obstype"] = self.__class__.__name__
    result["obs_list"] = [observation.to_dict() for observation in self.obs]
    return result
def to_excel(self, path, meta_sheet_name="metadata", check_consistency=True):
    """Write an ObsCollection to an excel file.

    The first sheet contains the metadata, the other sheets hold the
    time series of each observation. The excel can be read using the
    read_excel function of hydropandas.

    Parameters
    ----------
    path : str
        full path of xlsx file.
    meta_sheet_name : str, optional
        sheetname with metadata. The default is "metadata".
    check_consistency : bool, optional
        If True the consistency of the collection is checked. If set to
        False the excel file may be unreadable by hydropandas. The
        default is True.

    Raises
    ------
    RuntimeError
        If the ObsCollection is inconsistent.

    Returns
    -------
    None.

    Notes
    -----
    The following data is NOT written to the excel file:
    - The 'name' and 'meta' attributes of the ObsCollection
    - metadata of each Observation stored in the 'meta' dictionary

    If you don't want this consider using the `to_json` method.
    """
    if check_consistency and not self._is_consistent():
        raise RuntimeError("inconsistent observation collection")

    # characters that get replaced by an underscore in sheet names
    to_underscore = str.maketrans(
        {ch: "_" for ch in ("[", "]", ":", "*", "?", "/", "\\")}
    )

    # work on a copy, the 'obs' column is replaced below
    oc = self.copy(deep=True)

    with pd.ExcelWriter(path) as writer:
        # the metadata sheet lists the observation type instead of the object
        observations = oc.pop("obs")
        oc["obs"] = [type(o).__name__ for o in observations]
        super(ObsCollection, oc).to_excel(writer, sheet_name=meta_sheet_name)

        # one sheet per observation time series
        for o in observations:
            o.to_excel(writer, sheet_name=o.name.translate(to_underscore))
def to_json(self, path=None, cls=HydropandasEncoder, **kwargs):
    """Write ObsCollection to a JSON file, a buffer or a string.

    Parameters
    ----------
    path : str, path object, file-like object, or None, default None
        String, path object (implementing os.PathLike[str]), or file-like
        object implementing a write() function. If None, the result is
        returned as a string. A file-like object is written to directly
        and is not closed by this method.
    cls : json.JSONEncoder subclass, optional
        encoder class passed to json.dump or json.dumps. The default is
        HydropandasEncoder.
    **kwargs
        Additional keyword arguments passed to json.dump or json.dumps.

    Returns
    -------
    str or None
        the JSON string if path is None, otherwise None.
    """
    d = {k: getattr(self, k) for k in self._metadata}
    d["obstype"] = type(self).__name__

    if self.empty:
        # an empty collection is serialized as is, without observations
        d["df"] = super().to_json(date_format="iso")
        d["obs_list"] = []
    else:
        # the observation objects are serialized separately from the table
        d["df"] = super().drop(columns="obs").to_json(date_format="iso")
        d["obs_list"] = [o.to_json() for o in self.obs]

    if path is None:
        return json.dumps(d, cls=cls, **kwargs)

    if hasattr(path, "write"):
        # file-like object: open() would raise a TypeError on a buffer
        json.dump(d, path, cls=cls, **kwargs)
        return None

    with open(path, "w") as fo:
        json.dump(d, fo, cls=cls, **kwargs)
    return None
def to_pi_xml(self, fname, timezone="", version="1.24"):
    """Write the ObsCollection with `hydropandas.io.fews.write_pi_xml`.

    Parameters
    ----------
    fname : str
        passed to `write_pi_xml`; presumably the path of the xml file to
        write.
    timezone : str, optional
        passed to `write_pi_xml`. The default is "".
    version : str, optional
        passed to `write_pi_xml`. The default is "1.24".
    """
    from .io import fews as fews_io

    fews_io.write_pi_xml(self, fname, timezone=timezone, version=version)
def to_gdf(self, xcol="x", ycol="y", crs=28992, drop_obs=True):
    """Convert the ObsCollection to a GeoDataFrame.

    Parameters
    ----------
    xcol : str
        column name with x values
    ycol : str
        column name with y values
    crs : int, optional
        coordinate reference system, by default 28992 (RD new).
    drop_obs : bool, optional
        drop the column with observations. Useful for basic geodataframe
        manipulations that require JSON serializable columns. The default
        is True.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
    """
    geoframe = util.df2gdf(self, xcol=xcol, ycol=ycol, crs=crs)
    return geoframe.drop(columns="obs") if drop_obs else geoframe
def to_pastastore(
    self,
    pstore=None,
    pstore_name="",
    col=None,
    kind="oseries",
    add_metadata=True,
    conn=None,
    overwrite=False,
):
    """Add the observations to a new or an existing pastastore.

    Parameters
    ----------
    pstore : pastastore.PastaStore, optional
        Existing pastastore, if None a new pastastore is created
    pstore_name : str, optional
        Name of the pastastore only used if pstore is None
    col : str, optional
        Name of the column in the Obs dataframe to be used. If None the
        first numeric column in the Obs Dataframe is used.
    kind : str, optional
        The kind of series that is added to the pastastore. Use 'oseries'
        for observations and anything else for stresses.
    add_metadata : boolean, optional
        If True metadata from the observations added to the pastastore
    conn : pastastore.connectors or None, optional
        type of connector, if None the DictConnector is used. Default is
        None.
    overwrite : boolean, optional
        if True, overwrite existing series in pastastore, default is False

    Returns
    -------
    pstore : pastastore.PastaStore
        the pastastore with the series from the ObsCollection
    """
    from .io.pastas import create_pastastore

    # all the work is done by the pastas io module
    return create_pastastore(
        self,
        pstore,
        pstore_name,
        add_metadata=add_metadata,
        kind=kind,
        col=col,
        conn=conn,
        overwrite=overwrite,
    )
def to_shapefile(self, path, xcol="x", ycol="y"):
    """Save ObsCollection as shapefile.

    The 'obs' column is left out. Boolean columns are written as integers
    and datetime columns as strings, because these dtypes are not accepted
    for shapefiles.

    Parameters
    ----------
    path : str
        filename of shapefile (.shp) or geopackage (.gpkg). A geopackage
        has the advantage that column names will not be truncated.
    xcol : str
        column name with x values
    ycol : str
        column name with y values
    """
    from geopandas.array import GeometryDtype

    gdf = util.df2gdf(self, xcol, ycol)

    # remove obs column
    if "obs" in gdf.columns:
        gdf = gdf.drop(columns="obs")

    # change dtypes that are not accepted for shapefiles
    for colname, coltype in gdf.dtypes.items():
        # leave the geometry column untouched
        if isinstance(coltype, GeometryDtype):
            continue
        if pd.api.types.is_bool_dtype(coltype):
            # cast boolean columns to int
            gdf[colname] = gdf[colname].astype(int)
        elif pd.api.types.is_datetime64_any_dtype(coltype):
            # cast datetime columns to str; the pandas check is used because
            # np.issubdtype raises a TypeError on pandas extension dtypes
            gdf[colname] = gdf[colname].astype(str)

    gdf.to_file(path)
def get_series(self, tmin=None, tmax=None, col=None):
    """Get the measurements of every observation within a time frame.

    Parameters
    ----------
    tmin : datetime, optional
        start time for series. If None the earliest first observation
        date in the collection is used. The default is None.
    tmax : datetime, optional
        end time for series. If None the latest last observation date in
        the collection is used. The default is None.
    col : str or None, optional
        the column of the obs dataframe to get measurements from. The
        first numeric column is used if col is None, by default None.

    Returns
    -------
    series of Series
        series of a series of observations within a time frame.
    """
    if tmin is None:
        tmin = self.stats.dates_first_obs.min()
    if tmax is None:
        tmax = self.stats.dates_last_obs.max()

    def get_s(o):
        # resolve the column per observation: the first numeric column can
        # differ between observations
        column = o._get_first_numeric_col_name() if col is None else col
        return o.loc[tmin:tmax, column]

    return self.obs.apply(get_s)
def interpolate(
    self,
    xy: list[list[float]],
    kernel: str = "thin_plate_spline",
    kernel2: str = "linear",
    epsilon: int | None = None,
    col: str | None = None,
):
    """Interpolate the observations to other locations with the Scipy
    radial basis function (RBF).

    Parameters
    ----------
    xy : List[List[float]]
        xy coordinates of locations of interest e.g. [[10,25], [5,25]]
    kernel : str, optional
        Type of radial basis funtion, by default thin_plate_spline.
        Other options are linear, gaussian, inverse_quadratic,
        multiquadric, inverse_multiquadric, cubic or quintic.
    kernel2 : str, optional
        Kernel in case there are not enough observations (3 or 6) for
        time step, by default linear. Other options are gaussian,
        inverse_quadratic, multiquadric, or inverse_multiquadric.
    epsilon : float, optional
        Shape parameter that scales the input to the RBF. If kernel is
        linear, thin_plate_spline, cubic, or quintic, this defaults to 1.
        Otherwise this must be specified.
    col : str, optional
        Name of the column in the Obs dataframe to be used. If None the
        first numeric column in the Obs Dataframe is used.

    Returns
    -------
    ObsCollection
        collection with one interpolated observation per column of the
        interpolation result, of the same type as the observations in
        this collection.

    Raises
    ------
    TypeError
        If the collection contains more than one type of observation.
    """
    otypes = self._infer_otype()
    if len(otypes) > 1:
        raise TypeError(
            "Please make sure that all Obs are of the same type. Currently"
            f" found {', '.join([x.__name__ for x in otypes])}."
        )

    # interpolate the measurements to the requested locations
    known_xy = pd.concat([self.loc[:, "x"], self.loc[:, "y"]], axis=1)
    measurements = util.oc_to_df(self, col=col)
    interpolated = util.interpolate(
        xy, measurements, known_xy, kernel=kernel, kernel2=kernel2, epsilon=epsilon
    )

    # add all metadata that is equal for all observations
    obs_class = otypes[0]
    location_specific = {
        "x",
        "y",
        "location",
        "monitoring_well",
        "name",
        "source",
        "meta",
    }
    shared_meta = {}
    for att in set(obs_class._metadata) - location_specific:
        reference = self.iloc[0].loc[att]
        if (self.loc[:, att] == reference).all():
            shared_meta[att] = reference

    # every column of the result becomes an observation at the matching xy
    source = f"interpolation {self.name}"
    obs_list = [
        obs_class(
            interpolated.loc[:, [colname]].copy(),
            x=xy[i][0],
            y=xy[i][1],
            name=colname,
            source=source,
            meta={"interpolation_kernel": kernel, "interpolation_epsilon": epsilon},
            **shared_meta,
        )
        for i, colname in enumerate(interpolated.columns)
    ]

    return self.from_list(obs_list)