# Source code for hydropandas.util

"""Created on Wed Sep 12 12:15:42 2018.

@author: Artesia
"""

import logging
import os
import sys
import tempfile
import time
import zipfile

import pandas as pd
from scipy.interpolate import RBFInterpolator

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# PROJ-style parameter string for a projected coordinate system (oblique
# stereographic on the Bessel ellipsoid, units in metres, with a towgs84 datum
# shift). NOTE(review): the constant's name implies this is the definition of
# EPSG:28992 -- confirm against an authoritative source before relying on it.
EPSG_28992 = (
    "+proj=sterea +lat_0=52.15616055555555 +lon_0=5.38763888888889 +k=0.9999079 "
    "+x_0=155000 +y_0=463000 +ellps=bessel "
    "+towgs84=565.417,50.3319,465.552,-0.398957,0.343988,-1.8774,4.0725 +units=m "
    "+no_defs"
)


def _obslist_to_frame(obs_list):
    """Build a pandas DataFrame from a list of observation objects.

    Every observation contributes one row, taken from its
    ``to_collection_dict()`` result. The columns follow the key order of the
    first observation and the "name" column becomes the index.

    Parameters
    ----------
    obs_list : list of hydropandas.*Obs
        observations to store in the DataFrame.

    Returns
    -------
    obs_df : pandas.DataFrame
        one row per observation, indexed by name; an empty DataFrame when
        ``obs_list`` is empty.
    """
    # nothing to convert
    if len(obs_list) == 0:
        return pd.DataFrame()

    rows = [obs.to_collection_dict() for obs in obs_list]
    column_names = obs_list[0].to_collection_dict().keys()

    obs_df = pd.DataFrame(rows, columns=column_names).set_index("name")

    # duplicate names are allowed, but worth telling the user about
    if obs_df.index.duplicated().any():
        logger.warning("multiple observations with the same name")

    return obs_df


def unzip_file(src, dst, force=False, preserve_datetime=False):
    """Unzip file.

    Parameters
    ----------
    src : str
        source zip file
    dst : str
        destination directory
    force : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        set the modification time of every extracted file to the timestamp
        stored for that member in the zip file

    Returns
    -------
    int or None
        1 when the archive was extracted, None when extraction was skipped
        because ``dst`` already exists and ``force`` is False.
    """
    if os.path.exists(dst) and not force:
        print(
            "File not unzipped. Destination already exists. Use'force=True' to unzip."
        )
        return None

    # the context manager closes the archive even when extraction fails
    with zipfile.ZipFile(src, "r") as zipf:
        if not preserve_datetime:
            zipf.extractall(dst)
            return 1

        for member in zipf.infolist():
            # extract() returns the path that was actually written, which can
            # differ from member.filename when the name had to be sanitised
            extracted_path = zipf.extract(member, path=dst)
            # pad the 6-item zip timestamp to the 9-item layout mktime needs;
            # the trailing -1 leaves the daylight-saving decision to mktime
            timestamp = time.mktime(member.date_time + (0, 0, -1))
            os.utime(extracted_path, (timestamp, timestamp))

    return 1
