Source code for hydropandas.io.waterinfo

import logging
import os
import zipfile
from functools import lru_cache

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import box
from tqdm import tqdm

# module-level logger, registered under this module's import name
logger = logging.getLogger(__name__)


def get_obs_list_from_extent(
    extent,
    ObsClass,
    locatie=None,
    grootheid_code=None,
    groepering_code=None,
    parameter_code=None,
    proces_type=None,
    tmin=None,
    tmax=None,
    only_metadata=False,
    keep_all_obs=False,
    epsg=28992,
    location_gdf=None,
):
    """Get observations within a specific extent and optionally for a specific
    location and grootheid_code.

    Parameters
    ----------
    extent : list, tuple, numpy-array or None
        get waterinfo measurements within this extent [xmin, xmax, ymin, ymax].
        If None, no spatial selection is applied.
    ObsClass : type
        class of the observations, e.g. WaterlvlObs
    locatie : str or list of str, optional
        select only measurement with this location(s), e.g. 'schoonhoven',
        default is None
    grootheid_code : str or list of str, optional
        select only measurement with this grootheid_code, e.g. 'WATHTE',
        default is None
    groepering_code : str or list of str, optional
        select only measurement with this groepering_code, e.g. 'GETETBRKD2',
        default is None
    parameter_code : str or list of str, optional
        select only measurement with this parameter_code, e.g. 'Cl',
        default is None
    proces_type : str or list of str, optional
        select only measurement with this proces type, e.g. 'meting',
        default is None
    tmin : str or None, optional
        start time of observations. The default is None.
    tmax : str or None, optional
        end time of observations. The default is None.
    only_metadata : bool, optional
        if True download only metadata, significantly faster. The default is
        False.
    keep_all_obs : bool, optional
        if False, only observations with measurements are kept. The default is
        False.
    epsg : int, optional
        epsg code of the extent. The default is 28992 (RD).
    location_gdf : GeoDataFrame, optional
        geodataframe with the locations of the measurements you want to
        include. If location_gdf is provided the provided extent and epsg will
        be ignored.

    Returns
    -------
    obs_list : list
        list with Obs objects. Observation names are made unique by appending
        " (1)", " (2)", ... to names that already occur in the list.

    Raises
    ------
    ValueError
        raised by the location selection when none of the locations match the
        given locatie, grootheid_code, groepering_code, parameter_code and
        proces_type.
    """
    if location_gdf is None:
        gdf = get_locations_gdf(epsg=epsg)
        # extent is documented as optional: the extent filter cannot handle
        # None, so skip the spatial selection in that case
        if extent is not None:
            gdf = get_locations_within_extent(gdf, extent=extent)
    else:
        gdf = location_gdf

    if gdf.empty:
        msg = f"No waterinfo measurements found within extent {extent}"
        logger.warning(msg)
        return []

    gdf = _select_location(
        gdf, locatie, grootheid_code, groepering_code, parameter_code, proces_type
    )
    if gdf.empty:
        return []

    logger.info(
        f"downloading waterinfo measurements from {len(gdf)} observation points"
    )

    obs_list = []
    used_names = set()  # set gives O(1) lookups while resolving name clashes
    for _, row in gdf.iterrows():
        if only_metadata:
            meta = _get_metadata_from_series(row)
            o = ObsClass(meta=meta, **meta)
        else:
            o = ObsClass.from_waterinfo(location_gdf=row, tmin=tmin, tmax=tmax)
            # the emptiness filter only applies to downloaded observations;
            # metadata-only observations are always kept
            if not keep_all_obs and o.empty:
                continue

        # rename if observation name already exists
        if o.name in used_names:
            counter = 1
            new_name = f"{o.name} ({counter})"
            while new_name in used_names:
                counter += 1
                new_name = f"{o.name} ({counter})"
            o.name = new_name

        obs_list.append(o)
        used_names.add(o.name)

    return obs_list
