import datetime
import logging
import os
import xml.etree.ElementTree as etree
from io import StringIO
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from .. import observation
from ..observation import Obs
logger = logging.getLogger(__name__)
def read_xml_fname(
    fname: str,
    ObsClass: Union[Obs, Dict[str, Obs]],
    translate_dic: Optional[Dict[str, str]] = None,
    low_memory: bool = True,
    locationIds: Optional[List[str]] = None,
    filterdict: Optional[Dict[str, List[str]]] = None,
    return_events: bool = True,
    keep_flags: Tuple[int, ...] = (0, 1),
    return_df: bool = False,
    tags: Tuple[str, ...] = ("series", "header", "event"),
    remove_nan: bool = False,
    **kwargs: dict,  # unused
):
    """Read an xml filename into a list of observations objects.

    Dispatches to `iterparse_pi_xml` (low_memory) or to `read_xml_root`
    (whole tree parsed at once).

    Parameters
    ----------
    fname : str
        full path to file
    ObsClass: Union[Obs, Dict[str, Obs]]
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    locationIds : tuple or list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}. Only used if low_memory=True.
    return_events : bool, optional
        return all event-information in a DataFrame per location, instead of
        just a Series (defaults to True). Overrules keep_flags kwarg. Only
        used if low_memory=True.
    keep_flags : list of ints, optional
        keep the values with these flags (defaults to 0 and 1). Only used
        when return_events is False and low_memory=True.
    tags : list of strings, optional
        Select the tags to be parsed. Defaults to series, header and event.
        Only used if low_memory=True.
    return_df : bool, optional
        return a DataFrame with the data, instead of a list (default is
        False). Only used if low_memory=True.
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    **kwargs : dict
        accepted for call compatibility, not used.

    Returns
    -------
    list of ObsClass objects
        list of timeseries stored in ObsClass objects; a pandas DataFrame
        when low_memory and return_df are both True.
    """
    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    # Truthiness test (not `is True`) so that e.g. 1 or numpy booleans select
    # the same branch as in read_xmlstring.
    if low_memory:
        return iterparse_pi_xml(
            fname,
            ObsClass,
            translate_dic=translate_dic,
            locationIds=locationIds,
            filterdict=filterdict,
            return_events=return_events,
            keep_flags=keep_flags,
            return_df=return_df,
            tags=tags,
        )

    if filterdict is not None:
        # read_xml_root has no filterdict support; make that visible instead
        # of silently returning unfiltered data.
        logger.warning("filterdict is ignored when low_memory=False")

    root = etree.parse(fname).getroot()
    return read_xml_root(
        root,
        ObsClass,
        translate_dic=translate_dic,
        locationIds=locationIds,
        remove_nan=remove_nan,
    )
def _events_to_timeseries(events, return_events, keep_flags):
    """Convert a list of event attribute dicts into a DataFrame or Series."""
    if len(events) == 0:
        # explicit dtype: an empty Series without dtype is ambiguous across
        # pandas versions (float64 vs object)
        return pd.DataFrame() if return_events else pd.Series(dtype=float)

    df = pd.DataFrame(events)
    df.index = pd.to_datetime(
        [d + " " + t for d, t in zip(df["date"], df["time"])],
        errors="coerce",
    )
    df.drop(columns=["date", "time"], inplace=True)

    if return_events:
        df["value"] = pd.to_numeric(df["value"], errors="coerce")
        df["flag"] = pd.to_numeric(df["flag"])
        return df

    # flags are still raw XML strings here, keep_flags holds strings as well
    mask = df["flag"].isin(keep_flags)
    return pd.to_numeric(df.loc[mask, "value"], errors="coerce")


