import logging
import os
import re
import tempfile
from io import FileIO, TextIOWrapper
from pathlib import Path
from typing import Union
from zipfile import ZipFile
import numpy as np
import pandas as pd
# module-level logger, named after this module
logger = logging.getLogger(__name__)
def _read_dino_groundwater_header(f):
    """Read the header block at the top of a dino groundwater csv file.

    Lines are consumed from `f` until the first empty line or the end of the
    file. Two layouts are recognised per line:

    * a line with a comma (csv from dinoloket): the first field is the
      property (colons removed, whitespace stripped) and the second field is
      the value. If a third and fourth field exist and the third is not
      empty, both are appended to the value, separated by spaces.
    * a line without a comma (ArtDiver dino-csv): everything before the first
      colon is the property, everything after it is the value.

    Parameters
    ----------
    f : text file object
        file positioned at the first line of the header

    Returns
    -------
    line : str
        the line that ended the header: an empty line, or "" at end of file
    header : dict
        header properties with their values (values are not stripped)
    """
    header = {}
    line = f.readline()
    while line not in ["\n", "", "\r\n"]:
        if "," in line:  # for csv from dinoloket
            propval = line.split(",")
            prop = propval[0].replace(":", "").strip()
            val = propval[1]
            # a header line does not always hold four fields; only use the
            # third and fourth field when both are present
            if len(propval) > 3 and propval[2] != "":
                val = val + " " + propval[2].replace(":", "") + " " + propval[3]
        else:  # for artdiver dino-csv
            prop, _, val = line.partition(":")
        header[prop] = val
        line = f.readline()

    return line, header
def _read_empty(f, line):
    """Skip empty lines.

    Parameters
    ----------
    f : text file object
        file to read the following lines from
    line : str
        the line that was read last

    Returns
    -------
    line : str
        `line` itself when it is not a bare line ending, otherwise the first
        line read from `f` that is not a bare line ending ("" at end of file)
    """
    bare_line_endings = ("\n", "\r\n")
    while line in bare_line_endings:
        line = f.readline()
    return line
def _read_dino_groundwater_referencelvl(f, line):
    """Read the reference level block of a dino groundwater csv file.

    Starting with `line`, lines are processed until an empty line or the end
    of the file is reached. Of every line with at least two comma separated
    fields the first field (colons removed, whitespace stripped) is stored as
    key and the second field as value. Lines without a comma are skipped.

    Parameters
    ----------
    f : text file object
        file to read the following lines from
    line : str
        first line of the reference level block

    Returns
    -------
    line : str
        the line that ended the block
    ref : dict
        reference level properties and their values
    """
    reference = {}
    block_end = ("\n", "", "\r\n")
    while line not in block_end:
        fields = line.split(",")
        if len(fields) > 1:
            name = fields[0].replace(":", "").strip()
            reference[name] = fields[1]
        line = f.readline()
    return line, reference