def get_files(
    file_or_dir, ext, unpackdir=None, force_unpack=False, preserve_datetime=False
):
    """Internal method to get list of files with specific extension from dirname.

    Parameters
    ----------
    file_or_dir : str
        file or path to data.
    ext : str
        extension of filenames to store in list.
    unpackdir : str
        directory to store unpacked zip file, only used in case of a zipfile.
        When None a new temporary directory is created.
    force_unpack : bool, optional
        force unzip, by default False.
    preserve_datetime : bool, optional
        preserve datetime of unzipped files, by default False. Used for
        checking whether data has changed.

    Returns
    -------
    dirname : str
        directory that contains the files.
    unzip_fnames : list of str
        file names (without directory) found in dirname. For a directory these
        are the names ending with ``ext``; for a single non-zip file it is the
        name of that file, whatever its extension.

    Raises
    ------
    ValueError
        if unpackdir is the same as file_or_dir.
    NotImplementedError
        if file_or_dir is neither a zip file, a directory nor an existing file.
    """
    # check if unpackdir is same as file_or_dir, if same, this can cause
    # problems when the unpackdir still contains zips that will be unpacked
    # again.
    if (unpackdir is not None) and (
        os.path.normcase(unpackdir) == os.path.normcase(file_or_dir)
    ):
        raise ValueError("Please specify a different folder to unpack files!")

    # identify whether file_or_dir started as zip
    iszip = str(file_or_dir).endswith(".zip")

    # unzip dir
    if iszip:
        zipf = file_or_dir
        if unpackdir is None:
            # mkdtemp returns a directory that really exists and stays around;
            # because it is brand new, unpacking into it must always be forced
            file_or_dir = tempfile.mkdtemp()
            force_first = True
        else:
            file_or_dir = unpackdir
            force_first = force_unpack
        unzip_file(
            zipf, file_or_dir, force=force_first, preserve_datetime=preserve_datetime
        )

    if os.path.isdir(file_or_dir):
        # check for zips in dir
        zip_fnames = [i for i in os.listdir(file_or_dir) if i.endswith(".zip")]
        if len(zip_fnames) > 0:
            # unzip zips
            dirname = tempfile.mkdtemp() if unpackdir is None else unpackdir
            for zipf in zip_fnames:
                unzip_file(
                    os.path.join(file_or_dir, zipf),
                    dirname,
                    force=True,
                    preserve_datetime=preserve_datetime,
                )
                # remove intermediate zipfiles if initial file_or_dir was zip
                if iszip:
                    os.remove(os.path.join(file_or_dir, zipf))
        else:
            dirname = file_or_dir
        # get all files with extension ext
        unzip_fnames = [i for i in os.listdir(dirname) if i.endswith(ext)]
    elif os.path.isfile(file_or_dir):
        # file_or_dir is a single file: return its name and its directory
        unzip_fnames = [os.path.basename(file_or_dir)]
        dirname = os.path.dirname(file_or_dir)
    else:
        raise NotImplementedError(f"Cannot parse 'file_or_dir': {file_or_dir}!")

    return dirname, unzip_fnames
def df2gdf(df, xcol="x", ycol="y", crs=28992):
    """Create a GeoDataFrame from a DataFrame with xy points.

    Parameters
    ----------
    df : pd.DataFrame
        input dataframe
    xcol : str, optional
        column name with x values. The default is 'x'.
    ycol : str, optional
        column name with y values. The default is 'y'.
    crs : int, optional
        coordinate reference system, by default 28992 (RD new).

    Returns
    -------
    geopandas GeoDataFrame
        copy of df with a point geometry built from the xcol and ycol values
        of every row.
    """
    from geopandas import GeoDataFrame
    from shapely.geometry import Point

    # walk the two coordinate columns directly instead of materialising a
    # Series for every row with iterrows()
    points = [Point((x, y)) for x, y in zip(df[xcol], df[ycol])]

    return GeoDataFrame(df.copy(), geometry=points, crs=crs)
