"""
Module with functions to read or download time series with observations from knmi.
function levels:
1. get_knmi_obs_list: list of observations
2. get_knmi_obs: single observation
3. get_timeseries_stn, get_timeseries_from_file: get time series from station or file
4. fill_missing_measurements, get_evaporation: aggregate time series data
5. download_knmi_data: download a single timeseries
6a. get_hourly_meteo_api, get_daily_meteo_api, get_daily_rainfall_api,
get_daily_meteo_url, get_daily_rainfall_url
7a. request_api, request_url
7b. parse_data
6b. interpret_knmi_file
For knmi climate scenarios:
1. get_knmi_scenarios_obs_list: get knmi climate scenarios for a station
2. get_knmi_scenarios_data: get knmi climate scenarios for a station
"""
import datetime as dt
import logging
import os
import warnings
from functools import lru_cache
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Iterable, Literal
from zipfile import ZipFile
import numpy as np
import pandas as pd
import requests
logger = logging.getLogger(__name__)
URL_DAILY_PREC = "https://www.daggegevens.knmi.nl/klimatologie/monv/reeksen"
URL_DAILY_METEO = "https://www.daggegevens.knmi.nl/klimatologie/daggegevens"
URL_HOURLY_METEO = "https://www.daggegevens.knmi.nl/klimatologie/uurgegevens"
URL_STATIONS = "https://klimaatscenarios-data.knmi.nl/api/v1/stations"
URL_KNMI_TRANSFORMED_SERIES = (
"https://klimaatscenarios-data.knmi.nl/api/v1/climate-series-data.zip"
)
KNMI_CLIMATE_YEARS = Literal["2033", "2050", "2100", "2150"]
KNMI_CLIMATE_SCENARIOS = Literal["Ld", "Ln", "Md", "Mn", "Hd", "Hn"]
[docs]def get_knmi_obs(
stn: int | None = None,
fname: str | None = None,
xy: list[float] | tuple[float] | None = None,
meteo_var: str | None = None,
start: pd.Timestamp | str | None = None,
end: pd.Timestamp | str | None = None,
**kwargs,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""get knmi observation from stn, fname or nearest xy coordinates.
Parameters
----------
stn : int, str or None, optional
measurement station e.g. 829. The default is None.
fname : str, path object, file-like object or None, optional
filename of a knmi file. The default is None.
xy : list, tuple or None, optional
RD coördinates of a location in the Netherlands. The station nearest
to this location used. The Default is None.
meteo_var : str or None, optional
meteo variable e.g. "RH" or "EV24". See list with all options in the
hydropandas documentation.
start : str, datetime or None, optional
start date of observations. The default is None.
end : str, datetime or None, optional
end date of observations. The default is None.
**kwargs:
fill_missing_obs : bool, optional
if True nan values in time series are filled with nearby time series.
The default is False. Note: if the given stn has no data between start and
end the data from nearby stations is used. In this case the metadata of the
Observation is the metadata from the nearest station that has any
measurement in the given period.
fill_missing_obs_with_factor : bool, optional
if True, donor-station values are scaled with an overlap-based factor
before filling missing values. This automatically enables
fill_missing_obs.
The default is False.
interval : str, optional
desired time interval for observations. Options are 'daily' and
'hourly'. The default is 'daily'.
use_api : bool, optional
if True the api is used to obtain the data, API documentation is here:
https://www.knmi.nl/kennis-en-datacentrum/achtergrond/data-ophalen-vanuit-een-script
if False a text file is downloaded into a temporary folder and the
data is read from there. Default is True since the api is back
online (July 2021).
raise_exceptions : bool, optional
if True you get errors when no data is returned. The default is False.
Returns
-------
pd.DataFrame, measurements
dict, metadata
Raises
------
ValueError
if no meteo_var is given or stn, fname and xy are all None.
"""
if meteo_var is None and fname is None:
raise ValueError("To get knmi data a meteo_var should be specified")
elif meteo_var is not None and not isinstance(meteo_var, str):
raise (TypeError(f"meteo var should be string not {type(meteo_var)}"))
settings = _get_default_settings(kwargs)
start = start if start is None else pd.to_datetime(start)
end = end if end is None else pd.to_datetime(end)
if (stn in (967,)) and (meteo_var == "RD") and settings["use_api"]:
msg = (
f"precipitation data not available for station {stn} via the API. "
"setting use_api to False, more info here: "
"https://github.com/ArtesiaWater/hydropandas/issues/103"
)
logger.warning(msg)
settings["use_api"] = False
if stn is not None:
stn = int(stn)
start_str = str(start).replace(" 00:00:00", "")
end_str = str(end).replace(" 00:00:00", "")
logger.info(
f"get data from station {stn} and variable {meteo_var} "
f"from {start_str} to {end_str}"
)
ts, meta = get_timeseries_stn(
stn=stn, meteo_var=meteo_var, settings=settings, start=start, end=end
)
elif fname is not None:
logger.info(f"get KNMI data from file {fname} and meteo variable {meteo_var}")
ts, meta = get_timeseries_from_file(
fname=fname,
meteo_var=meteo_var,
settings=settings,
start=start,
end=end,
)
elif xy is not None:
logger.info(
f"get KNMI data from station nearest to coordinates {xy} and meteo"
f"variable {meteo_var}"
)
stns = get_n_nearest_stations_xy(
xy=xy,
meteo_var=meteo_var,
start=start,
end=end,
n=1,
stations=None,
ignore=None,
)
ts, meta = get_timeseries_stn(
stn=stns[0], meteo_var=meteo_var, settings=settings, start=start, end=end
)
else:
raise ValueError(
"specify KNMI station (stn), filename (fname) or coordinates (xy)"
)
return ts, meta
[docs]def get_knmi_timeseries_fname(
fname: str,
meteo_var: str,
settings: dict[str, Any],
start: pd.Timestamp,
end: pd.Timestamp,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Get a knmi time series and metadata from a file.
.. deprecated:: 0.13.3
`get_knmi_timeseries_fname` will be removed in hydropandas 1.0.0, it is replaced
by `get_timeseries_from_file`.
Parameters
----------
fname : str
filename of the knmi file.
meteo_var : str
observation type e.g. "RH" or "EV24". See list with all options in the
hpd.read_knmi function.
settings : dict
settings for obtaining the right time series, see _get_default_settings
for more information
start : pd.Timestamp
start date of observations.
end : pd.Timestamp
end date of observations.
Returns
-------
ts_df : pandas DataFrame
time series with measurements.
meta : dictionary
metadata from the measurement station.
"""
warnings.warn(
"the function 'get_knmi_timeseries_fname' is deprecated and will eventually be "
"removed, please use 'get_timeseries_from_file'.",
DeprecationWarning,
)
return get_timeseries_from_file(fname, meteo_var, settings, start, end)
[docs]def get_timeseries_from_file(
fname: str,
meteo_var: str,
settings: dict[str, Any],
start: pd.Timestamp,
end: pd.Timestamp,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Get a knmi time series and metadata from a file.
Parameters
----------
fname : str
filename of the knmi file.
meteo_var : str
observation type e.g. "RH" or "EV24". See list with all options in the
hpd.read_knmi function.
settings : dict
settings for obtaining the right time series, see _get_default_settings
for more information
start : pd.Timestamp
start date of observations.
end : pd.Timestamp
end date of observations.
Returns
-------
ts_df : pandas DataFrame
time series with measurements.
meta : dictionary
metadata from the measurement station.
"""
df, meta = parse_data(fname)
fname = str(fname)
# first try to figure out filetype by it's name
if "neerslaggeg" in fname:
# neerslagstation
meteo_var = "RD"
add_day = False
elif "etmgeg" in fname:
# meteo station
add_day = True
elif "uurgeg" in fname:
add_day = False
# hourly station
# if that doesn't work try to figure out by the meteo_var and settings
elif meteo_var is None or meteo_var == "RD":
# neerslagstation
meteo_var = "RD"
add_day = False
elif settings["interval"] == "daily":
# meteo station
add_day = True
elif settings["interval"] == "hourly":
# uurlijks station
add_day = False
else:
raise ValueError(
"please indicate how to read the file by specifying a meteo_var and"
" an interval"
)
if df.empty:
logger.warning(
f"No data for {meteo_var=} in {fname=} between{start=} and {end=}."
)
else:
ts, meta = interpret_knmi_file(
df=df,
meta=meta,
meteo_var=meteo_var,
start=start,
end=end,
add_day=add_day,
add_hour=True,
)
stn = meta["station"]
stations = get_stations(meteo_var=meteo_var)
stn_name = get_station_name(stn=stn, stations=stations)
# set metadata
meta.update(
{
"x": stations.loc[stn, "x"],
"y": stations.loc[stn, "y"],
"name": f"{meteo_var}_{stn_name}_{stn}",
"location": stn_name,
"source": "KNMI",
"filename": fname,
}
)
return ts, meta
def _get_default_settings(settings=None) -> dict[str, Any]:
"""adds the default settings to a dictinary with settings. If settings
is None all the settings are default. If there are already settings given
only the non-existing settings are added with their default value.
The default settings are:
fill_missing_obs = False
nan values in time series are filled with nearby time series.
fill_missing_obs_with_factor = False
if True, and overlapping measurements exist between the current
series and a donor station, donor values are scaled by an overlap-
based factor before filling missing values. This automatically enables
fill_missing_obs.
interval = 'daily'
desired time interval for observations. Can be 'daily' or 'hourly'.
'hourly' is only for precipitation ('RH') data from meteo stations.
use_api : bool, optional
if True the api is used to obtain the data, API documentation is here:
https://www.knmi.nl/kennis-en-datacentrum/achtergrond/data-ophalen-vanuit-een-script
if False a text file is downloaded into a temporary folder and the data is read
from that file
Default is True since the api is back online (July 2021).
raise_exceptions : bool, optional
if True you get errors when no data is returned. The default is False.
Parameters
----------
settings : dict or None
settings dictionary without default values.
Returns
-------
settings : dict
settings dictionary with default values of no values was given in
the input dictionary.
"""
default_settings = {
"fill_missing_obs": False,
"fill_missing_obs_with_factor": False,
"interval": "daily",
"use_api": True,
"raise_exceptions": True,
}
if settings is None:
settings = {}
if settings.get("fill_missing_obs_with_factor", False):
if not settings.get("fill_missing_obs", False):
logger.debug(
"set fill_missing_obs=True because fill_missing_obs_with_factor is True"
)
settings["fill_missing_obs"] = True
if "fill_missing_obs" in settings:
if "raise_exceptions" in settings:
if settings["fill_missing_obs"] and settings["raise_exceptions"]:
logger.debug(
"set raise_exceptions=False because fill_missing_obs is True"
)
settings["raise_exceptions"] = False
else:
settings["raise_exceptions"] = False
for key, value in default_settings.items():
if key not in settings:
settings[key] = value
return settings
[docs]def get_knmi_timeseries_stn(
stn: int,
meteo_var: str,
settings: dict[str, Any],
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Get a knmi time series and metadata.
.. deprecated:: 0.13.3
`get_knmi_timeseries_stn` will be removed in hydropandas 1.0.0, it is replaced
by `get_timeseries_stn`.
Parameters
----------
stn : int
measurement station e.g. 829.
meteo_var : str
observation type e.g. "RH" or "EV24". See list with all options in the
hpd.read_knmi function.
settings : dict
settings for obtaining the right time series, see _get_default_settings
for more information
start : pd.Timestamp or None, optional
start date of observations. The default is None.
end : pd.Timestamp or None, optional
end date of observations. The default is None.
Returns
-------
ts_df : pandas DataFrame
time series with measurements.
meta : dictionary
metadata from the measurement station.
"""
warnings.warn(
"the function 'get_knmi_timeseries_stn' is deprecated and will eventually be "
"removed, please use 'get_timeseries_stn'.",
DeprecationWarning,
)
return get_timeseries_stn(stn, meteo_var, settings, start, end)
[docs]def get_timeseries_stn(
stn: int,
meteo_var: str,
settings: dict[str, Any],
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Get a knmi time series and metadata.
Parameters
----------
stn : int
measurement station e.g. 829.
meteo_var : str
observation type e.g. "RH" or "EV24". See list with all options in the
hpd.read_knmi function.
settings : dict
settings for obtaining the right time series, see _get_default_settings
for more information
start : pd.Timestamp or None, optional
start date of observations. The default is None.
end : pd.Timestamp or None, optional
end date of observations. The default is None.
Returns
-------
ts_df : pandas DataFrame
time series with measurements.
meta : dictionary
metadata from the measurement station.
"""
# get station
stations = get_stations(meteo_var=meteo_var)
stn_name = get_station_name(stn=stn, stations=stations)
# raise error if hourly neerslag station data is requested
if (meteo_var == "RD") and settings["interval"].startswith("hour"):
message = (
"No hourly precipitation data available at precipitation station, "
"use meteo_var 'RH' to obtain hourly precipitation data from "
"meteo stations."
)
raise ValueError(message)
elif (
(meteo_var == "EV24")
and settings["interval"].startswith("hour")
and settings["use_api"]
):
message = (
"No hourly evaporation data available through the api, set use_api=False."
)
raise ValueError(message)
elif settings["fill_missing_obs"]:
# download data
ts_df, meta = fill_missing_measurements(
stn=stn,
meteo_var=meteo_var,
start=start,
end=end,
settings=settings,
stn_name=stn_name,
)
return ts_df, meta
elif meteo_var in ["penman", "makkink", "hargreaves"]:
# compute evaporation from data
ts_df, meta = get_evaporation(
meteo_var=meteo_var, stn=stn, start=start, end=end, settings=settings
)
else:
ts_df, variables, station_meta = download_knmi_data(
stn=stn,
meteo_var=meteo_var,
start=start,
end=end,
settings=settings,
stn_name=stn_name,
)
if ts_df.empty:
logger.warning(
f"No data for {meteo_var=} at {stn=} between{start=} and {end=}."
)
if str(stn) in station_meta.index:
meta = station_meta.loc[f"{stn}"].to_dict()
else:
meta = {}
# set metadata
x = stations.at[stn, "x"] if stn in stations.index else np.nan
y = stations.at[stn, "y"] if stn in stations.index else np.nan
meta.update(
{
"x": x,
"y": y,
"station": stn,
"name": f"{meteo_var}_{stn_name}_{stn}",
"location": stn_name,
"source": "KNMI",
}
)
meta.update(variables)
return ts_df, meta
[docs]def get_stations(
meteo_var: str,
start: pd.Timestamp | str | None = None,
end: pd.Timestamp | str | None = None,
) -> pd.DataFrame:
"""get knmi stations from json files according to variable.
Parameters
----------
meteo_var : str, optional
type of meteodata, by default 'RH'
start : str, datetime or None, optional
start date of observations. The default is None.
end : str, datetime or None, optional
end date of observations. The default is None.
Returns
-------
pandas DataFrame with stations, names and coordinates (Lat/Lon & RD)
"""
dir_path = os.path.dirname(os.path.realpath(__file__))
mstations = pd.read_json(os.path.join(dir_path, "../data/knmi_meteostation.json"))
pstations = pd.read_json(
os.path.join(dir_path, "../data/knmi_neerslagstation.json")
)
stations = pd.concat([mstations, pstations], axis=0)
stations = stations.where(~stations.isna(), False)
if meteo_var in ("makkink", "penman", "hargreaves"):
meteo_var = "EV24"
# select only stations with meteo_var
if meteo_var == slice(None):
meteo_mask = stations.loc[:, meteo_var].any(axis=1)
else:
meteo_mask = stations.loc[:, meteo_var]
stations = stations.loc[
meteo_mask, ["lon", "lat", "name", "x", "y", "altitude", "tmin", "tmax"]
]
# select only stations with measurement
if start is not None or end is not None:
stations = _get_stations_tmin_tmax(stations, start, end)
return stations
def _get_stations_tmin_tmax(stations_df, start, end):
"""select stations within period defined by start and end.
Parameters
----------
stations_df : pd.DataFrame
stations
start : datetime or None, optional
start date of observations.
end : datetime or None, optional
end date of observations.
Returns
-------
DataFrame with all station with measurements in selected period.
Notes
-----
Does not work on a DataFrames with duplicate indices
"""
if stations_df.index.duplicated().any():
raise IndexError("function does not work for dataframe with duplicated index")
if end is None:
tmin_stns = set(stations_df.index)
else:
# keep stations where tmin is unknown (=False)
stns_unknown_tmin = set(stations_df.loc[stations_df["tmin"] == False].index)
tmin_available = stations_df.loc[stations_df["tmin"] != False, "tmin"]
tmin_within_range = pd.to_datetime(tmin_available) < end
tmin_stns = set(tmin_available.loc[tmin_within_range].index) | stns_unknown_tmin
if start is None:
tmax_stns = set(stations_df.index)
else:
stns_unknown_tmax = set(stations_df.loc[stations_df["tmax"] == False].index)
tmax_available = stations_df.loc[stations_df["tmax"] != False, "tmax"]
tmax_available.loc[tmax_available.isnull()] = dt.datetime.now().date()
tmax_within_range = pd.to_datetime(tmax_available) > start
tmax_stns = set(tmax_available.loc[tmax_within_range].index) | stns_unknown_tmax
return stations_df.loc[list(tmin_stns & tmax_stns)]
[docs]def get_station_name(stn: int, stations: pd.DataFrame | None = None) -> str:
"""Returns the station name from a KNMI station.
Modifies the station name in such a way that a valid url can be obtained.
Parameters
----------
stn : int
station number.
stations : pandas DataFrame or None
DataFrame with the station metadata.
Returns
-------
str or None
Name of the station or None if station is not found.
"""
if stations is None:
stations = get_stations(meteo_var=slice(None))
if stn not in stations.index:
logger.warning(f"station {stn} not found")
return None
stn_name = stations.at[stn, "name"]
if isinstance(stn_name, pd.Series):
raise TypeError(
f'station {stn} is a meteo- and a precipitation station, please indicate which one you want to use using a "meteo_var"'
)
stn_name = stn_name.upper().replace(" ", "-").replace("(", "").replace(")", "")
return stn_name
[docs]def fill_missing_measurements(
stn: int,
meteo_var: str,
start: pd.Timestamp,
end: pd.Timestamp,
settings: dict[str, Any],
stn_name: str | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""fill missing measurements in knmi data.
Parameters
----------
stn : int
measurement station.
meteo_var : str
observation type.
start : pd.Timestamp
start date of observations.
end : pd.Timestamp
end date of observations.
settings : dict
settings for obtaining data.
stn_name : str, optional
station name. If None the name is obtained form te stn number. The
default is None.
Returns
-------
knmi_df : pandas DataFrame
data from one station from one type of observation, with additional
column to see which station is used to fill the value
meta : dictionary
metadata from the originally requested station even if this station
has no data
"""
use_overlap_factor = settings.get("fill_missing_obs_with_factor", False)
if settings["interval"] == "hourly":
raise NotImplementedError("cannot yet fill missing values in hourly data")
if (start is None) or (end is None):
raise ValueError(
"a start and end date have to be specified if fill_missing_obs is True"
)
# 1. get stations
stations = get_stations(meteo_var=meteo_var)
if stn not in stations.index:
logger.error(f"station {stn} does not exists or does not measure {meteo_var}")
stations_period = get_stations(meteo_var=meteo_var, start=start, end=end)
if stn_name is None:
stn_name = get_station_name(stn=stn, stations=stations)
meta = stations.loc[stn].to_dict()
meta.update(
{
"station": stn,
"name": f"{meteo_var}_{stn_name}_{stn}",
"location": stn_name,
"source": "KNMI",
}
)
# 2. Download the data from the given station
if stn not in stations_period.index:
# no measurements in given period, continue with empty dataframe
ts_df = pd.DataFrame()
# add location of station without data to dataframe
stations_period = pd.concat([stations_period, stations.loc[[stn]]])
else:
# download data from station if it has data between start-end
ts_df, variables, _station_meta = download_knmi_data(
stn, meteo_var, start, end, settings, stn_name
)
# 3. Change start date
# NOTE: Assuming there is no data before the 'tmin' dates in the json files.
tmin_first_meas = pd.Timestamp(stations["tmin"].min())
if end < tmin_first_meas:
msg = (
f"There are no measurements available between {start} and {end} "
f"for variable {meteo_var}."
)
logger.warning(msg)
return ts_df, meta
elif start < tmin_first_meas:
msg = (
f"There are no measurements available for {meteo_var} before "
f"{tmin_first_meas} and a start of {start} was requested. Changing "
f"start to {tmin_first_meas}"
)
logger.info(msg)
start = tmin_first_meas
# 4. Change end date
# NOTE: Assuming there is no data after the last measurement available in de Bilt.
stn_de_bilt = 550 if meteo_var == "RD" else 260
first_meas_de_bilt = pd.Timestamp(stations.loc[stn_de_bilt, "tmin"])
# only change end if dataframe does not have measurements at the end date
if end < first_meas_de_bilt:
msg = (
f"knmi station De Bilt has no measurements before {first_meas_de_bilt}"
"no need to change the end date"
)
logger.debug(msg)
elif ts_df.empty or (end > ts_df.index[-1]):
# get measurements at de Bilt
start_de_bilt = start if ts_df.empty else ts_df.index[-1]
ts_de_bilt, _, _ = download_knmi_data(
stn_de_bilt,
meteo_var,
start=start_de_bilt,
end=end,
settings=settings,
stn_name="De Bilt",
)
if ts_de_bilt.empty:
msg = (
f"knmi station De Bilt has no measurements between {start_de_bilt} and"
f" {end} for variable {meteo_var}. It is assumed no data is available"
f" for this period"
)
logger.warning(msg)
return ts_df, meta
new_end = ts_de_bilt.index[-1]
if new_end < end:
msg = (
f"knmi station De Bilt has no measurements for {meteo_var} after "
f"{new_end} and an end date of {end} was requested. Changing end to "
f"{new_end}"
)
logger.info(msg)
end = new_end
# 5. get starting dataframe with measurements
# if there are no measurements that fit the requirements at the given stn
# read the nearby station
ignore = [stn]
while ts_df.empty:
stn_lst = get_nearest_station_df(
stations_period.loc[[ignore[0]]],
meteo_var=meteo_var,
start=start,
end=end,
stations=stations_period,
ignore=ignore,
)
if stn_lst is None:
logger.error(
f"there is no station with measurements of {meteo_var} between "
f"{start} and {end}"
)
return pd.DataFrame(), meta
msg = (
f"station {stn} has no measurements between {start} and {end} "
f"trying to get measurements from nearest station -> {stn_lst[0]}"
)
logger.info(msg)
stn = stn_lst[0]
stn_name = get_station_name(stn=stn, stations=stations_period)
ts_df, variables, _station_meta = download_knmi_data(
stn, meteo_var, start, end, settings, stn_name
)
# do not use station number for metadata
if "station" in variables:
variables.pop("station")
# ignore this station
ignore.append(stn)
# 6. find and fill missing values
ts_df = _add_missing_indices(ts_df, stn, start, end)
missing = ts_df[meteo_var].isna()
logger.debug(f"station {stn} has {missing.sum()} missing measurements")
ts_df.loc[~missing, "station"] = str(stn)
while np.any(missing) and not np.all(missing):
stn_comp = get_nearest_station_df(
stations_period.loc[[stn]],
meteo_var=meteo_var,
start=start,
end=end,
stations=stations_period,
ignore=ignore,
)
if stn_comp is None:
logger.warning(
"Could not fill all missing measurements as there are "
"no stations left to check!"
)
missing[:] = False
break
else:
stn_comp = stn_comp[0]
stn_name_comp = get_station_name(stn_comp, stations_period)
logger.debug(
f"Trying to fill {missing.sum()} missing measurements with "
f"station {stn_comp} {stn_name_comp}"
)
ts_df_comp, _, __ = download_knmi_data(
stn_comp, meteo_var, start, end, settings, stn_name_comp
)
if ts_df_comp.empty:
logger.debug(f"No data available for station {stn_comp}")
else:
# dropnans from new data
ts_df_comp = ts_df_comp.loc[~ts_df_comp[meteo_var].isna(), :]
if use_overlap_factor:
factor, n_overlap = _get_overlap_factor(ts_df, ts_df_comp, meteo_var)
if factor != 1.0:
ts_df_comp = ts_df_comp.copy()
ts_df_comp[meteo_var] = ts_df_comp[meteo_var] * factor
logger.info(
f"Scale station {stn_comp} data with factor {factor:.3f} "
f"based on overlap with station {stn} "
f"using {n_overlap} measurements"
)
# get index of missing data in original timeseries
missing_idx = missing.loc[missing].index
# if any missing are in the new data, update
if missing_idx.isin(ts_df_comp.index).any():
# index for missing but in newly downloaded data
ix_idx = missing_idx.intersection(ts_df_comp.index)
# update missing data
ts_df.loc[ix_idx, meteo_var] = ts_df_comp.loc[ix_idx, meteo_var]
# add source station number
ts_df.loc[ix_idx, "station"] = str(stn_comp)
logger.info(
f"Filled {ix_idx.size} observations from station {stn_comp} "
f"{stn_name_comp} -> {stn} {stn_name}"
)
else:
logger.debug(
f"No new data available from {stn_comp} {stn_name_comp} "
f"for filling missing measurements"
)
missing = ts_df[meteo_var].isna()
ignore.append(stn_comp)
meta.update(variables)
return ts_df, meta
def _get_overlap_factor(
ts_df: pd.DataFrame, ts_df_comp: pd.DataFrame, meteo_var: str
) -> tuple[float, int]:
"""Estimate scaling factor from overlap between two station series.
The returned factor scales donor-station values so they better align with
the current series before filling missing values.
"""
if (meteo_var not in ts_df.columns) or (meteo_var not in ts_df_comp.columns):
return 1.0, 0
overlap = pd.concat(
[ts_df.loc[:, [meteo_var]], ts_df_comp.loc[:, [meteo_var]]],
axis=1,
keys=["base", "donor"],
).dropna()
if overlap.empty:
return 1.0, 0
base = overlap[("base", meteo_var)]
donor = overlap[("donor", meteo_var)]
base_valid = base.replace([np.inf, -np.inf], np.nan).dropna()
donor_valid = donor.replace([np.inf, -np.inf], np.nan).dropna()
common_idx = base_valid.index.intersection(donor_valid.index)
if common_idx.empty:
return 1.0, 0
base_sum = float(base_valid.loc[common_idx].sum())
donor_sum = float(donor_valid.loc[common_idx].sum())
if donor_sum == 0.0 or base_sum == 0.0:
return 1.0, 0
factor = base_sum / donor_sum
if not np.isfinite(factor) or (factor <= 0):
return 1.0, 0
return factor, int(common_idx.size)
[docs]def download_knmi_data(
stn: int,
meteo_var: str,
start: pd.Timestamp,
end: pd.Timestamp,
settings: dict[str, Any],
stn_name: str | None = None,
) -> tuple[pd.DataFrame, dict[str, Any], pd.DataFrame]:
"""download knmi data of a measurements station for certain observation
type.
Parameters
----------
stn : int
measurement station.
meteo_var : str
observation type.
start : pd.Timestamp
start date of observations.
end : pd.Timestamp
end date of observations.
settings : dict
settings for obtaining data
stn_name : str, optional
station name. If None the name is obtained form te stn number. The
default is None.
Raises
------
ValueError
if the data from knmi cannot not be read a ValueError is raised.
Unless raise_exceptions is False
Returns
-------
ts_df : pandas DataFrame
data from one station from one type of observation
variables : dictionary
information about the observed variables
stations : pandas DataFrame
information about the measurement station
"""
msg = f"{stn}-{stn_name} between {start} and {end}"
logger.debug(f"download KNMI {meteo_var} data from station " + msg)
# define variables
ts_df = pd.DataFrame()
variables = {}
# download and read data
try:
try:
if settings["use_api"]:
if settings["interval"].startswith("hour"):
# hourly data from meteorological stations
df, meta = get_hourly_meteo_api(
stn=stn, meteo_var=meteo_var, start=start, end=end
)
add_day = False
elif meteo_var == "RD":
# daily data from rainfall-stations
df, meta = get_daily_rainfall_api(stn=stn, start=start, end=end)
add_day = False
else:
# daily data from meteorological stations
df, meta = get_daily_meteo_api(
stn=stn, meteo_var=meteo_var, start=start, end=end
)
add_day = True
if not df.empty:
ts_df, variables = interpret_knmi_file(
df=df,
meta=meta,
meteo_var=meteo_var,
start=start,
end=end,
add_day=add_day,
add_hour=True,
)
except (RuntimeError, requests.ConnectionError):
logger.warning(
"KNMI API failed, try setting the 'use_api' argument to 'False'"
)
if settings["raise_exceptions"]:
raise
logger.info("Try non api method")
settings = settings.copy()
settings["use_api"] = False
if not settings["use_api"]:
if settings["interval"].startswith("hour"):
# hourly data from meteorological stations
raise NotImplementedError()
elif meteo_var == "RD":
# daily data from rainfall-stations
df, meta = get_daily_rainfall_url(stn, stn_name)
add_day = False
else:
# daily data from meteorological stations
df, meta = get_daily_meteo_url(stn=stn)
add_day = True
if not df.empty:
ts_df, variables = interpret_knmi_file(
df=df,
meta=meta,
meteo_var=meteo_var,
start=start,
end=end,
add_day=add_day,
add_hour=True,
)
except (ValueError, KeyError, pd.errors.EmptyDataError) as e:
logger.error(f"{e} {msg}")
if settings["raise_exceptions"]:
raise
stations = get_stations(meteo_var=meteo_var).loc[[stn], :]
return ts_df, variables, stations
[docs]def get_knmi_daily_rainfall_api(
stn: int,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily rainfall.
.. deprecated:: 0.13.3
`get_knmi_daily_rainfall_api` will be removed in hydropandas 1.0.0, it is replaced
by `get_daily_rainfall_api`.
Parameters
----------
stn : int
station number.
start : pd.Timestamp or None
start time of observations.
end : pd.Timestamp or None
end time of observations.
Raises
------
ValueError
if there is no data for the provided stn an error is raised.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
"""
warnings.warn(
"the function 'get_knmi_daily_rainfall_api' is deprecated and will eventually be "
"removed, please use 'get_daily_rainfall_api'.",
DeprecationWarning,
)
return get_daily_rainfall_api(stn, start, end)
[docs]@lru_cache
def get_daily_rainfall_api(
stn: int,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily rainfall.
Parameters
----------
stn : int
station number.
start : pd.Timestamp or None
start time of observations.
end : pd.Timestamp or None
end time of observations.
Raises
------
ValueError
if there is no data for the provided stn an error is raised.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
"""
meteo_var = "RD"
url = URL_DAILY_PREC
params = {"vars": meteo_var, "stns": str(stn)}
if start is not None:
params["start"] = (start - pd.Timedelta(days=1)).strftime("%Y%m%d")
if end is not None:
params["end"] = end.strftime("%Y%m%d")
strio = request_api(url, params)
return parse_data(strio)
[docs]def request_url(url: str, fname=None) -> StringIO:
"""download and read knmi daily rainfall data.
Parameters
----------
stn : int
station number.
fname : str or None, optional
filename to save the data to, only used if not None. The default is None.
Returns
-------
StringIO
"""
# request zipfile
r = requests.get(url, stream=True, timeout=60)
r.raise_for_status()
# unpack data
with ZipFile(BytesIO(r.content), mode="r") as zf:
zipfile = zf.read(zf.namelist()[0])
result_str = zipfile.decode().replace("\r\n", "\n")
if fname is not None:
with open(fname, "w") as f:
f.write(result_str)
return StringIO(result_str)
[docs]def get_knmi_daily_rainfall_url(
stn: int,
stn_name: str,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily rainfall.
.. deprecated:: 0.13.3
`get_knmi_daily_rainfall_url` will be removed in hydropandas 1.0.0, it is replaced
by `get_daily_rainfall_url`.
Parameters
----------
stn : int
station number.
stn_name : str
the name of the station in capital letters, can be tricky
Raises
------
ValueError
if there is no data for the provided stn an error is raised.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
"""
warnings.warn(
"the function 'get_knmi_daily_rainfall_url' is deprecated and will eventually be "
"removed, please use 'get_daily_rainfall_url'.",
DeprecationWarning,
)
get_daily_rainfall_url(stn, stn_name)
[docs]@lru_cache
def get_daily_rainfall_url(
stn: int,
stn_name: str,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily rainfall.
Parameters
----------
stn : int
station number.
stn_name : str
the name of the station in capital letters, can be tricky
Raises
------
ValueError
if there is no data for the provided stn an error is raised.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
"""
stn = f"{stn:03d}" # make sure there are leading zeros
url = (
"https://cdn.knmi.nl/knmi/map/page/klimatologie/"
f"gegevens/monv_reeksen/neerslaggeg_{stn_name}_{stn}.zip"
)
strio = request_url(url)
return parse_data(strio)
def _transform_variables(
df: pd.DataFrame, variables: dict[str, Any]
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Transforms the timeseries to default units and settings.
Does 4 things:
1. all values equal to -1 are converted to zero
2. the units are changed from 0.1 mm to 1 mm.
3. the units are changed from mm to m.
4. the timezone is converted from UTC to UTC+1
Parameters
----------
df : pandas DataFrame
time series.
variables : dictionary
description of variables in time series.
Raises
------
KeyError
if there are columns in the DataFrame and no matching key in the
variables dictionary.
Returns
-------
df : pandas DataFrame
time series.
variables : dictionary
description of variables in time series.
"""
variables = variables.copy()
df = df.copy()
add_m_unit = False
for key, value in variables.items():
# test if key existst in data
if key not in df.columns:
if key in ["YYYYMMDD", "HH"]:
pass
elif key == "T10N":
variables.pop(key)
key = "T10"
else:
raise KeyError(key + " does not exist in data")
if "(-1 voor <0.05 mm)" in value:
# remove -1 for precipitation smaller than <0.05 mm
df.loc[df.loc[:, key] == -1, key] = 0.0
value = value.replace("(-1 voor <0.05 mm)", "(0 voor <0.05mm)").replace(
"(-1 for <0.05 mm)", "(0 for <0.05mm)"
)
logger.debug(f"transform {key}, {value} lower than 0.05 mm to 0 mm")
if "0.1 " in value:
logger.debug(f"transform {key}, {value} from 0.1 to 1")
df[key] = df[key] * 0.1
value = value.replace("0.1 ", "")
if " tiende " in value:
logger.debug(f"transform {key}, {value} from 0.1 to 1")
df[key] = df[key] * 0.1
value = value.replace(" tiende ", " ")
if " mm" in value:
logger.debug(f"transform {key}, {value} from mm to m")
df[key] = df[key] * 0.001
value = value.replace(" mm", " m")
add_m_unit = True
if " millimeters" in value:
logger.debug(f"transform {key}, {value} from mm to m")
df[key] = df[key] * 0.001
add_m_unit = True
if "08.00 UTC" in value:
logger.debug(f"transform {key}, {value} from UTC to UTC+1")
# over the period 08.00 preceding day - 08.00 UTC present day
df.index = df.index + pd.to_timedelta(8, unit="h")
value = value.replace("08.00", "09.00").replace("UTC", "UTC+1")
# Store new variable
variables[key] = value
if add_m_unit:
variables["unit"] = "m"
else:
variables["unit"] = ""
return df, variables
[docs]def request_api(url: str, params: dict[str, str], fname=None) -> StringIO:
"""Download KNMI data from the API
Parameters
----------
url : str
URL to parse the request to
params : Dict[str, str]
Dictionary with parameters that are parsed to the request get
fname : str or None, optional
filename to save the data to only used if not None, by default None
Returns
-------
StringIO
"""
r = requests.get(url, params=params, timeout=60)
r.raise_for_status()
result_str = r.text
if result_str.startswith("<!DOCTYPE html>"):
raise RuntimeError("KNMI API down\n" + result_str)
if fname is not None:
with open(fname, "w") as f:
f.write(result_str)
return StringIO(result_str)
[docs]def get_knmi_daily_meteo_api(
stn, start=None, end=None, meteo_var: str | None = None
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily meteo data.
Parameters
----------
stn : int
station number.
meteo_var : str
e.g. 'EV24'.
start : pd.Timestamp or None
start time of observations.
end : pd.Timestamp or None
end time of observations.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
stations : pandas DataFrame
additional data about the measurement station
"""
warnings.warn(
"the function 'get_knmi_daily_meteo_api' is deprecated and will eventually be "
"removed, please use 'get_daily_meteo_api'.",
DeprecationWarning,
)
return get_daily_meteo_api(stn, start, end, meteo_var)
[docs]@lru_cache
def get_daily_meteo_api(
stn, start=None, end=None, meteo_var: str | None = None
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily meteo data.
Parameters
----------
stn : int
station number.
meteo_var : str
e.g. 'EV24'.
start : pd.Timestamp or None
start time of observations.
end : pd.Timestamp or None
end time of observations.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
stations : pandas DataFrame
additional data about the measurement station
"""
url = URL_DAILY_METEO
params = {
"stns": str(stn),
}
if meteo_var is not None:
params["vars"] = meteo_var
# modify start and end date for the api call because the dates are later
# transformed, see example notebook 02_knmi_observations.
if start is not None:
start = pd.Timestamp(start) - dt.timedelta(days=1)
params["start"] = start.strftime("%Y%m%d")
if end is not None:
end = pd.Timestamp(end) - dt.timedelta(days=1)
params["end"] = end.strftime("%Y%m%d")
strio = request_api(url, params)
return parse_data(strio)
[docs]def get_knmi_daily_meteo_url(stn: int) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily meteo data.
.. deprecated:: 0.13.3
`get_knmi_daily_meteo_url` will be removed in hydropandas 1.0.0, it is replaced
by `get_daily_meteo_url`.
Parameters
----------
stn : int
station number.
Returns
-------
pandas DataFrame
measurements.
meta : dictionary
additional information about the variables
"""
warnings.warn(
"the function 'get_knmi_daily_meteo_url' is deprecated and will eventually be "
"removed, please use 'get_daily_meteo_url'.",
DeprecationWarning,
)
return get_daily_meteo_url(stn)
[docs]@lru_cache
def get_daily_meteo_url(stn: int) -> tuple[pd.DataFrame, dict[str, Any]]:
"""download and read knmi daily meteo data.
Parameters
----------
stn : int
station number.
Returns
-------
pandas DataFrame
measurements.
meta : dictionary
additional information about the variables
"""
url = (
"https://cdn.knmi.nl/knmi/map/page/klimatologie"
f"/gegevens/daggegevens/etmgeg_{stn}.zip"
)
strio = request_url(url)
return parse_data(strio)
[docs]def parse_data(
path: str | Path | StringIO,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""read knmi daily meteo data from a file
Parameters
----------
path : str, pathlib.Path or StringIO
file path of .txt file or str representation.
Returns
-------
pandas DataFrame
measurements.
meta : dictionary
additional information about the variables
"""
meta_id1 = " = "
meta_id2 = " : "
data_id = "STN,"
meta = {}
df = None
if isinstance(path, (os.PathLike, str)):
f = open(path, "r")
elif isinstance(path, StringIO):
f = path
else:
raise TypeError("Expected a path as str or StringIO object")
line = f.readline()
while line:
if meta_id1 in line or meta_id2 in line:
key, item = (
line.split(meta_id1) if meta_id1 in line else line.split(meta_id2)
)
meta[key.strip("# ")] = item.strip()
elif data_id in line:
try:
columns = line.strip("# ").strip("\n").split(",")
columns = [x.strip(" ") for x in columns]
df = pd.read_csv(
f,
header=None,
na_values=" ",
delimiter=",",
skip_blank_lines=True,
)
if columns[-2:] == ["HH", "YYYYMMDD"]:
columns = columns[:-2]
df.columns = columns
hour_col = "HH" if "HH" in columns else None
if hour_col is not None:
df.loc[df[hour_col] == 24, hour_col] = 0
datetime = pd.to_datetime(
df.YYYYMMDD.astype(str) + df[hour_col].astype(str).str.zfill(2),
format="%Y%m%d%H",
)
datetime.loc[datetime.dt.hour == 0] = datetime + dt.timedelta(
days=1
)
df = df.drop(columns=["YYYYMMDD", hour_col]).loc[
df.index.notnull(), :
]
else:
datetime = pd.to_datetime(df.YYYYMMDD, format="%Y%m%d")
df = df.drop(columns=["YYYYMMDD"]).loc[df.index.notnull(), :]
df = df.set_index(datetime)
except pd.errors.EmptyDataError as e:
logger.warning(f"{e!s}. Returning empty DataFrame.")
df = pd.DataFrame()
f.close()
return df, meta
line = f.readline()
f.close()
if df is None:
raise ValueError(f"Could not read: {path}")
[docs]def interpret_knmi_file(
df: pd.DataFrame,
meta: dict[str, Any],
meteo_var: str,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
add_day: bool = False,
add_hour: bool = True,
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""interpret data from knmi by selecting meteo_var data and meta
and transforming the variables
Parameters
----------
df: DataFrame
dataframe with meteo_var as column
meta: dictionary
dictionary with meteo_var as key
meteo_var : str
e.g. 'EV24'.
start : pd.Timestamp or None
start time of observations.
end : pd.Timestamp or None
end time of observations.
add_day : boolean, optional
add 1 day so that the timestamp is at the end of the period the data describes,
default is False, and has to be set per type of file.
add_hour : boolean, optional
add 1 hour to convert from UT to UT+1 (standard-time in the Netherlands),
default is True as this is usually the case.
Returns
-------
pandas DataFrame
measurements.
variables : dictionary
additional information about the variables
station : int
meteostation
"""
variables = {meteo_var: meta[meteo_var]}
stn = None
if not df.empty:
unique_stn = df["STN"].unique()
if len(unique_stn) > 1:
raise ValueError(
f"Cannot handle multiple stations {unique_stn} in single file"
)
stn = unique_stn[0]
if add_day or add_hour:
if add_day and add_hour:
timedelta = pd.Timedelta(1, "d") + pd.Timedelta(1, "h")
elif add_hour:
timedelta = pd.Timedelta(1, "h")
else:
timedelta = pd.Timedelta(1, "d")
df = df.copy()
df.index = df.index + timedelta
if df.index.has_duplicates:
df = df.loc[~df.index.duplicated(keep="first")]
logger.info("duplicate indices removed from RD measurements")
if df.empty:
return pd.DataFrame(), variables
mdf, variables = _transform_variables(df, variables)
variables["station"] = stn
istart = (
mdf.index.get_indexer([start], method="backfill")[0]
if start is not None
else 0
)
iend = (
mdf.index.get_indexer([end], method="backfill")[0]
if end is not None
else -1
)
iend = len(mdf) if iend == -1 else iend + 1
icol = mdf.columns.get_indexer([meteo_var])
meteo_df = mdf.iloc[istart:iend, icol].dropna()
if not meteo_df.empty:
return meteo_df, variables
return pd.DataFrame(), variables
[docs]def get_knmi_hourly_meteo_api(
stn: int, start: pd.Timestamp, end: pd.Timestamp, meteo_var: str | None = None
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Retrieve hourly meteorological data from the KNMI API.
.. deprecated:: 0.13.3
`get_knmi_hourly_meteo_api` will be removed in hydropandas 1.0.0, it is replaced
by `get_hourly_meteo_api`.
Parameters
----------
stn : int
The station number.
start : pd.Timestamp or None
The start date and time of the data.
end : pd.Timestamp or None
The end date and time of the data.
Returns
-------
pandas.DataFrame
A DataFrame containing the requested meteorological variable data.
dict
A dictionary containing information about the variables in the DataFrame.
Raises
------
requests.ConnectionError
If there is a connection error while accessing the KNMI API.
RuntimeError
If the KNMI API is down or returns unexpected data.
"""
warnings.warn(
"the function 'get_knmi_hourly_meteo_api' is deprecated and will eventually be "
"removed, please use 'get_hourly_meteo_api'.",
DeprecationWarning,
)
return get_hourly_meteo_api(stn, start, end, meteo_var)
[docs]@lru_cache
def get_hourly_meteo_api(
stn: int, start: pd.Timestamp, end: pd.Timestamp, meteo_var: str | None = None
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Retrieve hourly meteorological data from the KNMI API.
Parameters
----------
stn : int
The station number.
start : pd.Timestamp or None
The start date and time of the data.
end : pd.Timestamp or None
The end date and time of the data.
Returns
-------
pandas.DataFrame
A DataFrame containing the requested meteorological variable data.
dict
A dictionary containing information about the variables in the DataFrame.
Raises
------
requests.ConnectionError
If there is a connection error while accessing the KNMI API.
RuntimeError
If the KNMI API is down or returns unexpected data.
"""
url = URL_HOURLY_METEO
params = {
"stns": str(stn),
}
if meteo_var is not None:
params["vars"] = meteo_var
# It looks like the API has an option to select the start and end hour that
# you want but in reality it does not have this option.
if start is None:
raise ValueError("A start date is required when using hourly interval")
if end is None:
raise ValueError("An end date is required when using hourly interval")
if (end - start).days > 365 * 10:
raise ValueError("time span for hourly data cannot be greater than 10 years")
if (end - start).days < 1:
raise ValueError("time span should be more than 1 day")
params["end"] = end.strftime("%Y%m%d") + "24"
s = start - pd.Timedelta(1, unit="h")
params["start"] = s.strftime("%Y%m%d") + "01"
strio = request_api(url=url, params=params)
return parse_data(strio)
[docs]def get_nearest_station_df(
locations: pd.DataFrame,
xcol: str = "x",
ycol: str = "y",
stations: pd.DataFrame | None = None,
meteo_var: str = "RH",
start: pd.Timestamp | str | None = None,
end: pd.Timestamp | str | None = None,
ignore: list[str] | None = None,
) -> list[int]:
"""Find the KNMI stations that measure 'meteo_var' closest to the
coordinates in 'locations'.
Parameters
----------
locations : pandas.DataFrame
DataFrame containing x and y coordinates
xcol : str
name of the column in the locations dataframe with the x values
ycol : str
name of the column in the locations dataframe with the y values
stations : pandas DataFrame, optional
if None stations will be obtained using the get_stations function.
The default is None.
meteo_var : str
measurement variable e.g. 'RH' or 'EV24'
start : str, datetime or None, optional
start date of observations. The default is None.
end : str, datetime or None, optional
end date of observations. The default is None.
ignore : list, optional
list of stations to ignore. The default is None.
Returns
-------
stns : list
station numbers.
"""
if stations is None:
stations = get_stations(meteo_var=meteo_var, start=start, end=end)
if ignore is not None:
stations = stations.drop(ignore)
if stations.empty:
return None
xo = pd.to_numeric(locations[xcol])
xt = pd.to_numeric(stations.x)
yo = pd.to_numeric(locations[ycol])
yt = pd.to_numeric(stations.y)
xh, xi = np.meshgrid(xt, xo)
yh, yi = np.meshgrid(yt, yo)
distances = pd.DataFrame(
np.sqrt((xh - xi) ** 2 + (yh - yi) ** 2),
index=locations.index,
columns=stations.index,
)
stns = distances.idxmin(axis=1).to_list()
return stns
[docs]def get_nearest_station_xy(
xy: list[list[float]],
stations: pd.DataFrame | None = None,
meteo_var: str = "RH",
ignore: list[str] | None = None,
) -> list[int]:
"""find the KNMI stations that measure 'meteo_var' closest to the given
x and y coordinates.
Parameters
----------
xy : list or numpy array, optional
xy coordinates of the locations. e.g. [[10,25], [5,25]]
stations : pandas DataFrame, optional
if None stations will be obtained using the get_stations function.
The default is None.
meteo_var : str
measurement variable e.g. 'RH' or 'EV24'
ignore : list, optional
list of stations to ignore. The default is None.
Returns
-------
stns : list
station numbers.
Notes
-----
assumes you have a structured rectangular grid.
"""
locations = pd.DataFrame(data=xy, columns=["x", "y"])
stns = get_nearest_station_df(
locations,
xcol="x",
ycol="y",
stations=stations,
meteo_var=meteo_var,
ignore=ignore,
)
return stns
[docs]def get_n_nearest_stations_xy(
xy: list[list[float]],
meteo_var: str,
start: pd.Timestamp | str | None = None,
end: pd.Timestamp | str | None = None,
n: int = 1,
stations: pd.DataFrame | None = None,
ignore: list[str] | None = None,
) -> list[int]:
"""Find the N nearest KNMI stations that measure variable 'meteo_var' to
the x, y coordinates.
Parameters
----------
xy : list, tuple or numpy.array of int or float
sinlge pair of xy coordinates. e.g. (150_000., 400_000.)
meteo_var : str
measurement variable e.g. 'RH' or 'EV24'
start : str, datetime or None, optional
start date of observations. The default is None.
end : str, datetime or None, optional
end date of observations. The default is None.
n : int, optional
number of stations you want to return. The default is 1.
stations : pandas DataFrame, optional
if None stations will be obtained using the get_stations function.
The default is None.
ignore : list, optional
list of stations to ignore. The default is None.
Returns
-------
list
station numbers.
"""
if stations is None:
stations = get_stations(meteo_var=meteo_var, start=start, end=end)
if ignore is not None:
stations.drop(ignore, inplace=True)
if stations.empty:
return None
distance = np.sqrt((stations.x - xy[0]) ** 2 + (stations.y - xy[1]) ** 2)
stns = distance.nsmallest(n).index.to_list()
return stns
def _add_missing_indices(
ts_df: pd.DataFrame, stn: int, start: pd.Timestamp, end: pd.Timestamp
) -> pd.DataFrame:
"""When downloading KNMI data you don't always get a DataFrame with the
periods that you provided in your request. Thus the index does not cover
the complete period that you are interested in. This function adds the
missing period to the index of the DataFrame.
Parameters
----------
ts_df : pandas DataFrame
data from one station from one type of observation, with additional
column to see which station is used to fill the value
stn : int or str
measurement station.
start : pd.Timestamp
start time of observations.
end : pd.Timestamp
end time of observations.
Returns
-------
ts_df : pandas DataFrame
data from one station from one type of observation
"""
# check if given dates are more or less similar than measurement dates
if (ts_df.index[0] - start).days < 1:
new_start = ts_df.index[0]
else:
new_start = pd.Timestamp(
year=start.year,
month=start.month,
day=start.day,
hour=ts_df.index[0].hour,
minute=ts_df.index[0].minute,
second=ts_df.index[0].second,
)
logger.info(f"station {stn} has no measurements before {ts_df.index[0]}")
if (end - ts_df.index[-1]).days < 0:
new_end = ts_df.index[-1]
else:
new_end = pd.Timestamp(
year=end.year,
month=end.month,
day=end.day,
hour=ts_df.index[-1].hour,
minute=ts_df.index[-1].minute,
second=ts_df.index[-1].second,
)
logger.info(f"station {stn} has no measurements after {ts_df.index[-1]}")
# add missing indices
new_index = pd.date_range(new_start, new_end, freq="D")
ts_df = ts_df.reindex(new_index)
return ts_df
[docs]def get_knmi_obslist(
locations: pd.DataFrame | None = None,
stns: list[int] | None = None,
xy: list[list[float]] | None = None,
meteo_vars: tuple[str] = ("RH",),
starts: pd.Timestamp | list[pd.Timestamp] | None = None,
ends: pd.Timestamp | list[pd.Timestamp] | None = None,
ObsClasses: list[Any] | None = None,
progress_callback=None,
**kwargs,
) -> list[Any]:
"""Get a list of observations of knmi stations. Either specify a list of
knmi stations (stns) or a dataframe with x, y coordinates (locations).
Parameters
----------
locations : pandas DataFrame or None
dataframe with x and y coordinates. The default is None
stns : list of int or None
list of knmi stations. The default is None
xy : list or numpy array, optional
xy coordinates of the locations. e.g. [[10,25], [5,25]]
meteo_vars : list or tuple of str
meteo variables e.g. ["RH", "EV24"]. The default is ("RH")
starts : None, str, datetime or list, optional
start date of observations per meteo variable. The start date is
included in the time series.
If start is None the start date will be January 1st of the
previous year.
if start is str it will be converted to datetime
if start is a list it should be the same length as meteo_vars and
the start time for each variable. The default is None
ends : list of str, datetime or None
end date of observations per meteo variable. The end date is
included in the time series.
If end is None the start date will be January 1st of the
previous year.
if end is a str it will be converted to datetime
if end is a list it should be the same length as meteo_vars and
the end time for each meteo variable. The default is None
ObsClasses : list of type or None
class of the observations, can be PrecipitationObs or
EvaporationObs. The default is None.
progress_callback : callable or None, optional
callback function that is called with (i, total) for each station
processed, where i is the zero-based index and total is the total
number of stations. The default is None.
**kwargs:
fill_missing_obs : bool, optional
if True nan values in time series are filled with nearby time series.
The default is False. Note: if the given stn has no data between start and
end the data from nearby stations is used. In this case the metadata of the
Observation is the metadata from the nearest station that has any
measurement in the given period.
fill_missing_obs_with_factor : bool, optional
if True, donor-station values are scaled with an overlap-based factor
before filling missing values. This automatically enables
fill_missing_obs.
The default is False.
interval : str, optional
desired time interval for observations. Options are 'daily' and
'hourly'. The default is 'daily'.
use_api : bool, optional
if True the api is used to obtain the data, API documentation is here:
https://www.knmi.nl/kennis-en-datacentrum/achtergrond/data-ophalen-vanuit-een-script
if False a text file is downloaded into a temporary folder and the
data is read from there. Default is True since the api is back
online (July 2021).
raise_exceptions : bool, optional
if True you get errors when no data is returned. The default is False.
Returns
-------
obs_list : list of observation objects
collection of multiple point observations
"""
settings = _get_default_settings(kwargs)
if isinstance(meteo_vars, str):
meteo_vars = [meteo_vars]
if starts is None:
starts = [None] * len(meteo_vars)
elif isinstance(starts, (str, dt.datetime)):
starts = [starts] * len(meteo_vars)
elif isinstance(starts, list):
pass
else:
raise TypeError("must be None, str, dt.datetime or list")
if ends is None:
ends = [None] * len(meteo_vars)
elif isinstance(ends, (str, dt.datetime)):
ends = [ends] * len(meteo_vars)
elif isinstance(ends, list):
pass
else:
raise TypeError("must be None, str, dt.datetime or list")
if not isinstance(xy, (tuple, list, np.ndarray, type(None))):
raise TypeError("must be tuple, list or numpy.ndarray")
obs_list = []
for meteo_var, start, end, ObsClass in zip(meteo_vars, starts, ends, ObsClasses):
start = start if start is None else pd.to_datetime(start)
end = end if end is None else pd.to_datetime(end)
# get stations
if stns is None:
stations = get_stations(meteo_var=meteo_var, start=start, end=end)
if (locations is None) and (xy is not None):
_stns = get_nearest_station_xy(
xy, stations=stations, meteo_var=meteo_var
)
elif locations is not None:
_stns = get_nearest_station_df(
locations,
stations=stations,
meteo_var=meteo_var,
start=start,
end=end,
)
else:
raise ValueError(
"stns, location and xy are all None. "
"Please specify one of these arguments."
)
_stns = np.unique(_stns)
if locations is not None:
xy = locations[["x", "y"]].values
else:
_stns = stns
for i, stn in enumerate(_stns):
if progress_callback is not None:
progress_callback(i, len(_stns))
o = ObsClass.from_knmi(
meteo_var=meteo_var,
stn=stn,
start=start,
end=end,
**settings,
)
obs_list.append(o)
return obs_list
[docs]def get_evaporation(
meteo_var: str,
stn: int = 260,
start: pd.Timestamp | None = None,
end: pd.Timestamp | None = None,
settings: dict[str, Any] | None = None,
) -> pd.DataFrame:
"""Collect different types of (reference) evaporation
from KNMI weather stations
Parameters
----------
meteo_var : str
Choice between 'penman', 'makkink' or 'hargraves'.
stn : str
station number, defaults to 260 De Bilt
start : pd.Timestamp
start time of observations.
end : pd.Timestamp
end time of observations.
settings : dict or None, optional
settings for the time series
Returns
------
pandas.DataFrame
"""
if meteo_var == "hargreaves":
if not settings["use_api"]:
raise NotImplementedError(
"cannot use hargreaves without the api because hargreaves needs "
"the lattitude"
)
d = {}
mvs = ["TG", "TN", "TX"]
for mv in mvs:
ts, meta = get_timeseries_stn(
stn, meteo_var=mv, start=start, end=end, settings=settings
)
d[mv] = ts.squeeze()
et = hargreaves(
d["TG"],
d["TN"],
d["TX"],
d["TG"].index,
meta.get("lat", 52.1),
).to_frame(name=meteo_var)
elif meteo_var == "makkink":
d = {}
mvs = ["TG", "Q"]
for mv in mvs:
ts, meta = get_timeseries_stn(
stn, meteo_var=mv, start=start, end=end, settings=settings
)
d[mv] = ts.squeeze()
et = makkink(d["TG"], d["Q"]).to_frame(name=meteo_var)
elif meteo_var == "penman":
if not settings["use_api"]:
raise NotImplementedError(
"cannot use penman without the api because penman needs the "
"lattitude and height of the meteo station"
)
d = {}
mvs = ["TG", "TN", "TX", "Q", "FG", "UG"]
for mv in mvs:
ts, meta = get_timeseries_stn(
stn, meteo_var=mv, start=start, end=end, settings=settings
)
d[mv] = ts.squeeze()
et = penman(
d["TG"],
d["TN"],
d["TX"],
d["Q"],
d["FG"],
d["UG"],
d["TG"].index,
meta.get("lat", 52.1),
meta.get("altitude", 0.0),
).to_frame(name=meteo_var)
else:
raise ValueError(
"Provide valid argument for meteo_var -> 'hargreaves', 'makkink' or "
"'penman'"
)
stn_name = get_station_name(meta["station"], stations=get_stations(meteo_var))
meta["name"] = f"{meteo_var}_{stn_name}_{stn}"
meta["location"] = stn_name
meta["unit"] = "m"
return et, meta
[docs]def makkink(tmean: pd.Series, K: pd.Series) -> pd.Series:
"""Estimate of Makkink reference evaporation
according to KNMI.
Parameters
----------
tmean : tmean : pandas.Series
Daily mean temperature in Celsius
K : pandas.Series
Global radiation estimate in J/cm2
Returns
-------
pandas.Series
"""
a = 0.646 + 0.0006 * tmean
b = 1 + tmean / 237.3
c = 7.5 * np.log(10) * 6.107 * 10 ** (7.5 * (1 - 1 / b))
et = 0.0065 * (1 - a / (c / (237.3 * b * b) + a)) / (2501 - 2.38 * tmean) * K
return et
[docs]def penman(
tmean: pd.Series,
tmin: pd.Series,
tmax: pd.Series,
K: pd.Series,
wind: pd.Series,
rh: pd.Series,
dates: pd.Series,
z: float = 1.0,
lat: float = 52.1,
G: float = 0.0,
wh: float = 10.0,
tdew: pd.Series | None = None,
) -> pd.Series:
"""Estimate of Penman reference evaporation
according to Allen et al 1990.
Parameters
----------
tmean : pandas.Series
Daily mean temperature in Celsius
tmin : pandas.Series
Daily minimum temperature in Celsius
tmax : pandas.Series
Daily maximum temperature in Celsius
K : pandas.Series
Global radiation estimate in J/cm2
wind : pandas.Series
Daily mean wind speed in m/s
rh : pandas.Series
Relative humidity in %
dates : pandas.Series
Dates
z : float, optional
Elevation of station in m, by default 1.0
lat : float, optional
Latitude of station, by default 52.1
G : float, optional
Ground flux in MJ/m2, by default 0.0
wh : float, optional
Height of wind measurement in m, by default 10.0
tdew : pandas.Series
Dew point temperature in C, by default None
Returns
-------
pandas.Series
"""
K = K * 1e4 * 0.000012 # J/cm2 to W/m2/d
P = 101.3 * ((293 - 0.0065 * z) / 293) ** 5.26 # kPa
gamma = 0.665e-3 * P # kPa/C
tg = (tmax - tmin) / 2 # C
s = 4098 * (0.6108 * np.exp(17.27 * tg / (tg + 237.3)) / (tg + 237.3) ** 2) # kPa/C
es0 = 0.6108 * np.exp(17.27 * tmean / (tmean + 237.3)) # kPa
esmax = 0.6108 * np.exp(17.27 * tmax / (tmax + 237.3)) # kPa
esmin = 0.6108 * np.exp(17.27 * tmin / (tmin + 237.3)) # kPa
es = (esmax + esmin) / 2 # kpa
if tdew:
ea = 0.6108 * np.exp(17.27 * tdew / (tdew + 237.3)) # kPa
else:
ea = es0 * rh / 100 # kPa
u2 = wind * 4.87 / np.log(67.8 * wh - 5.42) # m/s
J = pd.Series([int(date.strftime("%j")) for date in dates], index=dates)
dr = 1 + 0.033 * np.cos(2 * np.pi * J / 365) # -
delt = 0.409 * np.sin(2 * np.pi * J / 365 - 1.39) # rad
phi = lat * np.pi / 180 # rad
ws = np.arccos(-np.tan(phi) * np.tan(delt)) # rad
Kext = (
1366
* dr
/ np.pi
* (ws * np.sin(phi) * np.sin(delt) + np.cos(phi) * np.cos(delt) * np.sin(ws))
) # W/m2/d
K0 = Kext * (0.75 + 2e-5 * z) # W/m2/d
Lout_in = (
5.67e-8
* (((tmax + 273) ** 4 + (tmin + 273) ** 4) / 2)
* (0.34 - 0.14 * np.sqrt(ea))
* (1.35 * K / K0 - 0.35)
) # W/m2/d
Q = (1 - 0.23) * K + Lout_in # W/m2/d
Q = Q * 0.0864 # W/m2/d to MJ/m2
straling = 0.408 * s * (Q - G)
windterm = gamma * 900 / (tmean + 273) * u2 * (es - ea)
denom = s + gamma * (1 + 0.34 * u2)
et = (straling + windterm) / denom * 1e-3
et[et < 0] = 0
return et
[docs]def hargreaves(
tmean: pd.Series,
tmin: pd.Series,
tmax: pd.Series,
dates: pd.Series,
lat: float = 52.1,
x: list[float] | None = None,
) -> pd.Series:
"""Estimate of Hargraves potential evaporation
according to Allen et al. 1990.
Parameters
----------
tmean : pandas.Series
Daily mean temperature
tmin : pandas.Series
Daily minimum temperature
tmax : pandas.Series
Daily maximum temperature
dates : pandas.Series.index
Dates
lat : float, optional
Latitude of station, by default 52.1
x : _type_, optional
Optional parameter to scale evaporation estimate, by default None
Returns
-------
pandas.Series
"""
J = pd.Series([int(date.strftime("%j")) for date in dates], index=dates)
dr = 1 + 0.033 * np.cos(2 * np.pi * J / 365)
delt = 0.409 * np.sin(2 * np.pi * J / 365 - 1.39)
phi = lat * np.pi / 180
ws = np.arccos(-np.tan(phi) * np.tan(delt))
Kext = (
12
* 60
* 0.0820
* dr
/ np.pi
* (ws * np.sin(phi) * np.sin(delt) + np.cos(phi) * np.cos(delt) * np.sin(ws))
) # MJ/m2/d
Kext = 0.408 * Kext # MJ/m2/d to mm/d
et = 0.0023 * (tmean + 273 + 17.8) / np.sqrt(tmax - tmin) * Kext * 1e-3
if x:
et = x[0] + x[1] * et
return et
[docs]def get_stations_scenarios() -> pd.DataFrame:
"""Get KNMI station information for climate scenarios."""
response = requests.get(URL_STATIONS)
json_data = response.json()
df = pd.DataFrame(json_data["stations"])
df.index = df.loc[:, "key"].str.split("_").str[0].rename("stn")
return df
[docs]def get_knmi_scenarios_data(
stn: int | str,
years: Iterable[KNMI_CLIMATE_YEARS] = ("2033", "2050", "2100", "2150"),
scenarios: Iterable[KNMI_CLIMATE_SCENARIOS] = ("Ld", "Ln", "Md", "Mn", "Hd", "Hn"),
evap: Literal["EV24", "makkink", "penman", "hargreaves"] = "EV24",
) -> dict[str, pd.DataFrame]:
"""Fetch and process KNMI climate scenario data for a station.
The station argument is accepted as an integer or string for convenience.
Internally it is converted to a string when interacting with the KNMI API.
Retrieves climate scenario data from KNMI and returns a dictionary of
processed DataFrames with temperature, precipitation, and evaporation data.
Parameters
----------
stn : int or str
Station number (e.g., 550 or "550").
years : tuple, optional
Years of climate scenario. The default is ('2033','2050','2100','2150').
scenarios : tuple, optional
Names of climate scenario. The default is ('Ld','Ln','Md','Mn','Hd','Hn').
This includes all scenarios including the original measurements.
evap : str, optional
Method for calculating evaporation. Options are 'EV24', 'makkink', 'penman',
or 'hargreaves'. The default is 'EV24'.
Returns
-------
dict
Dictionary mapping scenario names to pandas DataFrames with processed
climate data. Each DataFrame has a datetime index and columns:
TG (temperature), RH (precipitation), Q (radiation), TX, TN, UG, FG,
and EV24 (evaporation).
Raises
------
RuntimeError
If the API request fails or data cannot be retrieved.
"""
# allow int input for station
stn = str(stn)
# Get station KNMI ID
stations = get_stations_scenarios()
if stn not in stations.index:
raise KeyError(
f"Station {stn} not found in KNMI climate scenario station list."
"Check knmi.get_stations_scenarios() to see what stations are available."
)
station = stations.at[stn, "key"]
# Build request parameters
params = (
[("series_variables[scenarios][]", s) for s in scenarios]
+ [("series_variables[years][]", y) for y in years]
+ [
("series_variables[station]", station),
("series_variables[date_range][]", "1991-01-01"),
("series_variables[date_range][]", "2020-12-31"),
("series_variables[climate_variables]", "temp"),
]
)
# Download data from KNMI API
response = requests.get(URL_KNMI_TRANSFORMED_SERIES, params=params)
response.raise_for_status()
zipped = ZipFile(BytesIO(response.content))
# Read and process raw CSV files
column_renamed = {
"temp": "TG",
"precip": "RD",
"radiation": "Q",
"max-temp": "TX",
"min-temp": "TN",
"rel-humidity": "UG",
"windspeed": "FG",
}
dfs = {}
for name in zipped.namelist():
if name.endswith(".csv"):
base = os.path.splitext(name)[0]
base_ext = base.split("_")[-1]
df = pd.read_csv(
zipped.open(name),
sep=",",
skiprows=3 if base_ext == "obs" else 4,
usecols=list(range(8)),
index_col=0,
parse_dates=True,
date_format="%Y%m%d",
)
df.columns = [column_renamed[x.strip()] for x in df.columns]
df.index.name = "date"
if -99.99 in df.values:
logger.info(
f"Station {stn} scenario {base_ext} contains -99.99 values, replacing with NaN."
)
df = df.replace(-99.99, np.nan)
# Calculate evaporation based on selected method
if evap in ("EV24", "makkink"):
K = df["Q"] * 8.64 # Convert from W/m² to J/cm²/day: 60*60*24/10000
df["EV24"] = makkink(tmean=df["TG"], K=K)
elif evap == "penman":
df["EV24"] = penman(
tmean=df["TG"],
tmin=df["TN"],
tmax=df["TX"],
K=df["Q"] * 8.64, # Convert from W/m² to J/cm²/day
wind=df["FG"],
rh=df["UG"],
dates=df.index,
)
elif evap == "hargreaves":
df["EV24"] = hargreaves(
tmean=df["TG"],
tmin=df["TN"],
tmax=df["TX"],
dates=df.index,
lat=stations.at[stn, "lat"],
)
else:
raise ValueError(
f"Unknown evaporation method: {evap}. "
"Choose from 'EV24', 'makkink', 'penman', or 'hargreaves'."
)
# make sure RD unit is m/d, same as normal knmi data
df["RD"] = df["RD"].multiply(1e-3)
dfs[base] = df
else:
logger.warning(f"Unexpected file in zip: {name}")
return dfs
[docs]def get_knmi_scenarios_obs_list(
stn: int | str,
years: Iterable[KNMI_CLIMATE_YEARS] = ("2033", "2050", "2100", "2150"),
scenarios: Iterable[KNMI_CLIMATE_SCENARIOS] = ("Ld", "Ln", "Md", "Mn", "Hd", "Hn"),
evap: Literal["EV24", "makkink", "penman", "hargreaves"] = "EV24",
meteo_vars: Iterable[Literal["TG", "RD", "Q", "TX", "TN", "UG", "FG", "EV24"]]
| None = None,
ObsClass: dict[str, Any] | None = None,
) -> list[Any]:
"""Convert climate scenario dataframes into observation objects.
Parameters
----------
stn : int or str
Station number (e.g., 550 or "550").
years : tuple, optional
Years of climate scenario. The default is ('2033','2050','2100','2150').
scenarios : tuple, optional
Names of climate scenario. The default is ('Ld','Ln','Md','Mn','Hd','Hn').
This includes all scenarios including the original measurements.
evap : Literal["EV24", "makkink", "penman", "hargreaves"], optional
Method for calculating evaporation. Options are 'EV24', 'makkink', 'penman',
or 'hargreaves'. The default is 'EV24'.
meteo_vars : iterable of str or None, optional
Meteorological variables to include in the ObsCollection. Possible
variables include 'TG', 'RD', 'Q', 'TX', 'TN', 'UG', 'FG', and 'EV24'.
If None (default), all available variables are included.
ObsClass : dict[str, PrecipitationObs | EvaporationObs | MeteoObs]
Dictionary mapping variable names to observation classes. The function will
use these classes to instantiate the observations.
Returns
-------
list
List of instantiated observation objects. Each object has ``station``
and ``meteo_var`` attributes set in addition to the usual metadata.
"""
if ObsClass is None:
raise ValueError(
"ObsClass must be provided to map variables to observation classes."
)
# Get measurements data
dfs = get_knmi_scenarios_data(
stn=stn,
years=years,
scenarios=scenarios,
evap=evap,
)
units = {
"TG": "°C",
"RD": "m/day",
"Q": "W/m²",
"TX": "°C",
"TN": "°C",
"UG": "%",
"FG": "m/s",
"EV24": "m/day",
}
meteo_vars = list(units) if meteo_vars is None else meteo_vars
stations = get_stations("RD")
obs_list = []
for key, df in dfs.items():
for col in df.columns:
# apply optional filter
if col not in meteo_vars:
logger.debug(
f"Skipping variable {col} as it is not in"
f"the provided meteo_vars {meteo_vars} list."
)
continue
meas = pd.DataFrame(index=df.index, data={col: df[col]})
stn_num = int(key.split("_")[0])
variable = "".join(col.split())
scenario = key.split("_")[
-1
] # includes year and scenario name, e.g. "2033_Ld"
location = key.split("_")[1].upper()
obs_cls = ObsClass[variable]
o = obs_cls(
meas,
name=f"{variable}_{stn_num}_{location}_{scenario}",
unit=units.get(variable, ""),
source=f"KNMI-Climate-Scenario-{scenario}",
x=stations.loc[stn_num, "x"],
y=stations.loc[stn_num, "y"],
location=location,
station=stn_num,
meteo_var=variable,
meta={"scenario": scenario},
)
obs_list.append(o)
return obs_list