def _read_dino_groundwater_metadata(f, line):
    """Read the metadata table of a dino groundwater csv file.

    `line` holds the comma separated column names. The following lines of `f`
    are read as metadata rows until an empty line or the end of the file.
    Every row describes the observation point from its 'startdatum' onwards.

    Parameters
    ----------
    f : text file object
        file positioned at the first metadata row
    line : str
        line with the column names of the metadata table

    Returns
    -------
    line : str
        the line that ended the metadata table
    obs_att : dict
        attributes of the observation point taken from the last metadata row,
        or placeholder values with metadata_available=False when the table
        has no rows
    meta_ts : dict of pd.Series
        for every attribute that has more than one unique value over the
        rows, a series of that attribute indexed by start date
    """
    # source column -> attribute, value is converted to float
    float_columns = {
        "x-coordinaat": "x",
        "y-coordinaat": "y",
        "filternummer": "tube_nr",
    }
    # source column -> attribute, value is converted to float and divided by 100
    div_100_columns = {
        "meetpunt (cm t.o.v. nap)": "tube_top",
        "bovenkant filter (cm t.o.v. nap)": "screen_top",
        "onderkant filter (cm t.o.v. nap)": "screen_bottom",
        "maaiveld (cm t.o.v. nap)": "ground_level",
    }

    column_names = line.strip().split(",")
    rows = []
    line = f.readline()
    while line not in ["\n", "", "\r\n"]:
        values = line.strip().split(",")
        rows.append(
            {column_names[i].lower(): value for i, value in enumerate(values)}
        )
        line = f.readline()

    meta_ts = {}
    if not rows:
        # no metadata
        obs_att = {
            "monitoring_well": "",
            "tube_nr": np.nan,
            "name": "unknown",
            "x": np.nan,
            "y": np.nan,
            "tube_top": np.nan,
            "ground_level": np.nan,
            "screen_top": np.nan,
            "screen_bottom": np.nan,
            "metadata_available": False,
        }
        return line, obs_att, meta_ts

    # add time dependent metadata to meta_ts
    for row_nr, row in enumerate(rows):
        start_date = pd.to_datetime(row.pop("startdatum"), dayfirst=True)
        current = {"monitoring_well": row["locatie"]}
        for column, attribute in float_columns.items():
            text = row[column]
            current[attribute] = np.nan if text == "" else float(text)
        for column, attribute in div_100_columns.items():
            text = row[column]
            current[attribute] = np.nan if text == "" else float(text) / 100.0

        if row_nr == 0:
            # the first row determines the dtype of every series
            for attribute, value in current.items():
                meta_ts[attribute] = pd.Series(name=attribute, dtype=type(value))
        for attribute, value in current.items():
            meta_ts[attribute].loc[start_date] = value

    # remove series with non time variant metadata from meta_ts
    for attribute in list(meta_ts):
        if len(meta_ts[attribute].unique()) == 1:
            del meta_ts[attribute]

    # the attributes of the last row describe the observation point
    obs_att = current.copy()
    obs_att["name"] = f'{obs_att["monitoring_well"]}-{int(obs_att["tube_nr"]):03d}'
    obs_att["metadata_available"] = True
    return line, obs_att, meta_ts
def _read_dino_groundwater_measurements(f, line):
    """Read the measurement table of a dino groundwater csv file.

    Parameters
    ----------
    f : text file object
        file positioned at the first measurement row
    line : str
        line with the comma separated column titles

    Returns
    -------
    line : str
        the stripped title line
    measurements : pd.DataFrame or None
        measurements indexed by 'peildatum'. None if the title line is empty
        or if pandas raises a ParserError on the table.
    """
    line = line.strip()
    if line == "":
        return line, None

    # empty titles come from trailing commas and do not name a column
    titles = [title for title in line.split(",") if title != ""]
    # Validate if titles are valid names
    make_valid = np.lib._iotools.NameValidator()
    column_names = [name.lower() for name in make_valid(titles)]

    try:
        measurements = pd.read_csv(
            f,
            header=None,
            names=column_names,
            parse_dates=["peildatum"],
            index_col="peildatum",
            dayfirst=True,
            usecols=range(len(column_names)),
        )
    except pd.errors.ParserError:
        # for now the workflow is to remove the files that cannot be read
        # manually.
        measurements = None

    return line, measurements