[docs]class ColoredFormatter(logging.Formatter): """Colored log formatter. Taken from https://gist.github.com/joshbode/58fac7ababc700f51e2a9ecdebe563ad """ def __init__(self, *args, colors: dict[str, str] | None = None, **kwargs) -> None: """Initialize the formatter with specified format strings.""" super().__init__(*args, **kwargs) self.colors = colors if colors else {}
[docs] def format(self, record) -> str: """Format the specified record as text.""" record.color = self.colors.get(record.levelname, "") record.reset = "\x1b[0m" return super().format(record)
def get_color_logger(level="INFO", logger_name=None):
    """Get a logger with colored output.

    All handlers already attached to the logger are removed and replaced by a
    single handler that writes colored messages to stdout. Warnings issued
    through the ``warnings`` module are redirected to logging as well.

    Parameters
    ----------
    level : str or int, optional
        The logging level to set for the logger, either a level name (case
        insensitive, e.g. "INFO" or "debug") or a numeric logging level.
        Default is "INFO".
    logger_name : str, optional
        Name of the logger to configure. The default (None) configures the
        root logger.

    Returns
    -------
    logger : logging.Logger
        The configured logger object.
    """
    # resolve the level to its numeric value; level names are upper-cased
    # first because e.g. getattr(logging, "debug") is the module-level
    # function logging.debug, not a level
    if isinstance(level, str):
        levelno = getattr(logging, level.upper())
    else:
        levelno = level

    # the debug format additionally shows the line number
    if levelno == logging.DEBUG:
        fmt = "{color}{levelname}:{name}.{funcName}:{lineno}:{message}{reset}"
    else:
        fmt = "{color}{levelname}:{name}.{funcName}:{message}{reset}"

    formatter = ColoredFormatter(
        fmt,
        style="{",
        datefmt="%Y-%m-%d %H:%M:%S",
        colors={
            "DEBUG": "\x1b[36m",
            "INFO": "\x1b[32m",
            "WARNING": "\x1b[33m",
            "ERROR": "\x1b[31m",
            "CRITICAL": "\x1b[31m" + "\x1b[47m" + "\x1b[1m",
        },
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    clogger = logging.getLogger(logger_name)
    # drop existing handlers in place so messages are not emitted twice
    clogger.handlers[:] = []
    clogger.addHandler(handler)
    clogger.setLevel(levelno)

    logging.captureWarnings(True)

    return clogger
[docs]def oc_to_df(oc, col: str | None = None) -> pd.DataFrame: """Convert an observation collection to a DataFrame where every column has one observation. Parameters ---------- oc : hydropandas ObsCollection observation collection col : Optional[str], optional Name of a column in hte observation collection, by default None Returns ------- DataFrame _description_ """ df_list = [] for o in oc.obs.values: if not o.empty: if col is None: vals = o.loc[:, o._get_first_numeric_col_name()] else: vals = o.loc[:, col] vals.name = o.name df_list.append(vals) return pd.concat(df_list, axis=1)
[docs]def interpolate( xy: list[list[float]], obsdf: pd.DataFrame, obsloc: pd.DataFrame, kernel: str = "thin_plate_spline", kernel2: str = "linear", epsilon: int | None = None, ) -> pd.DataFrame: """Interpolation method using the Scipy radial basis function (RBF) Parameters ---------- xy : List[List[float]] xy coordinates of locations of interest e.g. [[10,25], [5,25]] obsdf : DataFrame Dataframe containing the observation locations as columns and the observations at a measurement time in each row. obsloc : DataFrame Dataframe containing the observation locations coordinates with observation locations as index and columns ["x", "y"] kernel : str, optional Type of radial basis funtion, by default thin_plate_spline. Other options are linear, gaussian, inverse_quadratic, multiquadric, inverse_multiquadric, cubic or quintic. kernel2 : str, optional Kernel in case there are not enough observations (3 or 6) for time step, by default linear. Other options are gaussian, inverse_quadratic, multiquadric, or inverse_multiquadric. epsilon : Optional[int], optional Shape parameter that scales the input to the RBF. If kernel is linear, thin_plate_spline, cubic, or quintic, this defaults to 1. Otherwise this must be specified. Returns ------- DataFrame DataFrame with locations of interest as columns and interpolated values at a measurement time in each row. 
""" if kernel in {"thin_plate_spline", "cubic"}: min_val = 3 elif kernel == "quintic": min_val = 6 else: min_val = len(obsdf.index) fill_df = ( pd.DataFrame( index=obsdf.index, columns=[f"{int(_xy[0])}_{int(_xy[1])}" for _xy in xy], ) .sort_index() .astype(float) ) for idx in obsdf.index: # get all stations with values for this date val = obsdf.loc[idx].dropna() # get stations for this date coor = obsloc.loc[val.index] if len(val) < min_val: kernel = kernel2 # create an scipy interpolator rbf = RBFInterpolator( coor.to_numpy(), val.to_numpy(), epsilon=epsilon, kernel=kernel ) val_rbf = rbf(xy) fill_df.loc[idx] = val_rbf return fill_df