Source code for hydropandas.extensions.geo

import numpy as np
import pandas as pd

from . import accessor


[docs]@accessor.register_obscollection_accessor("geo") class GeoAccessor: def __init__(self, oc_obj): self._obj = oc_obj
[docs] def get_bounding_box(self, xcol="x", ycol="y", buffer=0): """returns the bounding box of all observations. Parameters ---------- xcol : str, optional column name with x values ycol : str, optional column name with y values buffer : int or float, optional add a buffer around the bouding box from the observations Returns ------- xmin, ymin, xmax, ymax coordinates of bouding box """ xmin = self._obj[xcol].min() - buffer xmax = self._obj[xcol].max() + buffer ymin = self._obj[ycol].min() - buffer ymax = self._obj[ycol].max() + buffer return (xmin, ymin, xmax, ymax)
[docs] def get_extent(self, xcol="x", ycol="y", buffer=0): """returns the extent of all observations. Parameters ---------- xcol : str, optional column name with x values ycol : str, optional column name with y values buffer : int or float, optional add a buffer around the bouding box from the observations Returns ------- xmin, xmax, ymin, ymax coordinates of bouding box """ xmin = self._obj[xcol].min() - buffer xmax = self._obj[xcol].max() + buffer ymin = self._obj[ycol].min() - buffer ymax = self._obj[ycol].max() + buffer return (xmin, xmax, ymin, ymax)
[docs] def set_lat_lon(self, in_epsg="epsg:28992", out_epsg="epsg:4326", add_to_meta=True): """create columns with lat and lon values of the observation points. Parameters ---------- in_epsg : str, optional epsg code of current x and y attributes, default (RD new) out_epsg : str, optional epsg code of desired output, default lat/lon add_to_meta : bool, optional if True the lat and lon values are added to the observation meta dictionary. The default is True. Returns ------- None. """ df_lat_lon = self._obj.geo.get_lat_lon(in_epsg, out_epsg) for iname in df_lat_lon.index: self._obj._set_metadata_value( iname, "lat", df_lat_lon.loc[iname, "lat"], add_to_meta ) self._obj._set_metadata_value( iname, "lon", df_lat_lon.loc[iname, "lon"], add_to_meta )
[docs] def get_lat_lon(self, in_epsg="epsg:28992", out_epsg="epsg:4326"): """get lattitude and longitude from x and y attributes. Parameters ---------- in_epsg : str, optional epsg code of current x and y attributes, default (RD new) out_epsg : str, optional epsg code of desired output, default lat/lon Returns ------- pandas.DataFrame with columns 'lat' and 'lon' """ df_lat_lon = pd.DataFrame(index=self._obj.index, columns=["lat", "lon"]) for iname in self._obj.index: o = self._obj.loc[iname, "obs"] df_lat_lon.loc[iname, ["lat", "lon"]] = o.geo.get_lat_lon(in_epsg, out_epsg) return df_lat_lon
[docs] def get_nearest_point( self, obs_collection2=None, gdf2=None, xcol_obs1="x", ycol_obs1="y", xcol_obs2="x", ycol_obs2="y", ): """get nearest point of another obs collection for each point in the current obs collection. Parameters ---------- obs_collection2 : ObsCollection, optional collection of observations of which the nearest point is found gdf2 : GeoDataFrame, optional dataframe to look for nearest observation point xcol_obs1 : str, optional x column in self._obj used to get geometry ycol_obs1 : str, optional y column in self._obj used to get geometry xcol_obs2 : str, optional x column in obs_collection2 used to get geometry ycol_obs2 : str, optional y column in self._obj used to get geometry Returns ------- pandas.DataFrame with columns 'nearest point' and 'distance nearest point' """ from shapely.ops import nearest_points gdf1 = self._obj.to_gdf(xcol=xcol_obs1, ycol=ycol_obs1) if obs_collection2 is not None: gdf2 = obs_collection2.to_gdf(xcol=xcol_obs2, ycol=ycol_obs2) elif gdf2 is None: raise ValueError("obs_collecction2 or gdf2 should be defined") pts_gdf2 = gdf2.geometry.unary_union def nearest_point(point_gdf1, pts=pts_gdf2): # find the nearest point and return the corresponding Place value nearest_point_gdf2 = nearest_points(point_gdf1, pts_gdf2)[1] nearest = gdf2[gdf2.geometry == nearest_point_gdf2] return nearest.index[0] def distance_nearest_point(point_gdf1, pts=pts_gdf2): # find the nearest point and return the corresponding Place value nearest_point_gdf2 = nearest_points(point_gdf1, pts_gdf2)[1] distance = point_gdf1.distance(nearest_point_gdf2) return distance gdf1["nearest point"] = gdf1.apply( lambda row: nearest_point(row.geometry), axis=1 ) gdf1["distance nearest point"] = gdf1.apply( lambda row: distance_nearest_point(row.geometry), axis=1 ) return gdf1[["nearest point", "distance nearest point"]]
def _get_nearest_geometry( self, gdf=None, xcol_obs="x", ycol_obs="y", geometry_type="polygon", multiple_geometries="error", ): """get nearest geometry for each point in the obs collection. Function works for line and polygon geometries. Parameters ---------- gdf : GeoDataFrame dataframe with polygon features xcol_obs : str, optional x column in self._obj used to get geometry ycol_obs : str, optional y column in self._obj used to get geometry geometry_type : str, optional geometry type, can be 'polygon' or 'line'. multiple_geometries : str, optional keyword on how to deal with multiple geometries being nearest. Options are: 'error' -> raise a ValueError 'keep_all' -> return the indices of multiple geometries as a string seperated by a comma 'keep_first' -> return the index of the first geometry Returns ------- pandas.DataFrame with columns 'nearest geometry' and 'distance nearest geometry' """ gdf_obs = self._obj.to_gdf(xcol=xcol_obs, ycol=ycol_obs) gdf = gdf.copy() for i, point in gdf_obs.geometry.items(): distances = [point.distance(pol) for pol in gdf.geometry.values] if (np.array(distances) == np.min(distances)).sum() > 1: if multiple_geometries == "error": raise ValueError(f"multiple {geometry_type}s are nearest") elif multiple_geometries == "keep_all": ids = [] for i_min in np.where(np.array(distances) == np.min(distances))[0]: ids.append(gdf.index[i_min]) gdf_obs.loc[i, f"nearest {geometry_type}"] = ", ".join(ids) gdf_obs.loc[i, f"distance nearest {geometry_type}"] = np.min( distances ) elif multiple_geometries == "keep_first": gdf_obs.loc[i, f"nearest {geometry_type}"] = gdf.iloc[ np.argmin(distances) ].name gdf_obs.loc[i, f"distance nearest {geometry_type}"] = np.min( distances ) else: raise ValueError( "invalid value for multiple_geometries ->" f"{multiple_geometries}" ) else: gdf_obs.loc[i, f"nearest {geometry_type}"] = gdf.iloc[ np.argmin(distances) ].name gdf_obs.loc[i, f"distance nearest {geometry_type}"] = np.min(distances) return gdf_obs[ [f"nearest {geometry_type}", f"distance nearest {geometry_type}"] ]
[docs] def get_nearest_line( self, gdf=None, xcol_obs="x", ycol_obs="y", multiple_lines="error" ): """get nearest line for each point in the obs collection. Function calls the nearest_polygon function. Parameters ---------- gdf : GeoDataFrame dataframe with line features xcol_obs : str, optional x column in self._obj used to get geometry ycol_obs : str, optional y column in self._obj used to get geometry multiple_lines : str, optional keyword on how to deal with multiple lines being nearest. Options are: 'error' -> raise a ValueError 'keep_all' -> return the indices of multiple lines as a string seperated by a comma 'keep_first' -> return the index of the first line Returns ------- pandas.DataFrame with columns 'nearest polygon' and 'distance nearest polygon' """ return self._get_nearest_geometry( gdf=gdf, xcol_obs=xcol_obs, ycol_obs=ycol_obs, multiple_geometries=multiple_lines, geometry_type="line", )
[docs] def get_nearest_polygon( self, gdf=None, xcol_obs="x", ycol_obs="y", multiple_polygons="error" ): """get nearest polygon for each point in the obs collection. Function also works for lines instead of polygons. Parameters ---------- gdf : GeoDataFrame dataframe with polygon features xcol_obs : str, optional x column in self._obj used to get geometry ycol_obs : str, optional y column in self._obj used to get geometry multiple_polygons : str, optional keyword on how to deal with multiple polygons being nearest. Options are: 'error' -> raise a ValueError 'keep_all' -> return the indices of multiple polygons as a string seperated by a comma 'keep_first' -> return the index of the first polygon Returns ------- pandas.DataFrame with columns 'nearest polygon' and 'distance nearest polygon' """ return self._get_nearest_geometry( gdf=gdf, xcol_obs=xcol_obs, ycol_obs=ycol_obs, multiple_geometries=multiple_polygons, geometry_type="polygon", )
[docs] def get_distance_to_point(self, point, xcol="x", ycol="y"): """get distance of every observation to a point. Parameters ---------- point : shapely.geometry.point.Point point geometry xcol : str, optional x column in self._obj used to get x coordinates ycol : str, optional y column in self._obj used to get y coordinates Returns ------- pd.Series distance to the point for every observation in self._obj """ gdf = self._obj[[xcol, ycol]].to_gdf(xcol=xcol, ycol=ycol) return gdf.distance(point)
[docs] def within_extent(self, extent, inplace=False): """Slice ObsCollection by extent. Parameters ---------- extent : tuple format (xmin, xmax, ymin, ymax), default dis.sr.get_extent() format Returns ------- new_oc : obs_collection.ObsCollection ObsCollection with observations within extent """ new_oc = self._obj[ (self._obj.x > extent[0]) & (self._obj.x < extent[1]) & (self._obj.y > extent[2]) & (self._obj.y < extent[3]) ] if inplace: self._obj._update_inplace(new_oc) else: return new_oc
[docs] def within_polygon(self, gdf=None, shapefile=None, inplace=False, **kwargs): """Slice ObsCollection by checking if points are within a shapefile. Parameters ---------- gdf : GeoDataFrame, optional geodataframe containing a single polygon shapefile : str, optional Not yet implemented inplace : bool, default False Modify the ObsCollection in place (do not create a new object). **kwargs : kwargs will be passed to the self._obj.to_gdf() method Returns ------- new_oc : obs_collection.ObsCollection ObsCollection with observations within polygon """ if gdf is not None: if gdf.shape[0] != 1: raise NotImplementedError("cannot handle zero or multiple polygons") gdf_oc = self._obj.to_gdf(**kwargs) new_oc = self._obj.loc[gdf_oc.within(gdf.geometry.values[0])] elif shapefile is not None: raise NotImplementedError("shapefiles are not yet implemented") else: raise ValueError("shapefile or gdf must be specified") if inplace: self._obj._update_inplace(new_oc) return else: return new_oc
[docs]@accessor.register_obs_accessor("geo") class GeoAccessorObs: def __init__(self, obs): self._obj = obs
[docs] def get_lat_lon(self, in_epsg="epsg:28992", out_epsg="epsg:4326"): """get lattitude and longitude from x and y attributes. Parameters ---------- in_epsg : str, optional epsg code of current x and y attributes, default (RD new) out_epsg : str, optional epsg code of desired output, default lat/lon Returns ------- lon, lat : longitude and lattitude of x, y coordinates """ from pyproj import Transformer transformer = Transformer.from_crs(in_epsg, out_epsg) if np.isnan(self._obj.x) or np.isnan(self._obj.y): return np.nan, np.nan return transformer.transform(self._obj.x, self._obj.y)