def read_dino_groundwater_quality_txt(f: Union[str, Path, FileIO]):
    """Read dino groundwater quality (grondwatersamenstelling) from a dinoloket
    txt file.

    Notes
    -----
    this function has not been tested thoroughly

    The file is closed before returning, also when `f` was passed in as an
    open file object and also when reading fails.

    Parameters
    ----------
    f : str, Path or file object
        path to the txt file, or an open text file with a `name` attribute

    Returns
    -------
    measurements : pd.DataFrame
        the 'KWALITEIT gegevens VLOEIBAAR' table, indexed by 'Monster datum'.
        An empty pd.Series is returned when the file has no such section.
    meta : dict
        dictionary with metadata

    Raises
    ------
    AssertionError
        if the first line is not 'LOCATIE gegevens' or if the location table
        does not hold exactly one row
    """
    if isinstance(f, (str, Path)):
        f = Path(f)
        fname = str(f.stem)
        f = f.open("r")
    else:
        fname = f.name.split(os.sep)[-1]

    logger.info("reading -> {}".format(fname))

    try:
        # LOCATIE gegevens
        line = f.readline().rstrip("\n")
        assert line == "LOCATIE gegevens"

        strt_locatie = f.tell()
        # determine the number of rows
        nrows = -1  # the header does not count
        line = f.readline()
        while line not in ["\n", ""]:
            nrows += 1
            line = f.readline()
        eind_locatie = f.tell()

        # go back to where we were before
        f.seek(strt_locatie)

        # read the location-information
        locatie = pd.read_csv(f, sep="\t", nrows=nrows)

        # there is always only one location (change if this is not the case)
        assert nrows == 1

        locatie = locatie.squeeze()

        # KWALITEIT gegevens
        f.seek(eind_locatie)
        read_quality = False
        while line:
            if line.startswith("KWALITEIT gegevens VAST\n"):
                logger.warning("ignoring 'KWALITEIT gegevens VAST'! ")
            if line.startswith("KWALITEIT gegevens VLOEIBAAR"):
                read_quality = True
                break
            line = f.readline()

        if read_quality:
            # the file is now positioned directly after the section title
            strt_locatie = f.tell()
            # determine the number of rows; read the column header first so
            # that the section title that is still in `line` is not counted
            nrows = -1  # the header does not count
            line = f.readline()
            while line not in ["\n", ""]:
                nrows += 1
                line = f.readline()
            f.seek(strt_locatie)
            measurements = pd.read_csv(
                f,
                sep="\t",
                parse_dates=["Monster datum", "Analyse datum"],
                dayfirst=True,
                index_col="Monster datum",
                # nrows is -1 when the title is the last line of the file
                nrows=max(nrows, 0),
            )
        else:
            measurements = pd.Series()
    finally:
        f.close()

    meta = {
        "filename": fname,
        "source": "dino",
        "monitoring_well": locatie["NITG-nr"],
        "name": locatie["NITG-nr"],
        "x": locatie["X-coord"],
        "y": locatie["Y-coord"],
    }
    try:
        meta["ground_level"] = locatie["Maaiveldhoogte (m tov NAP)"]
    except KeyError:
        meta["ground_level"] = np.nan

    return measurements, meta