def get_waterinfo_obs(
    path=None,
    location_gdf=None,
    locatie=None,
    grootheid_code=None,
    groepering_code=None,
    parameter_code=None,
    proces_type=None,
    tmin=None,
    tmax=None,
    **kwargs,
):
    """Get waterinfo observations, either from a local file or through ddlpy.

    When `path` is given the file is read and the selection arguments are not
    used; otherwise the measurements are requested through ddlpy and `kwargs`
    are not used.

    Parameters
    ----------
    path : str, optional
        path to waterinfo file (.zip or .csv), default is None
    location_gdf : geopandas.GeoDataFrame, optional
        geodataframe with locations, default is None
    locatie : str or list of str, optional
        name of the location, e.g. 'schoonhoven', default is None
    grootheid_code : str or list of str, optional
        code(s) of the grootheid, e.g. 'WATHTE', default is None
    groepering_code : str or list of str, optional
        code(s) of the groepering, e.g. 'GETETBRKD2', default is None
    parameter_code : str or list of str, optional
        code(s) of the parameter, e.g. 'Cl', default is None
    proces_type : str or list of str, optional
        code(s) of the proces type, e.g. 'meting', default is None
    tmin : pd.Timestamp, optional
        start date of the measurements, default is None
    tmax : pd.Timestamp, optional
        end date of the measurements, default is None
    **kwargs
        passed on to read_waterinfo_file when `path` is given

    Returns
    -------
    df : pandas.DataFrame
        DataFrame with measurements
    meta : dict
        dict with metadata
    """
    if path is None:
        result = get_measurements_ddlpy(
            location_gdf=location_gdf,
            locatie=locatie,
            grootheid_code=grootheid_code,
            groepering_code=groepering_code,
            parameter_code=parameter_code,
            proces_type=proces_type,
            tmin=tmin,
            tmax=tmax,
        )
    else:
        result = read_waterinfo_file(path, **kwargs)

    # both sources are expected to deliver exactly (measurements, metadata)
    df, meta = result
    return df, meta
