# Source code for nwm_region_mgr.utils.hydrofabric_utils

"""Functions to handle hydrofabric data and operations."""

import logging
from pathlib import Path
from typing import Optional

import geopandas as gpd
from shapely.geometry import GeometryCollection, Point
from shapely.ops import unary_union

from nwm_region_mgr.utils.io_utils import read_table
from nwm_region_mgr.utils.validation_utils import check_columns_dataframe

logger = logging.getLogger(__name__)


def _read_hydrofabric(hydrofabric_file: Optional[Path]) -> gpd.GeoDataFrame:
    """Read hydrofabric polygons from disk.

    Args:
        hydrofabric_file: path to a ``.gpkg`` (read from its ``divides`` layer)
            or ``.shp`` file; ``None`` is rejected.

    Returns:
        GeoDataFrame of hydrofabric polygons.

    Raises:
        ValueError: if no file is given or the extension is unsupported.
        FileNotFoundError: if the file does not exist.
    """
    if hydrofabric_file is None:
        raise ValueError("Either gdf or hydrofabric_file must be provided.")
    if not hydrofabric_file.exists():
        raise FileNotFoundError(f"Hydrofabric file not found: {hydrofabric_file}")
    suffix = hydrofabric_file.suffix.lower()
    if suffix == ".gpkg":
        # For GeoPackage files, the polygons live in the 'divides' layer.
        return gpd.read_file(hydrofabric_file, layer="divides")
    if suffix == ".shp":
        # For Shapefile, read the file directly.
        return gpd.read_file(hydrofabric_file)
    raise ValueError(f"Unsupported hydrofabric file format: {hydrofabric_file.suffix}")


def find_gages_within_buffer(
    gage_file: Path,
    gage_id_col: str,
    buffer: float,
    gdf: Optional[gpd.GeoDataFrame] = None,
    hydrofabric_file: Optional[Path] = None,
    id: Optional[str] = None,
) -> tuple:
    """Find gages within a buffer zone around the hydrofabric.

    Args:
        gage_file: path to the donor gage file containing gage_id, longitude,
            and latitude columns
        gage_id_col: column name for gage ID in the gage file
        buffer: buffer distance in kilometers around the hydrofabric
        gdf: optional, a GeoDataFrame of hydrofabric polygons (if provided,
            hydrofabric_file is ignored)
        hydrofabric_file: optional, path to the hydrofabric file (.gpkg or
            .shp format)
        id: optional, identifier for the hydrofabric (used in log messages)

    Returns:
        gages: list of gage IDs within the buffered hydrofabric
        distances: pandas Series of distances (kilometers, rounded to int)
            from each matched gage to the hydrofabric centroid
        gdf_buffered: shapely geometry of the buffered, dissolved hydrofabric
            (in EPSG:3857 meters)

    Raises:
        ValueError: if the gage file is empty, neither ``gdf`` nor
            ``hydrofabric_file`` is supplied, or the file format is unsupported.
        FileNotFoundError: if ``hydrofabric_file`` does not exist.
    """
    # Fail fast if the gage table lacks the columns used below.
    required_columns = {"longitude", "latitude", gage_id_col}
    check_columns_dataframe(gage_file, required_columns)

    # Read lat/lon of all gages.
    gage_table = read_table(gage_file)
    if gage_table.empty:
        raise ValueError(f"No gages found in the gage file: {gage_file}")

    # Build point geometries from lon/lat (WGS84).
    gage_gdf = gpd.GeoDataFrame(
        gage_table,
        geometry=[
            Point(xy) for xy in zip(gage_table["longitude"], gage_table["latitude"])
        ],
        crs="EPSG:4326",
    )

    if gdf is None:
        gdf = _read_hydrofabric(hydrofabric_file)

    # Project to meters for accurate buffer/distance calculations.
    gage_gdf = gage_gdf.to_crs(epsg=3857)
    gdf = gdf.to_crs(epsg=3857)

    # Remove invalid geometries; if nothing survives, repair all geometries
    # with a zero-width buffer as a workaround.
    valid_gdf = gdf[gdf.is_valid]
    if valid_gdf.empty:
        if id:
            logger.debug(
                "No valid geometries found in the hydrofabric for ID: %s. "
                "Creating a buffered geometry as workaround.",
                id,
            )
        else:
            logger.debug(
                "No valid geometries found in the hydrofabric. Creating a buffered geometry."
            )
        valid_gdf = gdf.copy()
        valid_gdf["geometry"] = valid_gdf.buffer(0)

    # Dissolve all polygons into one before buffering (km -> m).
    combined_geom = unary_union(valid_gdf.geometry)
    gdf_buffered = combined_geom.buffer(buffer * 1000)

    # Gages falling inside the buffered hydrofabric.
    gages = gage_gdf[gage_gdf.geometry.within(gdf_buffered)][gage_id_col].tolist()

    # Distances of the matched gages to the hydrofabric centroid,
    # converted from meters to whole kilometers.
    matched = gage_gdf[gage_gdf[gage_id_col].isin(gages)]
    distances = (matched.distance(combined_geom.centroid) / 1000).round(0).astype(int)

    if not gages:
        logger.debug(
            "No gages found for %s. Please check the gage file and hydrofabric file.",
            id,
        )
    return gages, distances, gdf_buffered