def read_dino_groundwater_csv(
    f: Union[str, Path, FileIO],
    to_mnap: bool = True,
    read_series: bool = True,
    remove_duplicates: bool = False,
    keep_dup: str = "last",
):
    """Read dino groundwater quantity data from a dinoloket csv file.

    The file is closed before returning, also when `f` was passed in as an
    open file object and also when reading fails.

    Parameters
    ----------
    f : Union[str, Path, TextIOWrapper]
        path to csv file
    to_mnap : boolean, optional
        if True a column with 'stand_m_tov_nap' is added to the dataframe
    read_series : boolean, optional
        if False only metadata is read, default is True
    remove_duplicates : boolean, optional
        if True duplicate indices are removed. Default is False.
    keep_dup : str, optional
        indicate which duplicate indices should be kept, only used when
        remove_duplicates is True. Default is 'last'

    Returns
    -------
    measurements : pd.DataFrame
        None if read_series is False, or if the measurements could not be
        read and there is no time variant metadata
    meta : dict
        dictionary with metadata

    Raises
    ------
    TypeError
        if f is not a str, Path or TextIOWrapper
    """
    if isinstance(f, (str, Path)):
        f = Path(f)
        fname = f.stem
        f = f.open("r")
    elif isinstance(f, TextIOWrapper):
        fname = f.name
    else:
        raise TypeError("f should be of type str, Path or TextIOWrapper")

    logger.info("reading -> {}".format(fname))

    try:
        # read header
        line, header = _read_dino_groundwater_header(f)
        line = _read_empty(f, line)
        if not header:
            logger.warning(f"could not read header -> {fname}")

        # read reference level
        line, ref = _read_dino_groundwater_referencelvl(f, line)
        line = _read_empty(f, line)
        if not ref:
            logger.warning(f"could not read reference level -> {fname}")

        # read metadata
        line, meta, meta_ts = _read_dino_groundwater_metadata(f, line)
        line = _read_empty(f, line)
        if not meta["metadata_available"]:
            logger.warning(f"could not read metadata -> {fname}")
        meta["filename"] = fname
        meta["source"] = "dino"

        # read measurements
        measurements = None
        if read_series:
            line, measurements = _read_dino_groundwater_measurements(f, line)
            if measurements is None:
                logger.warning(f"could not read measurements -> {fname}")
            elif measurements[~measurements.stand_cm_tov_nap.isna()].empty:
                logger.warning(f"no NAP measurements available -> {fname}")

            if to_mnap and measurements is not None:
                measurements.insert(
                    0, "stand_m_tov_nap", measurements["stand_cm_tov_nap"] / 100.0
                )
                meta["unit"] = "m NAP"
            elif not to_mnap:
                meta["unit"] = "cm NAP"

            # there is nothing to deduplicate when the measurements could
            # not be read
            if remove_duplicates and measurements is not None:
                measurements = measurements[
                    ~measurements.index.duplicated(keep=keep_dup)
                ]

            # add time variant metadata to measurements
            for s in meta_ts.values():
                if measurements is None:
                    measurements = pd.DataFrame(data=s.copy(), columns=[s.name])
                else:
                    measurements = measurements.join(s, how="outer")
                    measurements.loc[:, s.name] = measurements.loc[:, s.name].ffill()
    finally:
        f.close()

    return measurements, meta
def _read_artdino_groundwater_metadata(f, line):
    """Read the metadata table of an ArtDiver dino csv file.

    `line` holds the comma separated column names. The following lines of `f`
    are read as metadata rows until an empty line or the end of the file.
    Only the last row is used for the returned metadata.

    Parameters
    ----------
    f : text file object
        file positioned at the first metadata row
    line : str
        line with the column names of the metadata table

    Returns
    -------
    line : str
        the line that ended the metadata table
    meta : dict
        metadata of the observation point, or placeholder values with
        metadata_available=False when the table has no rows
    """

    def div_100_or_nan(text):
        # an empty field means the value is unknown
        return np.nan if text == "" else float(text) / 100

    column_names = line.strip().split(",")
    rows = []
    line = f.readline()
    while line not in ["\n", "", "\r\n"]:
        values = line.strip().split(",")
        rows.append(
            {column_names[i].lower(): value for i, value in enumerate(values)}
        )
        line = f.readline()

    if not rows:
        # no metadata
        meta = {
            "monitoring_well": "",
            "tube_nr": np.nan,
            "name": "unknown",
            "x": np.nan,
            "y": np.nan,
            "tube_top": np.nan,
            "ground_level": np.nan,
            "screen_top": np.nan,
            "screen_bottom": np.nan,
            "metadata_available": False,
        }
        return line, meta

    last = rows[-1]
    meta = {
        "monitoring_well": last["locatie"],
        "tube_nr": int(float(last["filternummer"])),
        "name": "-".join([last["locatie"], last["filternummer"]]),
        "x": float(last["x-coordinaat"]),
        "y": float(last["y-coordinaat"]),
        "tube_top": div_100_or_nan(last["meetpunt nap"]),
        "ground_level": div_100_or_nan(last["maaiveld nap"]),
        "screen_top": div_100_or_nan(last["bovenkant filter"]),
        "screen_bottom": div_100_or_nan(last["onderkant filter"]),
        "metadata_available": True,
    }
    return line, meta
