"""
Module with functions to read or download time series with observations from Lizard.
function levels:
1. get_obs_list_from_extent: list of observations from extent
2. get_obs_list_from_codes: list of observations from codes
3. get_lizard_groundwater: get single obs object
4. get_timeseries_tube: get timeseries for a tube
5. get_timeseries_uuid: get timeseries for a uuid
"""
import logging
import math
import pathlib
import warnings
from concurrent.futures import ThreadPoolExecutor
import geopandas
import pandas as pd
import requests
from pyproj import Transformer
from shapely.geometry import Polygon
from tqdm import tqdm
logger = logging.getLogger(__name__)
# TODO:
# - check transformation from EPSG:28992 to WGS84 (elsewhere in hydropandas we use
#   another definition for EPSG:28992 that is provided in util.py)
# Generic Lizard API endpoint (with 'organisation' as placeholder, following the
# Lizard documentation). Filled in with str.format(organisation=...) by the
# functions that build request URLs.
lizard_api_endpoint = "https://{organisation}.lizard.net/api/v4/"
def check_status_obs(metadata, timeseries):
    """Check whether a monitoring tube is still active.

    A tube is considered active when its last valid measurement is less than
    180 days before today; otherwise it is inactive.

    Parameters
    ----------
    metadata : dict
        metadata of the monitoring tube. The key "status" is set in place.
    timeseries : pandas DataFrame
        timeseries of the monitoring tube, indexed by measurement time.

    Returns
    -------
    metadata : dict
        the same dictionary that was passed in, with "status" set to
        "no timeseries available", "active" or "inactive".
    """
    if timeseries.empty:
        metadata["status"] = "no timeseries available"
        return metadata

    last_measurement_date = timeseries.last_valid_index()
    if last_measurement_date is None:
        # last_valid_index() returns None when every row consists of NaN only;
        # subtracting None from a Timestamp would raise a TypeError.
        metadata["status"] = "no timeseries available"
        return metadata

    today = pd.to_datetime("today").normalize()
    is_recent = today - last_measurement_date < pd.Timedelta(days=180)
    metadata["status"] = "active" if is_recent else "inactive"
    return metadata
def extent_to_wgs84_polygon(extent):
    """Convert an extent (xmin, xmax, ymin, ymax) into a WGS84 polygon.

    Parameters
    ----------
    extent : list or tuple
        extent in epsg 28992 within which the observations are collected.

    Returns
    -------
    shapely.geometry.Polygon
        rectangle spanned by the transformed lower-left and upper-right
        corners of the extent.
    """
    to_wgs84 = Transformer.from_crs("EPSG:28992", "WGS84")

    # NOTE(review): the transformer is created without always_xy, so the order
    # of the two returned values follows the axis order of the target CRS
    # (presumably latitude first) -- TODO confirm. The vertices below swap the
    # two values, exactly as the original implementation did.
    first_lo, second_lo = to_wgs84.transform(extent[0], extent[2])
    first_hi, second_hi = to_wgs84.transform(extent[1], extent[3])

    corners = [
        (second_lo, first_lo),
        (second_hi, first_lo),
        (second_hi, first_hi),
        (second_lo, first_hi),
    ]
    return Polygon(corners)
def translate_flag(timeseries):
    """Add a textual description of the Vitens Lizard quality flags.

    Parameters
    ----------
    timeseries : pandas.DataFrame
        timeseries of a monitoring well with an integer "flag" column.

    Returns
    -------
    timeseries : pandas.DataFrame
        the same DataFrame, extended in place with a "flag_description"
        column. Flag values that are not in the lookup table are copied
        unchanged.
    """
    codes_per_description = {
        "betrouwbaar": (0, 1),
        "onbeslist": (2, 3, 4),
        "onbetrouwbaar": (6, 7),
        "ongevalideerd": (99,),
        "verwijderd": (-99,),
    }
    # Invert to the {flag: description} mapping that Series.replace expects.
    description_per_code = {
        flag: description
        for description, flags in codes_per_description.items()
        for flag in flags
    }
    flags = timeseries["flag"]
    timeseries["flag_description"] = flags.replace(description_per_code)
    return timeseries