def _get_metadata_from_series(selected): """Get metadata from a series with location information Parameters ---------- selected : pandas.Series series with location information Returns ------- meta : dict dict with metadata """ d = selected.to_dict() p = d.pop("geometry") unit = d.pop("Eenheid.Code") if d["Hoedanigheid.Code"] not in ["", "NVT"]: unit += " " + d.pop("Hoedanigheid.Code") meta = { "name": d["Naam"] + " " + d.pop("Grootheid.Omschrijving"), "unit": unit, "x": p.x, "y": p.y, "source": "waterinfo (ddlpy)", "location": d.pop("Naam"), "meta": d, } return meta def _select_location( location_gdf, locatie, grootheid_code, groepering_code, parameter_code, proces_type ): """Select location from a geodataframe with locations Parameters ---------- location_gdf : geopandas.GeoDataFrame geodataframe with locations locatie : str or list of str name(s) of the location grootheid_code : str or list of str code(s) of the grootheid groepering_code : str or list of str code(s) of the groepering, e.g. 'GETETBRKD2' parameter_code : str or list of str code(s) of the parameter, e.g. 'Cl' proces_type : str or list of str code(s) of the proces type, e.g. 
'meting' Returns ------- selected : pandas.Series series with location information """ if locatie is not None: if isinstance(locatie, str): locatie = [locatie] location_gdf = location_gdf.loc[locatie] if grootheid_code is not None: if isinstance(grootheid_code, str): grootheid_code = [grootheid_code] location_gdf = location_gdf.loc[ location_gdf["Grootheid.Code"].isin(grootheid_code) ] if groepering_code is not None: if isinstance(groepering_code, str): groepering_code = [groepering_code] location_gdf = location_gdf.loc[ location_gdf["Groepering.Code"].isin(groepering_code) ] if parameter_code is not None: if isinstance(parameter_code, str): parameter_code = [parameter_code] location_gdf = location_gdf.loc[ location_gdf["Parameter.Code"].isin(parameter_code) ] if proces_type is not None: if isinstance(proces_type, str): proces_type = [proces_type] location_gdf = location_gdf.loc[location_gdf["ProcesType"].isin(proces_type)] if location_gdf.empty: msg = f"No location found for {locatie=}, {grootheid_code=}, {groepering_code=}, {parameter_code=}, {proces_type=}" raise ValueError(msg) return location_gdf
def get_measurements_ddlpy(
    location_gdf=None,
    locatie=None,
    grootheid_code=None,
    groepering_code=None,
    parameter_code=None,
    proces_type=None,
    tmin=None,
    tmax=None,
):
    """Get measurements from ddlpy for a specific location and grootheid_code

    Parameters
    ----------
    location_gdf : geopandas.GeoDataFrame or pandas.Series, optional
        geodataframe with one or more locations. If a single location is given
        as a Series it is used directly and the selection arguments below are
        replaced by the values of that Series. If None, all locations are
        obtained with get_locations_gdf. default is None
    locatie : str or list of str, optional
        name(s) of the location
    grootheid_code : str or list of str, optional
        code(s) of the grootheid
    groepering_code : str or list of str, optional
        code(s) of the groepering
    parameter_code : str or list of str, optional
        code(s) of the parameter
    proces_type : str or list of str, optional
        code(s) of the proces type, e.g. 'meting'
    tmin : pd.Timestamp, optional
        start date of the measurements, default is 2024-01-01
    tmax : pd.Timestamp, optional
        end date of the measurements, default is now

    Returns
    -------
    df : pandas.DataFrame
        DataFrame with measurements. If measurements were found and the column
        "Meetwaarde.Waarde_Numeriek" is present, only that column is kept,
        renamed to "value". The index of a non-empty result is converted to
        UTC+1 without time zone information.
    meta : dict
        dict with metadata of the location the measurements belong to

    Raises
    ------
    ValueError
        raised by the location selection when no location matches the given
        selection arguments.
    """
    import ddlpy

    tmin = pd.Timestamp("2024") if tmin is None else pd.to_datetime(tmin)
    tmax = pd.Timestamp.now() if tmax is None else pd.to_datetime(tmax)

    if location_gdf is None:
        location_gdf = get_locations_gdf()

    if isinstance(location_gdf, pd.Series):
        selected = location_gdf
        # take the selection values from the series so that the log message
        # below describes the location that was actually requested
        locatie = selected.name
        grootheid_code = selected["Grootheid.Code"]
        groepering_code = selected["Groepering.Code"]
        parameter_code = selected["Parameter.Code"]
        proces_type = selected["ProcesType"]
        df = ddlpy.measurements(selected, start_date=tmin, end_date=tmax)
    else:
        selected = _select_location(
            location_gdf,
            locatie,
            grootheid_code,
            groepering_code,
            parameter_code,
            proces_type,
        )
        if isinstance(selected, pd.DataFrame) and len(selected) != 1:
            logger.info(
                "Multiple observation points match critera, select first one with measurements"
            )
            # try the candidates in order and stop at the first one that
            # returns data; if none does, the last candidate is used
            for _, row in selected.iterrows():
                df = ddlpy.measurements(row, start_date=tmin, end_date=tmax)
                if not df.empty:
                    break
            selected = row
        else:
            if isinstance(selected, pd.DataFrame):
                selected = selected.iloc[0]
            df = ddlpy.measurements(selected, start_date=tmin, end_date=tmax)

    if df.empty:
        msg = (
            f"No measurements for {locatie=}, {grootheid_code=}, "
            f"{groepering_code=}, {parameter_code=} and {proces_type=} between "
            f"{tmin} and {tmax}"
        )
        logger.info(msg)
    else:
        if "Meetwaarde.Waarde_Numeriek" in df.columns:
            df = df[["Meetwaarde.Waarde_Numeriek"]]
            df.columns = ["value"]

        # remove time zone information by transforming to dutch winter time
        # (UTC+1): convert to UTC, drop the time zone, then shift one hour
        df.index = pd.to_datetime(df.index, utc=True).tz_localize(None) + pd.Timedelta(
            1, unit="h"
        )

    meta = _get_metadata_from_series(selected)

    return df, meta
@lru_cache
def get_locations_gdf(epsg=28992):
    """Request all locations from ddlpy and return them as a geodataframe.

    The result is cached per epsg value, so repeated calls with the same epsg
    return the same GeoDataFrame object.

    Parameters
    ----------
    epsg : int, optional
        epsg code the point geometries are reprojected to. The default is
        28992.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        geodataframe with locations. This dataframe is needed to obtain
        measurements using ddlpy
    """
    import ddlpy

    stations = ddlpy.locations()

    # the "Lon" and "Lat" columns are interpreted as epsg:4326 coordinates
    points = gpd.points_from_xy(stations["Lon"], stations["Lat"])
    stations_4326 = gpd.GeoDataFrame(stations, geometry=points, crs=4326)

    return stations_4326.to_crs(epsg)