def _read_artdino_groundwater_measurements(f, line):
    """Read the measurement table of an ArtDiver dino csv file.

    Parameters
    ----------
    f : text file object
        file positioned at the first measurement row
    line : str
        line with the comma separated column titles

    Returns
    -------
    line : str
        the stripped title line
    measurements : pd.DataFrame or None
        measurements indexed by 'peil_datum_tijd'. None if the title line is
        empty or if pandas raises a ParserError on the table.
    """
    line = line.strip()
    measurements = None
    if line != "":
        # titles that are empty do not name a column
        titles = list(filter(lambda title: title != "", line.split(",")))
        # Validate if titles are valid names
        valid_titles = np.lib._iotools.NameValidator()(titles)
        names = [title.lower() for title in valid_titles]
        read_options = dict(
            header=None,
            names=names,
            parse_dates=["peil_datum_tijd"],
            index_col="peil_datum_tijd",
            dayfirst=True,
            usecols=range(len(names)),
        )
        try:
            measurements = pd.read_csv(f, **read_options)
        except pd.errors.ParserError:
            # for now the workflow is to remove the files that cannot be read
            # manually.
            measurements = None
    return line, measurements
def read_artdino_groundwater_csv(path, to_mnap=True, read_series=True):
    """Read dino groundwater quantity data from a CSV file as exported by
    ArtDiver.

    Parameters
    ----------
    path : str
        path to csv file
    to_mnap : boolean, optional
        if True a column with 'stand_m_tov_nap' is added to the dataframe
    read_series : boolean, optional
        if False only metadata is read, default is True

    Returns
    -------
    measurements : pd.DataFrame
        None if read_series is False or if the measurements could not be read
    meta : dict
        dictionary with metadata
    """
    logger.info(f"reading -> {os.path.split(path)[-1]}")

    measurements = None
    with open(path, "r") as f:
        # header block, followed by empty lines
        line, header = _read_dino_groundwater_header(f)
        line = _read_empty(f, line)
        if not header:
            logger.warning(f"could not read header -> {path}")

        # metadata table, followed by empty lines
        line, meta = _read_artdino_groundwater_metadata(f, line)
        line = _read_empty(f, line)
        if not meta["metadata_available"]:
            logger.warning(f"could not read metadata -> {path}")
        meta["filename"] = path
        meta["source"] = "dino"

        # measurement table
        if read_series:
            line, measurements = _read_artdino_groundwater_measurements(f, line)
            if measurements is None:
                logger.warning(f"could not read measurements -> {path}")
            elif measurements[~measurements["stand_cm_nap"].isna()].empty:
                logger.warning(f"no NAP measurements available -> {path}")

            # no unit is set when to_mnap is True but nothing could be read
            if not to_mnap:
                meta["unit"] = "cm NAP"
            elif measurements is not None:
                measurements["stand_m_tov_nap"] = measurements["stand_cm_nap"] / 100.0
                meta["unit"] = "m NAP"

    return measurements, meta
def read_artdino_dir(
    dirname,
    ObsClass=None,
    subdir="csv",
    suffix="1.csv",
    unpackdir=None,
    force_unpack=False,
    preserve_datetime=False,
    keep_all_obs=True,
    **kwargs,
):
    """Read Dino directory with point observations.

    TODO:
    - Possibly improve by filling the DataFrame as soon as an observation is
      read. Currently everything is first read into a list.

    Parameters
    ----------
    dirname : str or path-like
        directory name, can be a .zip file or the parent directory of subdir
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read
    unpackdir : str
        destination directory of the unzipped file
    force_unpack : boolean, optional
        force unpack if dst already exists
    preserve_datetime : boolean, optional
        use date of the zipfile for the destination file
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without data or
        metadata
    **kwargs: dict, optional
        Extra arguments are passed to ObsClass.from_artdino_file()

    Returns
    -------
    obs_list : list
        the observations returned by ObsClass.from_artdino_file(), one for
        every file that was kept

    Raises
    ------
    FileNotFoundError
        if a suffix is given and no file in subdir ends with it
    """
    # accept path-like objects as well as strings
    dirname = os.fspath(dirname)

    # unzip dir
    if dirname.endswith(".zip"):
        # only needed for zip files, so the import is done here
        from ..util import unzip_file

        zipf = dirname
        if unpackdir is None:
            # NOTE(review): only the name of the temporary directory is kept,
            # the TemporaryDirectory object itself is not referenced anymore
            dirname = tempfile.TemporaryDirectory().name
        else:
            dirname = unpackdir
        unzip_file(
            zipf, dirname, force=force_unpack, preserve_datetime=preserve_datetime
        )

    # read filenames
    files = os.listdir(os.path.join(dirname, subdir))
    if suffix:
        files = [file for file in files if file.endswith(suffix)]
        if not files:
            raise FileNotFoundError(
                "no files were found in {} that end with {}".format(
                    os.path.join(dirname, subdir), suffix
                )
            )

    # read individual files
    obs_list = []
    for file in files:
        path = os.path.join(dirname, subdir, file)
        obs = ObsClass.from_artdino_file(path=path, **kwargs)
        if (obs.metadata_available and (not obs.empty)) or keep_all_obs:
            obs_list.append(obs)
        else:
            # use the module logger, like the other readers in this module
            logger.info(f"not added to collection -> {path}")

    return obs_list