def iterparse_pi_xml(
    fname: str,
    ObsClass: Union[Obs, Dict[str, Obs]],
    translate_dic: Optional[Dict[str, str]] = None,
    filterdict: Optional[Dict[str, List[str]]] = None,
    locationIds: Optional[List[str]] = None,
    return_events: bool = True,
    keep_flags: Tuple[int, ...] = (0, 1),
    return_df: bool = False,
    tags: Tuple[str, ...] = ("series", "header", "event"),
):
    """Read a FEWS XML-file with measurements, memory efficient.

    Parameters
    ----------
    fname : str
        full path to file
    ObsClass: Union[Obs, Dict[str, Obs]],
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    locationIds : tuple or list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    return_events : bool, optional
        return all event-information in a DataFrame per location, instead of
        just a Series (defaults to True). Overrules keep_flags kwarg.
    keep_flags : list of ints, optional
        keep the values with these flags (defaults to 0 and 1). Only used
        when return_events is False.
    tags : list of strings, optional
        Select the tags to be parsed. Defaults to series, header and event
    return_df : bool, optional
        return a DataFame with the data, instead of a list (default is
        False)

    Returns
    -------
    df : pandas.DataFrame
        a DataFrame containing the metadata and the series if 'return_df'
        is True
    obs_list : list of observation objects
        list of timeseries if 'return_df' is False
    """
    from lxml.etree import iterparse

    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    namespace = "{http://www.wldelft.nl/fews/PI}"
    context = iterparse(fname, tag=[namespace + tag for tag in tags])

    # event flags are compared as the raw strings found in the XML attributes
    keep_flags = [str(flag) for flag in keep_flags]

    header_list = []
    obs_list = []
    header = {}
    events = []
    skip = False  # True while the current series is filtered out

    for _, element in context:
        if element.tag.endswith("header"):
            # a header starts a new series: reset the per-series state
            header = {}
            events = []
            loc = None
            for h_attr in element:
                tag = h_attr.tag.replace(namespace, "")
                if tag.startswith("locationId"):
                    loc = h_attr.text
                    logger.info(f"reading {loc}")

                if h_attr.text is not None:
                    header[tag] = h_attr.text
                elif len(h_attr.attrib) != 0:
                    header[tag] = {**h_attr.attrib}
                else:
                    header[tag] = None

            # decide once per series whether its events/series are skipped;
            # the header is read completely before the decision so the
            # element is never cleared while its children are iterated
            skip = False
            if locationIds is not None and loc not in locationIds:
                logger.info(f" ... skipping '{loc}', not in locationIds")
                skip = True
            elif filterdict is not None:
                for k, v in filterdict.items():
                    attr = header.get(k, None)
                    if attr not in v:
                        logger.info(
                            f" ... skipping '{attr}' not "
                            f"in accepted values for '{k}'"
                        )
                        skip = True
                        break

        elif element.tag.endswith("event"):
            if not skip:
                events.append({**element.attrib})

        elif element.tag.endswith("series"):
            if not skip:
                ts = _events_to_timeseries(events, return_events, keep_flags)
                o, header = _obs_from_meta(ts, header, translate_dic, ObsClass)
                header_list.append(header)
                obs_list.append(o)

        # Free memory.
        element.clear()

    if return_df:
        for h, s in zip(header_list, obs_list):
            h["series"] = s
        return pd.DataFrame(header_list)
    return obs_list
def read_xmlstring(
    xmlstring: str,
    ObsClass: Union[Obs, Dict[str, Obs]],
    translate_dic: Optional[Dict[str, str]] = None,
    filterdict: Optional[Dict[str, List[str]]] = None,
    locationIds: Optional[List[str]] = None,
    low_memory: bool = True,
    remove_nan: bool = False,
):
    """Parse an xml string (e.g. obtained from a fews api) into Obs objects.

    Parameters
    ----------
    xmlstring : str
        xml string to be parsed. Typically from a fews api.
    ObsClass: Union[Obs, Dict[str, Obs]]
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values. It is only passed on when
        low_memory is True.
    locationIds : tuple or list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read.
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False

    Returns
    -------
    list of ObsClass objects
        list of timeseries stored in ObsClass objects
    """
    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    if not low_memory:
        # parse the complete document in one go
        return read_xml_root(
            etree.fromstring(xmlstring),
            ObsClass,
            translate_dic=translate_dic,
            locationIds=locationIds,
            remove_nan=remove_nan,
        )

    # stream the string through the iterative parser
    stream = StringIO(xmlstring)
    return iterparse_pi_xml(
        stream,
        ObsClass,
        translate_dic=translate_dic,
        filterdict=filterdict,
        locationIds=locationIds,
    )