def _prepare_API_input(nr_pages, url_groundwater):
"""Get API data pages within the defined extent.
Parameters
----------
nr_pages : int
number of the pages on which the information is stored
url_groundwater : str
location of the used API to extract the data
Returns
-------
urls : list
list of the page number and the corresponding url
"""
urls = []
for page in range(nr_pages):
true_page = page + 1 # The real page number is attached to the import thread
urls += [url_groundwater + f"&page={true_page}"]
return urls
def _download(url, timeout=1800, auth=None):
    """Download the "results" of a single API page.

    Intended to be used as the worker function of a ThreadPoolExecutor.

    Parameters
    ----------
    url : str
        url of an API page
    timeout : int, optional
        number of seconds to wait before terminating request
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    list
        content of the "results" entry of the JSON response, or an empty list
        when the request timed out. Other request errors (including HTTP
        error statuses) are not caught here.
    """
    try:
        response = requests.get(url=url, timeout=timeout, auth=auth)
        response.raise_for_status()
        return response.json()["results"]
    except requests.exceptions.Timeout:
        # Best effort: a timed-out page is logged and contributes no records.
        logger.error(f"Timeout while requesting {url}. Please check your connection.")
        return []
def _split_mw_tube_nr(code):
"""get the tube number from a code that consists of the name and the tube number.
Parameters
----------
code : str
name + tube_nr. e.g. 'BUWP014-11' or 'BUWP014012'
Returns
-------
monitoring well, tube_number (str, int)
Notes
-----
The format of the name + tube_nr is not very consistent and this function may need
further finetuning.
"""
if code[-3:].isdigit():
return code[:-3], int(code[-3:])
else:
# assume there is a '-' to split name and filter number
tube_nr = code.split("-")[-1]
return code.strip(f"-{tube_nr}"), int(tube_nr)
def _extract_timeseries_info_from_tube(mtd_tube, auth=None):
"""
Extracts timeseries information (hand/diver UUIDs and types) from a tube/filter dict.
Parameters
----------
mtd_tube : dict
Tube/filter metadata dictionary.
auth : tuple, optional
Authentication credentials for the API request, e.g.: ("__key__", your_api_key)
Returns
-------
dict
Dictionary with timeseries info and type.
"""
info = {}
if not mtd_tube["timeseries"]:
info["timeseries_type"] = None
return info
for series in mtd_tube["timeseries"]:
r = requests.get(series, auth=auth)
r.raise_for_status()
series_info = r.json()
# Note: See Github issue #311 for an explanation of 'wns_string'
wns_string = series_info["code"].split(":", 1)[0]
if wns_string == "WNS9040.hand":
info["uuid_hand"] = series_info["uuid"]
info["start_hand"] = series_info["start"]
elif wns_string == "WNS9040":
info["uuid_diver"] = series_info["uuid"]
info["start_diver"] = series_info["start"]
elif wns_string == "WNS9040.val":
info["uuid_diver_validated"] = series_info["uuid"]
info["start_diver_validated"] = series_info["start"]
info["end_diver_validated"] = series_info["end"]
# Create string with all timeseries types
ts_types = []
if info.get("start_hand") is not None:
ts_types.append("hand")
if info.get("start_diver") is not None:
ts_types.append("diver")
if info.get("start_diver_validated") is not None:
ts_types.append("diver_validated")
info["timeseries_type"] = " + ".join(ts_types) if ts_types else None
return info
def get_timeseries_uuid(
    uuid, tmin, tmax, page_size=100000, nr_threads=10, organisation="vitens", auth=None
):
    """
    Get the time series (hand or diver) using the uuid.

    Handles pagination to retrieve all results across multiple pages.

    Parameters
    ----------
    uuid : str
        Universally Unique Identifier of the tube and type of time series.
    tmin : str YYYY-m-d or None
        start of the observations, if None no start is sent to the API
    tmax : str YYYY-m-d or None
        end of the observations, if None no end is sent to the API
    page_size : int, optional
        Query parameter which can extend the response size. The default is 100000.
    nr_threads : int, optional
        number of threads to use for the API requests, default is 10
    organisation : str, optional
        organisation as used by Lizard, currently only "vitens" is officially supported.
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    pd.DataFrame
        DataFrame with the columns "value", "flag" and "comment", indexed by
        "peil_datum_tijd". An empty DataFrame when the API reports no events.
    """
    from urllib.parse import urlencode

    base_url = lizard_api_endpoint.format(organisation=organisation)
    url_timeseries = f"{base_url}timeseries/{uuid}/events/"

    if tmin is not None:
        tmin = pd.to_datetime(tmin).isoformat("T")
    if tmax is not None:
        tmax = pd.to_datetime(tmax).isoformat("T")

    # Leave out unset bounds. requests drops None values by itself, but
    # urlencode (used for the page urls below) would turn them into the
    # literal text "None", e.g. "start=None".
    all_params = {"start": tmin, "end": tmax, "page_size": page_size}
    params = {key: value for key, value in all_params.items() if value is not None}

    # First request: gives the total count and the events of the first page.
    first_response = requests.get(url=url_timeseries, params=params, auth=auth)
    first_response.raise_for_status()
    first_data = first_response.json()

    total_count = first_data.get("count", 0)
    if total_count == 0:
        return pd.DataFrame()

    time_series_events = list(first_data.get("results") or [])

    nr_pages = math.ceil(total_count / page_size)
    if nr_pages > 1:
        base_url_with_params = url_timeseries + "?" + urlencode(params)
        # The first page is already in hand, only fetch page 2 and further.
        # NOTE(review): assumes a request without "page" returns page 1.
        remaining_urls = _prepare_API_input(nr_pages, base_url_with_params)[1:]

        # No need for more threads than there are pages to download.
        nr_threads = min(nr_threads, len(remaining_urls))
        with ThreadPoolExecutor(max_workers=nr_threads) as executor:
            for result in tqdm(
                executor.map(lambda url: _download(url, auth=auth), remaining_urls),
                total=len(remaining_urls),
                desc=f"Downloading timeseries {uuid}",
            ):
                time_series_events.extend(result)

    if not time_series_events:
        return pd.DataFrame()

    time_series_df = pd.DataFrame(time_series_events)
    # NOTE(review): translate_flag adds a "flag_description" column, which is
    # not part of the column selection below and is therefore not returned.
    time_series_df = translate_flag(time_series_df)

    timeseries_sel = time_series_df.loc[:, ["time", "value", "flag", "comment"]]
    # Timestamps that do not match the format become NaT and are dropped.
    # NOTE(review): the fixed one hour offset presumably converts UTC to
    # UTC+1 -- confirm.
    timeseries_sel["time"] = pd.to_datetime(
        timeseries_sel["time"], format="%Y-%m-%dT%H:%M:%SZ", errors="coerce"
    ) + pd.DateOffset(hours=1)
    timeseries_sel = timeseries_sel[~timeseries_sel["time"].isnull()]
    timeseries_sel.set_index("time", inplace=True)
    timeseries_sel.index.rename("peil_datum_tijd", inplace=True)

    logger.info(
        f"Successfully retrieved {len(timeseries_sel)} timeseries events for UUID {uuid}"
    )
    return timeseries_sel