def _read_dino_waterlvl_metadata(f, line):
"""read dino waterlevel metadata
Parameters
----------
f : text wrapper
line : str
line with meta dictionary keys
meta_dic : dict (optional)
dictionary with metadata
Returns
-------
meta : dict
dictionary with metadata
"""
meta_keys = line.strip().split(",")
meta_values = f.readline().strip().split(",")
meta = {}
for key, value in zip(meta_keys, meta_values):
key = key.strip()
if key in ["X-coordinaat", "Y-coordinaat"]:
if key == "X-coordinaat":
meta["x"] = float(value)
elif key == "Y-coordinaat":
meta["y"] = float(value)
elif key == "Locatie":
meta["monitoring_well"] = value
meta["name"] = value
return meta
def _read_dino_waterlvl_measurements(f, line):
    """Read the measurement table of a dino waterlevel csv file.

    Parameters
    ----------
    f : text wrapper
        file positioned at the first measurement row
    line: str
        header of dataframe

    Returns
    -------
    measurements : pd.DataFrame
        measurements indexed by 'peildatum'
    """
    # empty titles come from trailing commas and do not name a column
    titles = [title for title in line.strip().split(",") if title != ""]
    valid_titles = np.lib._iotools.NameValidator()(titles)
    column_names = [title.lower() for title in valid_titles]

    return pd.read_csv(
        f,
        header=None,
        names=column_names,
        parse_dates=["peildatum"],
        index_col="peildatum",
        dayfirst=True,
        usecols=range(len(column_names)),
    )
def read_dino_waterlvl_csv(
    f: Union[str, Path, FileIO], to_mnap: bool = True, read_series: bool = True
):
    """Read dino waterlevel data from a dinoloket csv file.

    The file is closed before returning, also when `f` was passed in as an
    open file object and also when reading fails.

    Parameters
    ----------
    f : str, Path, FileIO
        path to dino water level csv file
    to_mnap : boolean, optional
        if True a column with 'stand_m_tov_nap' is added to the dataframe
    read_series : boolean, optional
        if False only metadata is read, default is True

    Returns
    -------
    measurements : pd.DataFrame
        None if read_series is False or if the file has no measurement table
    meta : dict
        dictionary with metadata. If the file has no metadata table it only
        holds 'metadata_available' (False), 'filename' and 'source', plus
        'unit' when a measurement table was read.
    """
    fname = ""
    if isinstance(f, (str, Path)):
        f = Path(f)
        fname = f.stem
        f = f.open("r")

    logger.info("reading -> {}".format(fname))

    p_meta = re.compile(
        "Locatie,Externe aanduiding,X-coordinaat,Y-coordinaat, Startdatum, Einddatum"
    )
    p_data = re.compile(r"Locatie,Peildatum,Stand \(cm t.o.v. NAP\),Bijzonderheid")

    # defaults for files without a metadata table and/or a measurement table
    meta = {"metadata_available": False, "filename": fname, "source": "dino"}
    measurements = None
    unit = None

    try:
        # the first line of the file is never matched against the patterns
        line = f.readline()
        while line != "":
            line = f.readline()
            if p_meta.match(line):
                meta = _read_dino_waterlvl_metadata(f, line)
                meta["metadata_available"] = bool(meta)
                meta["filename"] = fname
                meta["source"] = "dino"
            elif p_data.match(line):
                if read_series:
                    measurements = _read_dino_waterlvl_measurements(f, line)
                    if to_mnap and measurements is not None:
                        measurements["stand_m_tov_nap"] = (
                            measurements["stand_cm_tov_nap"] / 100.0
                        )
                        unit = "m NAP"
                    elif not to_mnap:
                        unit = "cm NAP"
                else:
                    measurements = None
    finally:
        f.close()

    # set the unit last so it does not depend on the order of the tables
    if unit is not None:
        meta["unit"] = unit

    return measurements, meta