def read_xml_root(
    root: etree.Element,
    ObsClass: Union[Obs, Dict[str, Obs]],
    translate_dic: Optional[Dict[str, str]] = None,
    locationIds: Optional[List[str]] = None,
    remove_nan: bool = False,
):
    """Read a FEWS XML-file with measurements, return list of ObsClass objects.

    The element tree passed in is only read, it is not modified.

    Parameters
    ----------
    root : xml.etree.ElementTree.Element
        root element of a fews xml
    ObsClass: Union[Obs, Dict[str, Obs]],
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    locationIds : tuple or list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read.
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost

    Returns
    -------
    list of ObsClass objects
        list of timeseries stored in ObsClass objects
    """
    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    obs_list = []
    for item in root:
        if not item.tag.endswith("series"):
            continue

        header = {}
        date = []
        time = []
        events = []
        for subitem in item:
            if subitem.tag.endswith("header"):
                for subsubitem in subitem:
                    # strip the '{namespace}' prefix from the tag
                    prop = subsubitem.tag.split("}")[-1]
                    val = subsubitem.text
                    if prop in ("x", "y", "lat", "lon"):
                        val = float(val)
                    header[prop] = val
                    if prop == "locationId":
                        logger.info(f"read {val}")
            elif subitem.tag.endswith("event"):
                # work on a copy: popping from subitem.attrib itself would
                # strip date/time from the caller's XML tree
                attrs = dict(subitem.attrib)
                date.append(attrs.pop("date"))
                time.append(attrs.pop("time"))
                events.append(attrs)

        # combine events in a dataframe
        index = pd.to_datetime(
            [d + " " + t for d, t in zip(date, time)], errors="coerce"
        )
        ts = pd.DataFrame(events, index=index)
        if not ts.empty:
            ts["value"] = ts["value"].astype(float)
            if remove_nan:
                ts.dropna(subset=["value"], inplace=True)

        header["unit"] = "m NAP"
        o, header = _obs_from_meta(ts, header, translate_dic, ObsClass)

        # the location filter is applied on the translated header
        if locationIds is None or header["monitoring_well"] in locationIds:
            obs_list.append(o)
    return obs_list
def _obs_from_meta(
    ts: pd.DataFrame,
    header: Dict[str, str],
    translate_dic: Dict[str, str],
    ObsClass: Union[Obs, Dict[str, Obs]],
):
    """Internal function to convert timeseries and header into Obs objects.

    Parameters
    ----------
    ts : pd.DataFrame
        timeseries data.
    header : dictionary
        metadata. Modified in place: the keys in translate_dic are renamed.
    translate_dic : dictionary
        translate dictionary, maps header keys to their new names. Every key
        must be present in header.
    ObsClass: Union[Obs, Dict[str, Obs]],
        class of the observations, e.g. GroundwaterObs or WaterlvlObs. If a
        dictionary is given, the class is looked up by the 'parameterId' of
        the header and Obs is used when there is no match.

    Returns
    -------
    o : GroundwaterObs or WaterlvlObs
        hydropandas observation object.
    header : dictionary
        metadata.
    """
    for key, item in translate_dic.items():
        header[item] = header.pop(key)

    x = float(header["x"]) if "x" in header else np.nan
    y = float(header["y"]) if "y" in header else np.nan
    unit = str(header["units"]) if "units" in header else np.nan
    metadata_available = not (np.isnan(x) or np.isnan(y))

    # parid is always bound (None without a parameterId), so the class lookup
    # and the meteo branch below cannot hit an unbound local variable
    parid = header.get("parameterId")
    if "parameterId" in header:
        name = header["monitoring_well"] + "_" + parid
    else:
        name = header["monitoring_well"]

    if isinstance(ObsClass, dict):
        ObsC = ObsClass.get(parid, Obs)
    else:
        ObsC = ObsClass

    # keyword arguments shared by all observation classes
    common = dict(x=x, y=y, unit=unit, meta=header, name=name, source="FEWS")

    if ObsC in (observation.WaterlvlObs,):
        o = ObsC(
            ts,
            monitoring_well=header["monitoring_well"],
            metadata_available=metadata_available,
            **common,
        )
    elif ObsC in (observation.GroundwaterObs,):
        z = float(header["z"]) if "z" in header else np.nan
        o = ObsC(
            ts,
            ground_level=z,
            monitoring_well=header["monitoring_well"],
            metadata_available=metadata_available,
            **common,
        )
    elif ObsC in (
        observation.MeteoObs,
        observation.PrecipitationObs,
        observation.EvaporationObs,
    ):
        o = ObsC(ts, meteo_var=parid, **common)
    else:
        o = ObsC(ts, **common)

    return o, header