def get_locations_within_extent(gdf, extent=(482.06, 306602.42, 284182.97, 637049.52)):
    """Select the locations of a geodataframe that lie within an extent

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        geodataframe with locations. This dataframe is needed to obtain
        measurements using ddlpy
    extent : list, tuple, numpy-array or None, optional
        extent of the locations as [xmin, xmax, ymin, ymax], in the coordinate
        system of gdf. The default is the extent of the Netherlands (RD). If
        None, no spatial selection is applied.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        geodataframe with the locations within the extent. This is always a
        new object, the input is not modified.
    """
    if extent is None:
        # nothing to filter on; return a copy so the result is a new object,
        # just as it is when a selection is made
        return gdf.copy()

    # extent is ordered [xmin, xmax, ymin, ymax] while box expects
    # (minx, miny, maxx, maxy)
    xmin, ymin, xmax, ymax = np.asarray(extent)[[0, 2, 1, 3]]
    polygon_ext = box(xmin, ymin, xmax, ymax)

    return gdf.loc[gdf.within(polygon_ext)]
def read_waterinfo_file(
    path,
    index_cols=None,
    return_metadata=True,
    value_col=None,
    location_col=None,
    xcol=None,
    ycol=None,
    transform_coords=True,
):
    """Read waterinfo file (CSV or zip)

    Parameters
    ----------
    path : str or os.PathLike
        path to waterinfo file (.zip or .csv). A zip file must contain a csv
        file with the same base name as the zip file.
    index_cols : list of str, optional
        names of the date column and the time column, in that order. Both are
        combined into the datetime index and removed from the DataFrame. If
        None, "WAARNEMINGDATUM" is combined with "WAARNEMINGTIJD (MET/CET)" or
        "WAARNEMINGTIJD". default is None
    return_metadata : bool, optional
        if True the metadata is returned as well, default is True
    value_col : str, optional
        name of the column with the measured values, default is
        "NUMERIEKEWAARDE"
    location_col : str, optional
        name of the column with the location identifier, default is
        "MEETPUNT_IDENTIFICATIE"
    xcol : str, optional
        name of the column with the x-coordinate, default is "X"
    ycol : str, optional
        name of the column with the y-coordinate, default is "Y"
    transform_coords : bool, optional
        if True the coordinates in the metadata are transformed from
        epsg:25831 to epsg:28992, if False they are divided by 100. default is
        True

    Returns
    -------
    df : pandas.DataFrame
        DataFrame containing file content
    metadata : dict, optional
        dict containing metadata, returned if return_metadata is True

    Raises
    ------
    NotImplementedError
        if path does not end with .csv or .zip
    KeyError
        if index_cols is None and the file has no WAARNEMINGTIJD column
    ValueError
        if return_metadata is True and the file holds more than one location
    """
    path = os.fspath(path)  # also accept pathlib.Path and other path-likes
    name = os.path.splitext(os.path.basename(path))[0]

    if value_col is None:
        value_col = "NUMERIEKEWAARDE"
    if location_col is None:
        location_col = "MEETPUNT_IDENTIFICATIE"
    if xcol is None:
        xcol = "X"
    if ycol is None:
        ycol = "Y"

    # read data
    read_kwargs = {
        "sep": ";",
        "decimal": ",",
        "encoding": "ISO-8859-1",
        "dayfirst": True,
    }
    if path.endswith(".csv"):
        df = pd.read_csv(path, **read_kwargs)
    elif path.endswith(".zip"):
        # context managers make sure the archive and its member are closed
        with zipfile.ZipFile(path) as archive:
            with archive.open(f"{name}.csv") as member:
                df = pd.read_csv(member, **read_kwargs)
    else:
        raise NotImplementedError(
            f"File type '{os.path.splitext(path)[-1]}' not supported!"
        )

    if index_cols is None:
        if "WAARNEMINGTIJD (MET/CET)" in df.columns:
            time_col = "WAARNEMINGTIJD (MET/CET)"
        elif "WAARNEMINGTIJD" in df.columns:
            time_col = "WAARNEMINGTIJD"
        else:
            raise KeyError("expected column with WAARNEMINGTIJD but could not find one")
        index_cols = ["WAARNEMINGDATUM", time_col]

    df.index = pd.to_datetime(
        df[index_cols[0]] + " " + df[index_cols[1]], dayfirst=True
    )
    df.drop(columns=index_cols, inplace=True)

    # do some conversions: 999999999 is replaced by NaN and all values are
    # divided by 100. mask() converts an integer column to float, which a
    # direct NaN assignment into the column does not do cleanly.
    df[value_col] = df[value_col].mask(df[value_col] == 999999999) / 100.0

    if not return_metadata:
        return df

    # parse metadata into dict
    if len(df[location_col].unique()) > 1:
        raise ValueError(
            "File contains data for more than one location!"
            " Use ObsCollection.from_waterinfo()!"
        )

    if transform_coords:
        # pyproj is only needed for the coordinate transformation
        from pyproj import Transformer

        transformer = Transformer.from_crs("epsg:25831", "epsg:28992")
        x, y = transformer.transform(df[xcol].iloc[-1], df[ycol].iloc[-1])
    else:
        x = df[xcol].iloc[-1] / 100.0
        y = df[ycol].iloc[-1] / 100.0

    metadata = {
        "name": df[location_col].iloc[-1],
        "x": x,
        "y": y,
        # always the path that was read, also for zip files
        "filename": path,
        "source": "waterinfo",
    }
    return df, metadata