def _filter_timeseries(ts_dict, datafilters):
"""
Generic filter function for multiple timeseries.
Parameters
----------
ts_dict : dict
Dictionary of timeseries DataFrames, e.g. {'hand': df1, 'diver': df2, ...}
datafilters : list of str
List of datafilter names as strings, e.g. ["remove_unvalidated_diver_values_when_validated_available", "remove_hand_during_diver_period"]
Returns
-------
dict
Filtered ts_dict.
"""
# Define standard filters by name. Note that the order may be relevant (uppermost filter is applied first).
standard_datafilters = {
"remove_unvalidated_diver_values_when_validated_available": {
"target": "diver",
"action": "remove_before",
"reference": "diver_validated",
"how": "max",
},
"remove_hand_during_diver_period": {
"target": "hand",
"action": "remove_between",
"reference": ["diver", "diver_validated"],
"how": "range",
},
}
# Convert string datafilters to dicts using standard_datafilters
datafilter_dicts = []
for f in datafilters:
if isinstance(f, str):
if f in standard_datafilters:
datafilter_dicts.append(standard_datafilters[f])
else:
raise ValueError(f"Unknown filter name: {f}")
else:
raise TypeError(
"Each filter must be a string referring to a standard filter."
)
ts_dict = {k: v.copy() for k, v in ts_dict.items()}
for f in datafilter_dicts:
targets = f["target"] if isinstance(f["target"], list) else [f["target"]]
refs = f["reference"] if isinstance(f["reference"], list) else [f["reference"]]
how = f.get("how", "range")
on = f.get("on", None)
for target in targets:
df = ts_dict.get(target)
if df is None or df.empty:
continue
mask = pd.Series(True, index=df.index)
for ref in refs:
ref_df = ts_dict.get(ref)
if ref_df is None or ref_df.empty:
continue
idx = ref_df.index if on is None else ref_df[on]
if f["action"] == "remove_before":
val = idx.max() if how == "max" else idx.min()
mask &= df.index > val
elif f["action"] == "remove_after":
val = idx.min() if how == "min" else idx.max()
mask &= df.index < val
elif f["action"] == "remove_between":
mask &= ~df.index.to_series().between(idx.min(), idx.max())
elif f["action"] == "keep_only":
mask &= df.index.to_series().between(idx.min(), idx.max())
# Add more actions as needed
ts_dict[target] = df[mask]
return ts_dict
def get_timeseries_tube(
    tube_metadata,
    tmin,
    tmax,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    organisation="vitens",
    auth=None,
):
    """
    Extracts specified timeseries for a tube and combines them as requested.

    Parameters
    ----------
    tube_metadata : dict
        metadata of a tube. Must contain "timeseries_type"; for every
        requested timeseries the keys "uuid_<type>" and "start_<type>" are read.
    tmin : str YYYY-m-d or None
        start of the observations
    tmax : str YYYY-m-d or None
        end of the observations
    type_timeseries : str, optional (deprecated)
        deprecated, use 'which_timeseries' and 'combine_method' instead.
        "combine" maps to combine_method="columns", "merge" to
        combine_method="merge" and any other value is used as the single
        entry of which_timeseries.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver", "diver_validated".
        Defaults to ("hand", "diver") (which should be correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available": Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements during periods where diver or diver_validated measurements are available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns" (side-by-side columns).
        If None, defaults to "merge".
    organisation : str, optional
        organisation as used by Lizard.
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    measurements : pandas DataFrame
        timeseries of the monitoring well
    tube_metadata : dict
        the metadata dictionary that was passed in

    Raises
    ------
    ValueError
        if combine_method is not "merge", "columns" or None.
    """
    if type_timeseries is not None:
        warnings.warn(
            "the argument 'type_timeseries' is deprecated and will eventually be "
            "removed, please use the arguments 'which_timeseries' and 'combine_method'.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Map old type_timeseries to which_timeseries and combine_method
        if type_timeseries == "combine":
            combine_method = "columns"
        elif type_timeseries == "merge":
            combine_method = "merge"
        else:
            which_timeseries = [type_timeseries]
            combine_method = "merge"

    if tube_metadata["timeseries_type"] is None:
        return pd.DataFrame(), tube_metadata

    # Honour the documented default and reject unsupported values before any
    # data is downloaded.
    if combine_method is None:
        combine_method = "merge"
    if combine_method not in ("merge", "columns"):
        raise ValueError(
            f'{combine_method=} not supported, choose between "merge" and "columns".'
        )

    # Fetch requested timeseries; a type without a start value in the metadata
    # is represented by an empty DataFrame.
    ts_dict = {}
    for ts_type in which_timeseries:
        if tube_metadata.get(f"start_{ts_type}") is not None:
            ts_dict[ts_type] = get_timeseries_uuid(
                tube_metadata.get(f"uuid_{ts_type}"),
                tmin,
                tmax,
                organisation=organisation,
                auth=auth,
            )
        else:
            ts_dict[ts_type] = pd.DataFrame()

    if datafilters is not None:
        ts_dict = _filter_timeseries(ts_dict, datafilters)

    if combine_method == "columns":
        # Side-by-side: give value/flag a suffix per timeseries type.
        dfs = []
        for key in ("hand", "diver_validated", "diver"):
            df = ts_dict.get(key)
            if df is None or df.empty:
                continue
            dfs.append(
                df.rename(columns={"value": f"value_{key}", "flag": f"flag_{key}"})
            )
        measurements = pd.concat(dfs, axis=1) if dfs else pd.DataFrame()

        # Only keep the value and flag columns that are present (this drops
        # e.g. the "comment" columns).
        expected_cols = [
            "value_hand",
            "value_diver_validated",
            "value_diver",
            "flag_hand",
            "flag_diver_validated",
            "flag_diver",
        ]
        present_cols = [col for col in expected_cols if col in measurements.columns]
        if not measurements.empty:
            measurements = measurements.loc[:, present_cols]
    else:
        # "merge": vertical stack, the 'origin' column tells where a row came from.
        dfs = []
        for key in which_timeseries:
            df = ts_dict.get(key)
            if df is not None and not df.empty:
                df = df.copy()
                df["origin"] = key
                dfs.append(df)
        measurements = pd.concat(dfs, axis=0).sort_index() if dfs else pd.DataFrame()

    return measurements, tube_metadata
def get_lizard_groundwater(
    code,
    tube_nr=None,
    tmin=None,
    tmax=None,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    only_metadata=False,
    organisation="vitens",
    auth=None,
):
    """Get the metadata and timeseries of one tube of a monitoring well from a
    LIZARD-API, based on the code of the monitoring well.

    Parameters
    ----------
    code : str
        code of the measuring well, e.g. '27B-0444'
    tube_nr : int, optional
        tube to select; passed on to get_metadata_tube. The default is None.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire serie is returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire serie is returned
    type_timeseries : str, optional (deprecated)
        Old keyword, use which_timeseries instead.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver", "diver_validated".
        Defaults to ("hand", "diver") (which should be correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available": Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements during periods where diver or diver_validated measurements are available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns" (side-by-side columns).
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.
    organisation : str, optional
        organisation as used by Lizard, currently only "vitens" is officially supported.
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    measurements : pd.DataFrame
        timeseries of the tube; an empty DataFrame when only_metadata is True
    tube_metadata : dict
        dictionary containing metadata; when timeseries are retrieved it also
        holds the "status" set by check_status_obs
    """
    station_metadata = get_metadata_mw_from_code(
        code, organisation=organisation, auth=auth
    )
    tube_metadata = get_metadata_tube(station_metadata, tube_nr, auth=auth)

    # Metadata only: skip the download and the status check.
    if only_metadata:
        return pd.DataFrame(), tube_metadata

    timeseries_options = {
        "type_timeseries": type_timeseries,
        "which_timeseries": which_timeseries,
        "datafilters": datafilters,
        "combine_method": combine_method,
        "organisation": organisation,
        "auth": auth,
    }
    measurements, tube_metadata = get_timeseries_tube(
        tube_metadata, tmin, tmax, **timeseries_options
    )
    return measurements, check_status_obs(tube_metadata, measurements)
def get_obs_list_from_codes(
    codes,
    ObsClass,
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    only_metadata=False,
    organisation="vitens",
    auth=None,
):
    """Get all observations from a list of codes of the monitoring wells and a
    tube number.

    Parameters
    ----------
    codes : list of str or str
        codes of the monitoring wells
    ObsClass : type
        class of the observations, e.g. GroundwaterObs. Its from_lizard
        constructor is called for every observation.
    tube_nr : int or "all", optional
        tube number that should be selected for every monitoring well; it is
        passed on to ObsClass.from_lizard as is. By default 'all' available
        tubes are selected.
    tmin : str YYYY-m-d, optional
        start of the observations, by default the entire serie is returned
    tmax : str YYYY-m-d, optional
        end of the observations, by default the entire serie is returned
    type_timeseries : str, optional (deprecated)
        Old keyword, use which_timeseries instead.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver", "diver_validated".
        Defaults to ("hand", "diver") (which should be correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available": Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements during periods where diver or diver_validated measurements are available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns" (side-by-side columns).
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.
    organisation : str, optional
        organisation as used by Lizard, currently only "vitens" is officially supported.
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    obs_list
        list of observations

    Raises
    ------
    TypeError
        if codes is not a string or an iterable.
    """
    if isinstance(codes, str):
        codes = [codes]

    if not hasattr(codes, "__iter__"):
        raise TypeError("argument 'codes' should be an iterable")

    # Identical for every observation; the deprecated 'type_timeseries' is
    # forwarded in both branches so it is not silently ignored when a specific
    # tube number is requested.
    obs_kwargs = {
        "type_timeseries": type_timeseries,
        "which_timeseries": which_timeseries,
        "datafilters": datafilters,
        "combine_method": combine_method,
        "only_metadata": only_metadata,
        "organisation": organisation,
        "auth": auth,
    }

    obs_list = []
    for code in codes:
        if tube_nr == "all":
            # The station metadata is only needed to find the available tubes.
            groundwaterstation_metadata = get_metadata_mw_from_code(
                code, organisation=organisation, auth=auth
            )
            seen_tubes = set()
            for metadata_tube in groundwaterstation_metadata["filters"]:
                tnr = _split_mw_tube_nr(metadata_tube["code"])[-1]
                if tnr in seen_tubes:
                    # Every tube number is downloaded only once.
                    continue
                seen_tubes.add(tnr)
                logger.debug(f"get {code}-{tnr}")
                obs_list.append(
                    ObsClass.from_lizard(code, tnr, tmin, tmax, **obs_kwargs)
                )
        else:
            obs_list.append(
                ObsClass.from_lizard(code, tube_nr, tmin, tmax, **obs_kwargs)
            )

    return obs_list
def get_obs_list_from_extent(
    extent,
    ObsClass,
    tube_nr="all",
    tmin=None,
    tmax=None,
    type_timeseries=None,  # deprecated argument
    which_timeseries=("hand", "diver"),  # new preferred argument
    datafilters=None,
    combine_method="merge",
    only_metadata=False,
    page_size=100,
    nr_threads=10,
    organisation="vitens",
    auth=None,
):
    """Get all observations within a specified extent.

    Parameters
    ----------
    extent : list, tuple, str or pathlib path
        get groundwater monitoring wells within this extent [xmin, xmax, ymin, ymax]
        or within the first geometry of a shapefile (given as a path)
    ObsClass : type
        class of the observations, e.g. GroundwaterObs
    tube_nr : int or "all", optional
        tube number that should be selected for every monitoring well.
        By default 'all' available tubes are selected.
    tmin : str, optional
        start of the observations (format YYYY-m-d), by default the entire series
        is returned
    tmax : str, optional
        end of the observations (format YYYY-m-d), by default the entire series
        is returned
    type_timeseries : str, optional (deprecated)
        Old keyword, use which_timeseries instead.
    which_timeseries : tuple of str, optional
        Which timeseries to retrieve. Options: "hand", "diver", "diver_validated".
        Defaults to ("hand", "diver") (which should be correct for Vitens).
    datafilters : list of strings, optional
        Methods to filter the timeseries data.
        If None (default), all measurements will be shown.
        Currently implemented datafilter methods:
        "remove_unvalidated_diver_values_when_validated_available": Removes diver values before last date with validated diver.
        "remove_hand_during_diver_period": Removes hand measurements during periods where diver or diver_validated measurements are available.
    combine_method : str, optional
        "merge" (vertical stack with 'origin' column) or "columns" (side-by-side columns).
    only_metadata : bool, optional
        if True only metadata is returned and no time series data. The
        default is False.
    page_size : int, optional
        number of records to retrieve per page, default is 100
    nr_threads : int, optional
        number of threads to use for the API requests, default is 10
    organisation : str
        organisation as used by Lizard, currently only "vitens" is officially supported.
    auth : tuple, optional
        authentication credentials for the API request, e.g.: ("__key__", your_api_key)

    Returns
    -------
    obs_list : list
        list with the observations of all monitoring wells within the extent

    Raises
    ------
    TypeError
        if extent is not a list, tuple, str or path.
    ValueError
        if no monitoring wells are found within the extent.
    """
    if isinstance(extent, (list, tuple)):
        polygon_T = extent_to_wgs84_polygon(extent)
    elif isinstance(extent, (str, pathlib.PurePath)):
        polygon = geopandas.read_file(extent)
        # TODO: check this transformation
        # to_crs only needs the target CRS; the source CRS is taken from the
        # file that was read.
        polygon_T = polygon.to_crs("WGS84").loc[0, "geometry"]
    else:
        raise TypeError("Extent should be a shapefile or a list of coordinates")

    base_url = lizard_api_endpoint.format(organisation=organisation)
    lizard_GWS_endpoint = f"{base_url}groundwaterstations/"
    url_groundwaterstation_extent = (
        f"{lizard_GWS_endpoint}?geometry__within={polygon_T}&page_size={page_size}"
    )

    # First request, only used to find out how many wells (and pages) there are.
    r = requests.get(url_groundwaterstation_extent, auth=auth)
    r.raise_for_status()
    groundwaterstation_data = r.json()
    nr_results = groundwaterstation_data["count"]
    nr_pages = math.ceil(nr_results / page_size)

    logger.info(f"Number of monitoring wells: {nr_results}")
    logger.info(f"Number of pages: {nr_pages}")

    if nr_results == 0:
        logger.warning(
            "No monitoring wells found in the specified extent. "
            "Please check the extent or the organisation."
        )
        raise ValueError(groundwaterstation_data)

    # No need for more threads than there are pages to download.
    nr_threads = min(nr_threads, nr_pages)
    urls = _prepare_API_input(nr_pages, url_groundwaterstation_extent)

    # Prepare arguments for get_obs_list_from_codes
    kwargs = {
        "ObsClass": ObsClass,
        "tube_nr": tube_nr,
        "tmin": tmin,
        "tmax": tmax,
        "type_timeseries": type_timeseries,
        "which_timeseries": which_timeseries,
        "datafilters": datafilters,
        "combine_method": combine_method,
        "only_metadata": only_metadata,
        "organisation": organisation,
        "auth": auth,
    }

    # Step 1: collect the codes of all monitoring wells, page by page.
    codes = []
    with ThreadPoolExecutor(max_workers=nr_threads) as executor:
        for result in tqdm(
            executor.map(lambda url: _download(url, auth=auth), urls),
            total=nr_pages,
            desc="Page",
        ):
            codes += [d["code"] for d in result]

    # Step 2: download the observations of every monitoring well.
    obs_list = []
    with ThreadPoolExecutor() as executor:
        for obs_list_mw in tqdm(
            executor.map(lambda code: get_obs_list_from_codes(code, **kwargs), codes),
            total=len(codes),
            desc="monitoring well",
        ):
            obs_list += obs_list_mw

    return obs_list