def write_pi_xml(obs_coll, fname: str, timezone: float = 1.0, version: str = "1.24"):
    """Write TimeSeries object to PI-XML file.

    Parameters
    ----------
    obs_coll : object
        collection whose `obs` attribute is iterated; every item must provide
        `meta` (dict with header values), `index` (timestamps with a
        `strftime` method) and `columns`, and support `o[column]`.
    fname: path
        path to XML file, must end with '.xml'
    timezone : float, optional
        value written to the <timeZone> element, default is 1.0
    version : str, optional
        value of the version attribute of <TimeSeries>, default is "1.24"

    Raises
    ------
    AssertionError
        if fname does not end with '.xml'
    """
    from xml.sax.saxutils import escape

    # kept as an assert so callers catching AssertionError keep working
    assert fname.endswith(".xml"), "Output file should have '.xml' extension!"

    def _attr(value):
        # escape a value for use inside a double-quoted XML attribute
        return escape(str(value), {'"': "&quot;"})

    # first line of XML file
    line0 = '<?xml version="1.0" encoding="UTF-8"?>\n'

    # some definitions for timeseries XML file
    NS = r"http://www.wldelft.nl/fews/PI"
    FS = r"http://www.wldelft.nl/fews/fs"
    XSI = r"http://www.w3.org/2001/XMLSchema-instance"
    schemaLocation = (
        r"http://fews.wldelft.nl/schemas/version1.0" r"/Pi-schemas/pi_timeseries.xsd"
    )
    timeseriesline = (
        '<TimeSeries xmlns="{NS}" xmlns:xsi="{XSI}" '
        'xsi:schemaLocation="{NS} {schema}" version="{version}" '
        'xmlns:fs="{FS}">\n'
    )

    # line templates
    paramline = "<{tag}>{param}</{tag}>\n"

    # the XML declaration announces UTF-8, so the file has to be written as
    # UTF-8 regardless of the platform default encoding
    with open(fname, "w", encoding="utf-8") as f:
        f.write(line0)
        f.write(
            timeseriesline.format(
                NS=NS, FS=FS, XSI=XSI, schema=schemaLocation, version=version
            )
        )
        f.write("\t" + paramline.format(tag="timeZone", param=timezone))

        for o in obs_coll.obs:
            # start series
            f.write("\t" + "<series>\n")

            # write header
            hlines = [2 * "\t" + "<header>\n"]
            for htag, hval in o.meta.items():
                if htag.endswith("Date"):
                    try:
                        hdate = hval.strftime("%Y-%m-%d")
                        htime = hval.strftime("%H:%M:%S")
                    except AttributeError:
                        # not a datetime-like value: fall back on the first
                        # or last timestamp of the series
                        if htag.startswith("start"):
                            stamp = o.index[0]
                        elif htag.startswith("end"):
                            stamp = o.index[-1]
                        else:
                            raise
                        hdate = stamp.strftime("%Y-%m-%d")
                        htime = stamp.strftime("%H:%M:%S")
                    hline = '<{tag} date="{date}" time="{time}"/>\n'.format(
                        tag=htag, date=hdate, time=htime
                    )
                elif htag.endswith("timeStep"):
                    hline = '<{tag} unit="{unit}"/>\n'.format(
                        tag=htag, unit=_attr(hval)
                    )
                else:
                    hline = paramline.format(tag=htag, param=escape(str(hval)))
                hlines.append(3 * "\t" + hline)
            hlines.append(2 * "\t" + "</header>\n")
            f.writelines(hlines)

            # write timeseries; the index is read directly so a named index
            # works as well
            columns = [(icol, o[icol].astype(str).tolist()) for icol in o.columns]
            events = []
            for irow, stamp in enumerate(o.index):
                attrs = "".join(
                    ' {}="{}"'.format(icol, _attr(values[irow]))
                    for icol, values in columns
                )
                events.append(
                    2 * "\t"
                    + '<event date="{}" time="{}"{}/>\n'.format(
                        stamp.strftime("%Y-%m-%d"), stamp.strftime("%H:%M:%S"), attrs
                    )
                )
            f.writelines(events)

            # end series
            f.write("\t" + "</series>\n")

        # end Timeseries
        f.write("</TimeSeries>\n")