def read_waterinfo_obs(file_or_dir, ObsClass, progressbar=False, **kwargs):
    """Read waterinfo file or directory and extract locations and observations.

    Parameters
    ----------
    file_or_dir : str
        path to file or directory. From a directory only the files ending
        with .csv or .zip are read, in sorted order.
    ObsClass: Obs type
        type of Obs to store data in
    progressbar : bool, optional
        show progressbar if True, default is False
    **kwargs
        passed on to read_waterinfo_file. The columns named by location_col,
        xcol and ycol are also used here to split the data per location and to
        read the coordinates. A return_metadata entry is ignored, because the
        metadata is always built per location in this function.

    Returns
    -------
    obs_collection : list
        list of Obs objects, one for every location in every file. The
        coordinates in the metadata are transformed from epsg:25831 to
        epsg:28992.

    Raises
    ------
    NotImplementedError
        if file_or_dir is neither an existing file nor an existing directory
    """
    from pyproj import Transformer

    # Waterinfo file
    if os.path.isfile(file_or_dir):
        files = [file_or_dir]
    # directory with waterinfo files (zips or csvs)
    elif os.path.isdir(file_or_dir):
        # skip entries the file reader does not support (other file types,
        # subdirectories), otherwise a single stray file aborts the read
        files = [
            os.path.join(file_or_dir, fname)
            for fname in sorted(os.listdir(file_or_dir))
            if fname.endswith((".csv", ".zip"))
        ]
    else:
        raise NotImplementedError("Provide path to file or directory!")

    # use the same column defaults as read_waterinfo_file, also when a column
    # name is passed explicitly as None
    location_col = kwargs.pop("location_col", None) or "MEETPUNT_IDENTIFICATIE"
    xcol = kwargs.get("xcol") or "X"
    ycol = kwargs.get("ycol") or "Y"
    # return_metadata is set below; drop it to avoid a duplicate keyword
    kwargs.pop("return_metadata", None)

    # loop over files
    obs_collection = []
    transformer = Transformer.from_crs("epsg:25831", "epsg:28992")
    for filenm in tqdm(files) if progressbar else files:
        # read file or zip
        df = read_waterinfo_file(
            filenm, location_col=location_col, return_metadata=False, **kwargs
        )

        # get location and convert to m RD
        for stn in df[location_col].unique():
            mask = df[location_col] == stn
            x, y = transformer.transform(
                df.loc[mask, xcol].iloc[-1], df.loc[mask, ycol].iloc[-1]
            )
            metadata = {
                "name": stn,
                "x": x,
                "y": y,
                "filename": filenm,
                "source": "waterinfo",
            }

            # add to list
            o = ObsClass(df.loc[mask, :], meta=metadata, **metadata)
            obs_collection.append(o)

    return obs_collection