def area_weighted_average(
    gdf_target: gpd.GeoDataFrame,
    gdf_source: gpd.GeoDataFrame,
    value_col: str,
    target_id_col: str = "divide_id",
    crs_proj: str = "EPSG:5070",
) -> gpd.GeoDataFrame:
    """Map area-weighted average of `value_col` from source polygons to target polygons.

    The function computes the area-weighted average of a specified attribute
    from source polygons (gdf_source) and assigns it to target polygons
    (gdf_target) based on their spatial intersection. It works in both
    directions: coarse-to-fine and fine-to-coarse. Target polygons with no
    intersection receive NaN in the new column.

    Args:
        gdf_target: GeoDataFrame with target polygons
        gdf_source: GeoDataFrame with source polygons and the value to be averaged
        value_col: Name of the column in gdf_source to average
        target_id_col: Name of unique identifier column in gdf_target
            (default is 'divide_id'); created from the index if missing
        crs_proj: Projected CRS (in meters) for accurate area computation
            (default is 'EPSG:5070')

    Returns:
        gdf_target with a new column: `{value_col}_weighted`
    """
    # Reproject both layers to a projected CRS so polygon areas are meaningful.
    gdf_target = gdf_target.to_crs(crs_proj).copy()
    gdf_source = gdf_source.to_crs(crs_proj).copy()

    # Ensure a unique ID exists on the target polygons.
    if target_id_col not in gdf_target.columns:
        gdf_target = gdf_target.reset_index(drop=True)
        gdf_target[target_id_col] = gdf_target.index

    # Zero-width buffer repairs common invalid geometries before overlay.
    gdf_target["geometry"] = gdf_target.geometry.buffer(0)
    gdf_source["geometry"] = gdf_source.geometry.buffer(0)

    # Overlay to get the pairwise target/source intersections.
    intersected = gpd.overlay(
        gdf_target[[target_id_col, "geometry"]],
        gdf_source[["geometry", value_col]],
        how="intersection",
    )

    # Drop GeometryCollection rows (unsupported by the area computation below),
    # then explode multipart geometries into single parts.
    intersected = intersected[
        ~intersected.geometry.apply(lambda g: isinstance(g, GeometryCollection))
    ]
    intersected = intersected.explode(index_parts=True, ignore_index=True)

    # Weight each source value by its intersection area.
    intersected["area"] = intersected.geometry.area
    intersected["weighted_value"] = intersected[value_col] * intersected["area"]

    # One grouped pass sums both columns per target polygon
    # (the naive version would group twice).
    sums = intersected.groupby(target_id_col)[["area", "weighted_value"]].sum()
    weighted_avg = sums["weighted_value"] / sums["area"]

    # Assign the area-weighted average back onto the target GeoDataFrame;
    # targets without any intersection get NaN.
    new_col = f"{value_col}_weighted"
    gdf_target[new_col] = gdf_target[target_id_col].map(weighted_avg)
    return gdf_target