def read_xml_filelist(
    fnames: List[str],
    ObsClass: Union[Obs, Dict[str, Obs]],
    directory: Optional[str] = None,
    locations: Optional[List[str]] = None,
    translate_dic: Optional[Dict[str, str]] = None,
    filterdict: Optional[Dict[str, List[str]]] = None,
    remove_nan: bool = False,
    low_memory: bool = True,
    **kwargs: dict,
):
    """Read several xml files and collect their observations in one list.

    Parameters
    ----------
    fnames : list of str
        names of the xml files to read.
    ObsClass: Union[Obs, Dict[str, Obs]]
        class of the observations, e.g. GroundwaterObs or WaterlvlObs
    directory : str, optional
        directory that is joined in front of every name in fnames. The
        default is None, in which case the names are used as they are.
    locations : tuple or list of str, optional
        list of locationId's to read from XML file, others are skipped.
        If None (default) all locations are read.
    translate_dic : dic or None, optional
        translate names from fews. If None this default dictionary is used:
        {'locationId': 'monitoring_well'}.
    filterdict : dict, optional
        dictionary with tag name to apply filter to as keys, and list of
        accepted names as dictionary values to keep in final result,
        i.e. {"locationId": ["B001", "B002"]}
    remove_nan : boolean, optional
        remove nan values from measurements, flag information about the
        nan values is also lost, only used if low_memory=False
    low_memory : bool, optional
        whether to use xml-parsing method with lower memory footprint,
        default is True
    **kwargs : dict
        passed on to read_xml_fname.

    Returns
    -------
    list of ObsClass objects
        list of timeseries stored in ObsClass objects
    """
    if translate_dic is None:
        translate_dic = {"locationId": "monitoring_well"}

    nfiles = len(fnames)
    collected = []
    for j, ixml in enumerate(fnames):
        # progress message
        logger.info(f"{j+1}/{nfiles} read {ixml}")

        # prepend the directory when one is given
        fullpath = ixml if directory is None else os.path.join(directory, ixml)

        # read the file and add its observations to the result
        file_obs = read_xml_fname(
            fullpath,
            ObsClass,
            translate_dic=translate_dic,
            filterdict=filterdict,
            low_memory=low_memory,
            locationIds=locations,
            remove_nan=remove_nan,
            **kwargs,
        )
        collected += file_obs
    return collected
def get_fews_pid(name: str) -> Dict[str, Obs]:
    """Look up the ParameterId to Observation Class mapping of a waterboard.

    Parameters
    ----------
    name : str
        Waterboard name, matched in lower case

    Returns
    -------
    Dict[str, Obs]
        Dictionary with ParameterId as key and the Observation Class, taken
        by name from the observation module, as value
    """
    from ..data.fews_parameterid import pid

    class_names = pid[name.lower()]
    obs_classes = {}
    for parameter_id, class_name in class_names.items():
        # resolve the class name to the actual class object
        obs_classes[parameter_id] = getattr(observation, class_name)
    return obs_classes