def read_dino_dir(
    path: Union[str, Path],
    ObsClass,
    subdir: str = "Grondwaterstanden_Put",
    suffix: str = "1.csv",
    keep_all_obs: bool = True,
    **kwargs: dict,
):
    """Read Dino directory with point observations.

    TODO:
    - Possibly improve by filling the DataFrame as soon as an observation is
      read. Currently everything is first read into a list.
    - Create a separate unzip function that still yields the right temporary
      directory.

    Parameters
    ----------
    path : str | Path
        directory name, can be a .zip file or the parent directory of subdir
    ObsClass : type
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    subdir : str
        subdirectory of dirname with data files
    suffix : str
        suffix of files in subdir that will be read. In a directory it is
        used as a glob pattern (a leading '*' is added when it holds no '*'),
        in a zip file every member that contains the suffix is read.
    keep_all_obs : boolean, optional
        add all observation points to the collection, even without data or
        metadata
    **kwargs: dict, optional
        Extra arguments are passed to ObsClass.from_dino()

    Returns
    -------
    obs_list : list
        the observations returned by ObsClass.from_dino(), one for every file
        that was kept

    Raises
    ------
    Exception
        if suffix contains a "0"
    FileNotFoundError
        if no files were found for subdir and suffix
    ValueError
        if path is neither a .zip file nor a directory
    """
    path = Path(path)
    obs_list = []

    def get_dino_obs(f: Union[str, FileIO]):
        """Read one observation, return None if it should not be kept."""
        obs = ObsClass.from_dino(f, **kwargs)
        if (obs.metadata_available and (not obs.empty)) or keep_all_obs:
            return obs
        # use the module logger, like the other readers in this module
        logger.info(f"not added to collection -> {f.name}")
        return None

    if path.suffix == ".zip":
        with ZipFile(path) as zfile:
            fnames = [x for x in zfile.namelist() if f"{subdir}/" in x]
            if suffix:
                if "0" in suffix:
                    raise Exception(f"Cant read dino files with _{suffix}")
                fnames = [x for x in fnames if suffix in x]
            if not fnames:
                raise FileNotFoundError(
                    f"no files were found in {path}, {subdir=} with {suffix=}"
                )
            for fname in fnames:
                with zfile.open(fname) as fo:
                    obs = get_dino_obs(TextIOWrapper(fo))
                    if obs is not None:
                        obs_list.append(obs)

    elif path.is_dir():
        subpath = path / subdir
        if suffix:
            if "0" in suffix:
                raise Exception(f"Cant read dino files with _{suffix}")
            elif "*" not in suffix:
                suffix = f"*{suffix}"
            files = list(subpath.glob(suffix))
        else:
            files = list(subpath.iterdir())
        if not files:
            raise FileNotFoundError(f"no files were found in {subpath} with {suffix=}")

        for file in files:
            obs = get_dino_obs(file)
            if obs is not None:
                obs_list.append(obs)
    else:
        raise ValueError("Path must either be a .zip or directory")

    return obs_list