Source code for nwm_region_mgr.formreg.select_formulation

"""Select formulations for regionalization.

This module provides functions to select formulations based on summary scores of calibrated basins,
as well as formulation costs if provided in the configuration.

Functions:
    - _get_gages_with_shared_formulations: Identify the formulations shared by a number of gages.
    - _find_gages_nearest_neighbor: Find gages for a given HUC ID using the nearest neighbor method.
    - _find_gages_upscaling: Find gages for a given HUC ID using the upscaling method.
    - _find_calibration_gages: Find gages for a given HUC ID in the summary scores DataFrame.
    - _get_formulation_costs: Retrieve formulation costs from the configuration.
    - _select_formulation_given_score: Select a formulation for each spatial unit based on the method specified.
    - _identify_best_formulation_per_gage: Identify the best formulation for each gage based on summary scores and costs.
    - select_formulation_donors_only: Select formulations for donor basins only.
    - select_formulation_all: Select formulations for all basins.
    - save_formulation_results: Save the formulation selection results to file.
    - plot_formulation_results: Plot the formulation selection results.
    - select_formulation: Head function to select formulations for each spatial unit in the VPU.

"""

import logging
from itertools import combinations
from pathlib import Path
from typing import List, Optional, Tuple

import fiona
import geopandas as gpd
import numpy as np
import pandas as pd

from nwm_region_mgr.formreg import config_schema as cs
from nwm_region_mgr.utils import (
    check_columns_dataframe,
    find_gages_within_buffer,
    read_table,
)

logger = logging.getLogger(__name__)


def _get_gages_with_shared_formulations(
    df: pd.DataFrame,
    min_gages: int = 3,
) -> Tuple[bool, List[str], List[str]]:
    """Identify the largest formulation set shared by at least `min_gages` gages.

    Args:
        df (pd.DataFrame): Must contain 'gage_id' and 'formulation' columns.
        min_gages (int): Minimum number of gages that must share the same formulations.

    Returns:
        (True, [gages], [shared_formulations]) if found, else (False, [], [])

    """
    # Build mapping from gage_id -> set of formulations
    gage_to_formulations = df.groupby("gage_id")["formulation"].apply(
        lambda x: frozenset(x)
    )

    # All unique gage_ids and their formulation sets
    gage_ids = list(gage_to_formulations.index)
    gage_form_sets = gage_to_formulations.tolist()

    # Step 1: Get all unique formulations across gages
    all_formulations = set().union(*gage_form_sets)

    # Step 2: Loop through combinations of formulations, largest sets first
    for r in range(len(all_formulations), 0, -1):  # Start from largest combos
        for combo in combinations(all_formulations, r):
            combo_set = set(combo)
            matching_gages = [
                gage_ids[i]
                for i, g_form in enumerate(gage_form_sets)
                if combo_set.issubset(g_form)
            ]
            if len(matching_gages) >= min_gages:
                logger.debug(
                    f"Found {len(matching_gages)} gages ({matching_gages}) sharing formulations: {combo_set}."
                )
                return True, matching_gages, list(combo_set)

    return False, [], []


def _find_gages_nearest_neighbor(
    huc_id: str,
    df: pd.DataFrame,
    gage_file: Path,
    gage_id_col: str,
    min_gages: int,
    gdf: gpd.GeoDataFrame,
) -> tuple[list, list, list]:
    """Find gages for the given HUC ID in the DataFrame.

    Args:
        huc_id (str): HUC ID to find gages for.
        df (pd.DataFrame): DataFrame of summary scores.
        gage_file (Path): Path to the donor gage file containing gage_id, longitude, and latitude.
        gage_id_col (str): Column name for gage ID in the gage file.
        min_gages (int): Minimum number of gages required for each spatial unit.
        gdf (gpd.GeoDataFrame): GeoDataFrame of hydrofabric (HUC12) polygons.

    Returns:
        tuple: A tuple containing a list of calibration gage IDs found for the HUC ID,  distances of the gages
            to the HUC centroid, and the shared formulations.

    """
    # get all calibration gages with valid formulation and summary scores
    gages_calib = (
        df[~df["formulation"].isna() & ~df["summary_score"].isna()][gage_id_col]
        .unique()
        .tolist()
    )

    # iteratively find the nearest neighbor gages for the given HUC ID, increasing the neighborhood size
    # by 100km each time until the minimum number of gages is met
    buffer = 100  # initial buffer size in kilometers
    gages = []
    max_buffer = 2000
    while len(gages) < min_gages and buffer <= max_buffer:
        # find gages within the buffer around huc_id
        gages, distances, _ = find_gages_within_buffer(
            gage_file, gage_id_col, buffer, gdf=gdf, id=huc_id
        )

        # filter gages to only those that are calibrated, and filter distances accordingly
        filtered = [(g, d) for g, d in zip(gages, distances) if g in gages_calib]
        if filtered:
            gages, distances = map(list, zip(*filtered))
        else:
            gages, distances = [], []

        # if there are enough gages, check if they share the same formulations
        found_gages = False
        if len(gages) >= min_gages:
            found_gages, gages, formulations = _get_gages_with_shared_formulations(
                df[df[gage_id_col].isin(gages)], min_gages=min_gages
            )

        # if no gages are found, increase the buffer size and try again
        if not found_gages:
            logger.debug(
                f"{huc_id}: Not enough gages found within {buffer} km buffer. Increasing buffer size by 100km."
            )
            buffer += 100  # increase buffer size by 100 km
        else:
            logger.debug(
                f"{huc_id}: Found {len(gages)} gages ({gages}) within {buffer} km buffer."
            )
            break

    else:
        # if while loop exits normally (len(gages) >= min_gages or buffer > max_buffer)
        if len(gages) < min_gages:
            logger.warning(
                f"Not enough calibrated gages found for {huc_id} after increasing buffer "
                f"to {buffer - 100} km. Minimum required is {min_gages}."
            )

    return gages, distances, formulations


def _find_gages_upscaling(
    huc_id: str, df: pd.DataFrame, gage_id_col: str, min_gages: int
) -> tuple[list, str, list]:
    """Find gages for the given HUC ID in the DataFrame.

    Args:
        huc_id (str): HUC ID to find gages for.
        df (pd.DataFrame): DataFrame of summary scores.
        gage_id_col (str): Column name for gage ID in the gage file.
        min_gages (int): Minimum number of gages required for each spatial unit.

    Returns:
        tuple: A tuple containing a list of calibration gage IDs found for the HUC ID, the HUC level,
        and the shared formulations.

    """
    # define valid HUC levels
    valid_huc_levels = ["huc2", "huc4", "huc6", "huc8", "huc10", "huc12"]
    valid_huc_levels.reverse()

    # make sure current huc level is in the valid HUC levels
    current_huc_level = "huc" + str(len(huc_id))
    if current_huc_level not in valid_huc_levels:
        msg = f"Invalid current HUC level: {current_huc_level}. Valid levels are: {', '.join(valid_huc_levels)}."
        logger.error(msg)
        raise ValueError(msg)

    gages = []
    formulations = []
    huc_level = current_huc_level  # Default to current if nothing better is found

    # start from current huc level, loop through valid HUC levels to ensure sufficient calibrated gages
    for _, huc_level in enumerate(
        valid_huc_levels[valid_huc_levels.index(current_huc_level) :]
    ):
        # count the number of unique gages in the new HUC level
        huc_digit = int(huc_level.replace("huc", ""))
        gages = (
            df[df["huc_id"].str[:huc_digit] == huc_id[:huc_digit]][gage_id_col]
            .unique()
            .tolist()
        )

        # remove NaN values from gages
        gages = [gage for gage in gages if pd.notna(gage)]
        ngage = len(gages)

        # if there are enough gages, check if they share the same formulations
        if ngage >= min_gages:
            found_gages, gages, formulations = _get_gages_with_shared_formulations(
                df[df[gage_id_col].isin(gages)], min_gages=min_gages
            )

            if not found_gages:
                logger.debug(
                    f"{huc_id}: upscaling HUC level from {current_huc_level} to {huc_level} "
                )
                continue
            else:
                # logger.debug(f"{huc_id}: found {len(gages)} gages ({gages}) at {huc_level} level.")
                break

    return gages, huc_level, formulations


def _find_calibration_gages(
    huc_id: str, df: pd.DataFrame, config: cs.Config, gdf: gpd.GeoDataFrame
) -> tuple[str, list, list, list]:
    """Find gages for the given HUC ID in the DataFrame.

    Args:
        huc_id (str): HUC ID to find gages for.
        df (pd.DataFrame): DataFrame of summary scores.
        config (cs.Config): Configuration object containing settings.
        gdf (gpd.GeoDataFrame): GeoDataFrame of hydrofabric (HUC12) polygons.

    Returns:
        tuple: A tuple containing the HUC level, the list of calibration gage IDs found for the HUC ID,
            list of corresponding distances, and the list of shared formulations.

    """
    # get configuration settings
    gage_id_col = getattr(
        config.general.id_col, "gage", "gage_id"
    )  # column name for gage ID in the DataFrame
    nmin_gages = (
        config.spatial_unit.nmin_calib_basin
    )  #  minimum number of calibration gages required
    method = (
        config.spatial_unit.basin_fill_method.lower()
    )  # method to find additional calibration gages if needed
    gage_file = config.general.donor_gage_file  # path to the donor gage file

    # check if the huc_id is valid
    if not isinstance(huc_id, str) or len(huc_id) < 2:
        msg = f"Invalid HUC ID: {huc_id}. It should be a string with at least 2 characters."
        logger.error(msg)
        raise ValueError(msg)

    # remove rows with invalid gage_id_col, formulation, and summary_score columns in the DataFrame
    df = df[
        ~df[gage_id_col].isna()
        & ~df["formulation"].isna()
        & ~df["summary_score"].isna()
    ]

    # check number of calibration gages within the current huc_id
    gages0 = df[df["huc_id"] == huc_id][gage_id_col].unique().tolist()
    gages0 = [gage for gage in gages0 if pd.notna(gage)]  # remove NaN values

    if len(gages0) >= nmin_gages:
        found_gages, gages0, formulations = _get_gages_with_shared_formulations(
            df[df[gage_id_col].isin(gages0)], min_gages=nmin_gages
        )

    # if there are enough gages at the current huc_id level, return them
    if len(gages0) >= nmin_gages:
        logger.debug(
            f"{huc_id}: found {len(gages0)} calibration gages ({gages0}) at the current level. No need to find additional gages."
        )
        huc_level = "huc" + str(len(huc_id))
        return huc_level, gages0, [np.nan] * len(gages0), formulations
    else:
        logger.debug(
            f"{huc_id}: found only {len(gages0)} calibration gages ({gages0}) at the current level. "
            f"Need to find additional gages using the {method} method."
        )

    def nearest_neighbor_fallback():
        logger.warning(
            f"Not enough calibration gages found for {huc_id} after upscaling. "
            f"Found {len(gages0)} gages ({gages0}), minimum required is {nmin_gages}. "
            f"Use nearest-neighbor method for {huc_id} instead."
        )
        gages_nn, dists_nn, formulations = _find_gages_nearest_neighbor(
            huc_id, df, gage_file, gage_id_col, nmin_gages, gdf
        )
        return gages_nn, dists_nn, "huc" + str(len(huc_id)), formulations

    if method == "upscaling":
        # find calibration gages using upscaling method
        gages, huc_level, formulations = _find_gages_upscaling(
            huc_id, df, gage_id_col, nmin_gages
        )
        dists = [np.nan] * len(gages)
        if len(gages) < nmin_gages:
            gages, dists, huc_level, formulations = nearest_neighbor_fallback()

    elif method == "nearest-neighbor":
        # find calibration gages using nearest neighbor
        gages, dists, formulations = _find_gages_nearest_neighbor(
            huc_id, df, gage_file, gage_id_col, nmin_gages, gdf
        )
        huc_level = "huc" + str(
            len(huc_id)
        )  # HUC level is determined by the length of huc_id
    else:
        # raise error if the basin fill method is not recognized
        msg = f"Unknown basin fill method: {method}. Supported methods are 'upscaling' and 'nearest-neighbor'."
        logger.error(msg)
        raise ValueError(msg)

    return huc_level, gages, dists, formulations


def _get_formulation_costs(
    config: cs.FormulationCostConfig,
) -> Optional[dict[str, float]]:
    """Get formulation costs from the configuration.

    Args:
        config : cs.FormulationCostConfig
            Configuration object containing formulation cost settings.

    Returns:
        Optional[dict[str, float]]
            Dictionary of formulation costs, keyed by formulation name. If no costs are defined, returns None.

    """
    if config.file:
        # check "formulation" and "cost" columns in the cost file
        check_columns_dataframe(
            config.file,
            columns=["formulation", "cost"],
        )
        # read the cost file
        cost_df = read_table(config.file)
        if not cost_df.empty:
            return dict(zip(cost_df["formulation"], cost_df["cost"]))
        else:
            logger.warning(
                f"No costs found in the file: {config.file}. Returning None."
            )
            return None
    elif config.costs:
        return config.costs
    else:
        logger.warning(
            "No formulation costs defined in the configuration. Returning None."
        )

    return None


def _select_formulation_given_score(
    df: pd.DataFrame,
    method: str = "basin",
    type: str = "total_score",
    id_col: dict[str, str] = {"gage": "gage_id", "divide": "divide_id"},
) -> pd.DataFrame:
    """Compute total score for each spatial unit based on the method specified.

    Args:
        df : pd.DataFrame
            DataFrame containing summary scores for each formulation and calibrated basin.
        method : str, optional
            Method to compute total score, either 'basin' or 'divide', by default 'basin'.
        type : str, optional
            Type of total score to compute, either 'total_score' or 'average_score', by default 'total_score'.
        id_col : dict[str, str], optional
            Dictionary mapping spatial unit type to its identifier column name,
            by default {"gage": "gage_id", "divide": "divide_id"}.

    Returns:
        pd.DataFrame
            DataFrame with total scores computed for each spatial unit.

    """
    # determine the column name for the type of subdivision to compute total/average score
    col1 = (
        id_col.get("gage", "gage_id")
        if type == "basin" and "gage_id" in df.columns
        else id_col.get("divide", "divide_id")
    )

    df1 = df[[col1, "formulation", "summary_score", "cost"]].copy()

    # remove duplicate rows
    df1 = df1.drop_duplicates(subset=[col1, "formulation"])

    # drop rows with NaN values in the summary_score column
    df1 = df1.dropna(subset=["summary_score"])

    # identify best formulation(s)
    if method == "total_score":
        # Sum of summary_score per formulation
        df1.loc[:, method] = (
            df1.groupby(["formulation"])["summary_score"].transform("sum").round(2)
        )

    elif method == "average_score":
        # Average summary_score per formulation
        df1.loc[:, method] = df1.groupby(["formulation"])["summary_score"].transform(
            "mean"
        )

    else:
        msg = f"Unknown method: {method}. Supported methods are 'total_score' and 'average_score'."
        logger.error(msg)
        raise ValueError(msg)

    # keep only the best formulation(s) with the highest total or average score
    df1 = df1[df1[method] == df1[method].max()]

    # if there are multiple formulations with the same score, choose the one that has the lowest cost
    if df1.shape[0] > 1:
        df1 = df1[df1["cost"] == df1["cost"].min()]

    # drop col1
    df1 = df1.drop(columns=[col1])

    # remove duplicates
    df1 = df1.drop_duplicates(subset=["formulation", method])

    return df1


def _identify_best_formulation_per_gage(
    df_score: pd.DataFrame,
    gage_id_col: str,
    tolerance: float = 0.05,
    cost_dict: Optional[dict] = None,
) -> pd.DataFrame:
    """Identify the best formulation for each gage based on summary scores and costs.

    Args:
        df_score : pd.DataFrame
            DataFrame containing summary scores for each formulation and calibrated basin.
        gage_id_col : str
            Column name for gage ID in the gage file.
        tolerance : float, optional
            Tolerance for the summary score to consider formulations as equally good, by default 0.05.
        cost_dict : dict, optional
            Dictionary containing costs for each formulation, by default None.

    Returns:
        pd.DataFrame
            DataFrame with the best formulation for each gage and its score.

    """
    # for each gage, find the best formulation(s) within the tolerance
    max_scores = df_score.groupby(gage_id_col)["summary_score"].transform("max")
    best_per_gage = df_score[
        df_score["summary_score"] >= (max_scores - max_scores * tolerance)
    ].copy()

    # if formulation costs are provided, select the formulation with the minimum cost; otherwise,
    # choose the formulation with the highest score (this essentially ignores the tolerance)
    if cost_dict is not None:
        # check if all formulations are in the cost_dict
        if not best_per_gage["formulation"].isin(cost_dict.keys()).all():
            missed_formulations = best_per_gage[
                ~best_per_gage["formulation"].isin(cost_dict.keys())
            ]["formulation"].unique()

            msg = f"Some formulations are missing costs in the configuration: {', '.join(missed_formulations)}."
            logger.error(msg)
            raise ValueError(msg)

        # map costs to the formulations
        best_per_gage.loc[:, "cost"] = best_per_gage["formulation"].map(cost_dict)

        # for each gage, select the formulation with the minimum cost
        best_per_gage = best_per_gage.loc[
            best_per_gage.groupby(gage_id_col)["cost"].idxmin()
        ].copy()

        # if there are multiple formulations with the same minimum cost, keep the first one
        best_per_gage = best_per_gage.drop_duplicates(subset=[gage_id_col])
    else:
        # choose the first formulation with the highest score for each gage
        best_per_gage = best_per_gage.loc[
            best_per_gage.groupby(gage_id_col)["summary_score"].idxmax()
        ].copy()

        # initialize cost column to None
        best_per_gage["cost"] = None

    return best_per_gage


[docs] def select_formulation_donors_only( config: cs.Config, df_score: pd.DataFrame, cost_dict: Optional[dict] = None, ) -> pd.DataFrame: """Select formulations for donor basins only based on summary scores and costs. Args: config : cs.Config Configuration object containing settings for the regionalization. df_score : pd.DataFrame DataFrame containing summary scores for each formulation and calibrated basin. cost_dict : Optional[dict], optional Dictionary containing costs for each formulation, by default None. Returns: pd.DataFrame DataFrame with selected formulations and their scores. """ # get the ID columns from the configuration gage_id_col = getattr( config.general.id_col, "gage", "gage_id" ) # column name for gage ID in the DataFrame divide_id_col = getattr(config.general.id_col, "divide", "divide_id") # score computing method, type, and tolerance score_tolerance = config.spatial_unit.best_formulation.tolerance # identify the best formulation for each gage df_best_per_gage = _identify_best_formulation_per_gage( df_score, gage_id_col=gage_id_col, tolerance=score_tolerance, cost_dict=cost_dict, ) # read the crosswalk file for gage/divide relationships col_dtype = { gage_id_col: "str", divide_id_col: "str", } cwt_divide_gage = read_table(config.general.gage_divide_cwt_file, dtype=col_dtype) # merge the crosswalk with the best formulations DataFrame if divide_id_col in df_best_per_gage.columns: df_best_per_gage = df_best_per_gage.drop(columns=[divide_id_col]) df_selected = df_best_per_gage.merge( cwt_divide_gage[[gage_id_col, divide_id_col]].drop_duplicates(), on=gage_id_col, how="left", ) return df_selected
[docs] def select_formulation_all( config: cs.Config, vpu: str, df_score: pd.DataFrame, gdf_vpu: gpd.GeoDataFrame, cost_dict: Optional[dict] = None, ) -> pd.DataFrame: """Select formulation for all catchments in the VPU. Args: config : cs.Config Configuration object containing settings for the regionalization. vpu : str Vector Processing Unit (VPU) for which to select formulations. df_score : pd.DataFrame DataFrame containing summary scores for each formulation and calibrated basin. gdf_vpu : gpd.GeoDataFrame GeoDataFrame of the VPU polygons, used for spatial operations where needed. cost_dict : Optional[dict], optional Dictionary containing costs for each formulation, by default None. Returns: pd.DataFrame DataFrame with selected formulations and their scores. """ # get the ID columns from the configuration id_cols = config.general.id_col gage_id_col = getattr(id_cols, "gage", "gage_id") divide_id_col = getattr(id_cols, "divide", "divide_id") huc12_id_col = getattr(id_cols, "huc12", "huc_12") huc12_layer = getattr(config.general.layer_name, "huc12", "WBDSnapshot_National") # score computing method, type, and tolerance score_method = config.spatial_unit.best_formulation.method.lower() score_type = config.spatial_unit.best_formulation.type.lower() score_tolerance = config.spatial_unit.best_formulation.tolerance logger.info( f"Computing total score using method '{score_method}' and type '{score_type}'." ) # crosswalk for gage/divide col_dtype = { gage_id_col: "str", divide_id_col: "str", huc12_id_col: "str", } cwt_divide_gage = read_table(config.general.gage_divide_cwt_file, dtype=col_dtype) # crosswalk for divide/huc12 cwt_divide_huc12 = read_table(config.general.divide_huc12_cwt_file, dtype=col_dtype) # filter cwt_divide_huc12 with divides in gdf_vpu (i.e., only keep divides that are in the current VPU) cwt_divide_huc12 = cwt_divide_huc12[ cwt_divide_huc12[divide_id_col].isin(gdf_vpu[divide_id_col]) ].copy() # get spatial units for formulation selection huc_level = config.spatial_unit.huc_level.lower().replace("-", "").replace("_", "") huc_digit = int(huc_level.replace("huc", "")) # add huc_id column to cwt_divide_huc12 based on huc_digit cwt_divide_huc12["huc_id"] = cwt_divide_huc12[huc12_id_col].str[:huc_digit] # make sure type of huc_id column is string if not pd.api.types.is_string_dtype(cwt_divide_huc12["huc_id"]): logger.warning( f"Converting 'huc_id' column to string type. Current type: {cwt_divide_huc12['huc_id'].dtype}." ) cwt_divide_huc12["huc_id"] = cwt_divide_huc12["huc_id"].astype(str) # merge the two crosswalks first df_cwt = cwt_divide_huc12[["huc_id", huc12_id_col, divide_id_col]].merge( cwt_divide_gage[[gage_id_col, divide_id_col]].drop_duplicates(), on=divide_id_col, how="left", ) # then merge with the score DataFrame df_score = df_score.merge(df_cwt, on=gage_id_col, how="outer") logger.info( f"Selecting formulations for VPU {vpu} at {huc_level} level. " f"There are {len(df_score['huc_id'].unique())} unique {huc_level} IDs." ) # all unique huc_ids in the score DataFrame huc_ids = df_score["huc_id"].unique() # remove NaN values from huc_ids huc_ids = [huc_id for huc_id in huc_ids if pd.notna(huc_id)] # get huc12 geometry for these huc_ids (to compute centroid distances between gages and huc_ids) huc12_hydro_file = Path(config.general.huc12_hydrofabric_file).resolve(strict=True) # Open with Fiona to read features in huc_ids features = [] huc_digit = len(huc_ids[0]) # assuming all huc_ids have the same length with fiona.open( huc12_hydro_file, "r", layer=huc12_layer, open_options=["METHOD=ONLY_CCW"] ) as src: for feat in src: huc_key = next( (k for k in feat["properties"] if k.lower() == huc12_id_col), None ) if huc_key and feat["properties"][huc_key][:huc_digit] in huc_ids: features.append(feat) if not features: msg = ( f"No matching HUC12 geometries found in {huc12_hydro_file} " f"for the specified HUC IDs at {huc_level} level." ) logger.error(msg) raise ValueError(msg) # Convert to GeoDataFrame huc12_gdf = gpd.GeoDataFrame.from_features(features, crs=src.crs) # loop through each unique huc_id and select formulation df_selected = pd.DataFrame() for huc_id in huc_ids: logger.debug(f"Processing HUC ID: {huc_id}") # Find actual column name in gdf that matches huc12_id_col (case-insensitive) col_match = next( (col for col in huc12_gdf.columns if col.lower() == huc12_id_col.lower()), None, ) # Use the matched column to filter huc12_gdf huc12_gdf_huc = huc12_gdf[ huc12_gdf[col_match].astype(str).str[:huc_digit] == huc_id ].copy() if huc12_gdf_huc.empty: msg = f"No geometry found for HUC ID: {huc_id}. Please check the HUC12 hydrofabric file." logger.error(msg) raise ValueError(msg) huc_level, gages, dists, formulations = _find_calibration_gages( huc_id, df_score, config, huc12_gdf_huc ) if not gages: msg = ( f"No calibration gages found for HUC ID: {huc_id}. " f"Minimum required is {config.spatial_unit.nmin_calib_basin}." ) logger.error(msg) raise ValueError(msg) # filter the score DataFrame for identified gages and formulations df_huc = df_score[ df_score[gage_id_col].isin(gages) & df_score["formulation"].isin(formulations) ].copy() # identify best formulation for each gage based on summary scores and/or formulation costs df_huc = _identify_best_formulation_per_gage( df_huc, gage_id_col=gage_id_col, tolerance=score_tolerance, cost_dict=cost_dict, ) # select the best formulation for each huc_id based on the total score or average score best_formulation = _select_formulation_given_score( df_huc, method=score_method, type=score_type ) # append the selected formulation to the list if best_formulation.empty: logger.warning( f"No valid formulations found for HUC ID: {huc_id}. Skipping this HUC." ) continue # add huc_id, huc_level, number of gages and vpu to the best formulation best_formulation["huc_id"] = huc_id best_formulation["upscale_huc"] = huc_level best_formulation["num_gages"] = len(gages) best_formulation["distances"] = ", ".join( [str(d) for d in dists] ) # join distances as a stringi df_selected = pd.concat([df_selected, best_formulation], ignore_index=True) # merge with cwt_divide_huc12 to get the divide_id df_selected = df_selected.merge( cwt_divide_huc12[["huc_id", divide_id_col]].drop_duplicates(), on="huc_id", how="left", ) # handle formulation selection for calibration gages if config.general.approach_calib_basins == "regionalization": pass elif config.general.approach_calib_basins == "summary_score": # select formulations for donor gages based on the summary scores and costs df_selected_donors = select_formulation_donors_only( config, df_score, cost_dict=cost_dict ) # replace the formulation for donors in df_selected with the one from df_selected_donors form_map = df_selected_donors.drop_duplicates(divide_id_col).set_index( divide_id_col )["formulation"] df_selected.loc[:, "formulation"] = ( df_selected[divide_id_col].map(form_map).fillna(df_selected["formulation"]) ) else: msg = ( f"Unknown approach for assigning formulations to calibrated basins: " f"{config.general.approach_calib_basins}. Supported methods are 'regionalization' and 'summary_score'." ) logger.error(msg) raise ValueError(msg) return df_selected
[docs] def check_gage_formulation_uniqueness( df_form: pd.DataFrame, config: cs.Config, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Check that each gage has a unique formulation assigned. Args: df_form : pd.DataFrame DataFrame with selected formulations for each divide. config : cs.Config Configuration object containing settings for the regionalization. Returns: tuple[pd.DataFrame, pd.DataFrame] Tuple containing the new DataFrame with gage-formulation mapping, and the updated DataFrame with divide-formulation mapping and """ # get formulation selected for each donor gage gage_id_col = getattr(config.general.id_col, "gage", "gage_id") divide_id_col = getattr(config.general.id_col, "divide", "divide_id") df_cwt = read_table( config.general.gage_divide_cwt_file, dtype={gage_id_col: "str", divide_id_col: "str"}, ) df_cwt = df_cwt[[gage_id_col, divide_id_col]].drop_duplicates() df_form_gage = df_form[[divide_id_col, "formulation"]].drop_duplicates() df_form_gage = df_cwt.merge( df_form_gage, on=divide_id_col, how="inner", ) # check if any gage is associated with multiple formulations df_count = ( df_form_gage[[gage_id_col, "formulation"]] .drop_duplicates() .groupby(gage_id_col) .size() .reset_index(name="count") .sort_values(by="count", ascending=False) ) gages = df_count[df_count["count"] > 1][gage_id_col].tolist() # loop through gages associated with multiple formulations to reassign them to # the most frequent formulation for both df_form_gage and df_form for gage in gages: df1 = df_form_gage.loc[df_form_gage[gage_id_col] == gage].sort_values( by="formulation" ) df_count = ( df1.groupby("formulation") .size() .reset_index(name="count") .sort_values(by="count", ascending=False) ) most_freq_form = df_count.iloc[0]["formulation"] logger.debug( f"Gage {gage} is associated with multiple formulations, " f"reassigning to the most frequent formulation {most_freq_form}" ) df_form_gage.loc[df_form_gage[gage_id_col] == gage, "formulation"] = ( most_freq_form ) df_form.loc[df_form[divide_id_col].isin(df1[divide_id_col]), "formulation"] = ( most_freq_form ) # ensure no duplicate rows in df_form_gage df_form_gage = df_form_gage[[gage_id_col, "formulation"]].drop_duplicates() return df_form_gage, df_form
[docs] def save_formulation_results( df_form: pd.DataFrame, df_form_gage: pd.DataFrame, config: cs.Config, vpu: str, ) -> None: """Save the selected formulations to the output file. Args: df_form : pd.DataFrame DataFrame with formualtion selected for each divide. df_form_gage : pd.DataFrame DataFrame with formualtion selected for each gage. config : cs.Config Configuration object containing settings for the regionalization. vpu : str Vector Processing Unit (VPU) for which to save formulations. """ co = getattr(config.output, "formulation", None) if co is not None: co.save_to_file( df_form, vpu=vpu, data_str="Formulation Selection", use_stem_suffix=False ) # merge with calibration parameter file to get parameters for each gage gage_id_col = getattr(config.general.id_col, "gage", "gage_id") df_pars = read_table(config.general.calib_param_file, dtype={gage_id_col: "str"}) df_pars = df_form_gage.merge( df_pars, on=[gage_id_col, "formulation"], how="inner" ).drop_duplicates() # save the parameters for each gage to the output file co.save_to_file( df_pars, vpu=vpu, data_str="Formulation Selection (per gage with parameters)", use_stem_suffix=True, )
[docs] def plot_formulation_results( df_selected: pd.DataFrame, config: cs.Config, vpu: str, gdf_vpu: Optional[gpd.GeoDataFrame], ) -> None: """Plot the selected formulations for each spatial unit in the VPU. Args: df_selected : pd.DataFrame DataFrame with selected formulations and their scores. config : cs.Config Configuration object containing settings for the regionalization. vpu : str Vector Processing Unit (VPU) for which to plot formulations. gdf_vpu : Optional[gpd.GeoDataFrame], optional GeoDataFrame of the VPU polygons, used for spatial operations where needed """ # generate plots if enabled cc = getattr(config.output, "formulation", None) if cc is None: return divide_id_col = getattr(config.general.id_col, "divide", "divide_id") score_method = config.spatial_unit.best_formulation.method.lower() if any(cc.plots.values()): # get geometry for divides if spatial map is enabled if cc.plots.get("spatial_map", False): # merge the geometry with the selected formulations df_selected = df_selected.merge( gdf_vpu[[divide_id_col, "geometry"]].drop_duplicates(), on=divide_id_col, how="right", ) # convert df_selected to a real GeoDataFrame for plotting df_selected = gpd.GeoDataFrame( df_selected, geometry="geometry", crs=gdf_vpu.crs, ) # plot the selected formulations columns_to_plot = cc.plots.get("columns_to_plot", None) if columns_to_plot is None: columns_to_plot = ["formulation", score_method, "summary_score", "cost"] plot_dict = { "vpu": vpu, "var_str": "Formulation Selection", "columns": columns_to_plot, "ncols": 3, } cc.plot_data(df_selected, plot_dict)
[docs] def select_formulation( config: cs.Config, vpu: str, df_score: pd.DataFrame, gdf_vpu: Optional[gpd.GeoDataFrame], ) -> pd.DataFrame: """Head function to select formulations for catchments in the VPU. Args: config : cs.Config Configuration object containing settings for the regionalization. vpu : str Vector Processing Unit (VPU) for which to select formulations. df_score : pd.DataFrame DataFrame containing summary scores for each formulation and calibrated basin. gdf_vpu : Optional[gpd.GeoDataFrame] GeoDataFrame of the VPU polygons, used for spatial operations where needed. Returns: pd.DataFrame DataFrame with selected formulations and their scores. """ # get formulation costs from the configuration formulation_costs = _get_formulation_costs(config.formulation_cost) cost_dict = formulation_costs if config.general.consider_cost else None if config.general.calib_basins_only: # select formulations for donor basins only df_formulation = select_formulation_donors_only(config, df_score, cost_dict) else: # select formulations for all catchments in the VPU df_formulation = select_formulation_all( config, vpu, df_score, gdf_vpu, cost_dict ) # add vpu column to the formulation DataFrame df_formulation["vpu"] = vpu # rearrange the columns in the formulation DataFrame divide_id_col = getattr(config.general.id_col, "divide", "divide_id") score_method = config.spatial_unit.best_formulation.method.lower() columns = [ "vpu", divide_id_col, "formulation", "huc_id", score_method, "summary_score", "cost", "num_gages", ] method1 = config.spatial_unit.basin_fill_method.lower() output_columns = ( columns + ["upscale_huc"] if method1 == "upscaling" else columns + ["distances"] ) df_formulation = df_formulation.reindex( columns=[c for c in output_columns if c in df_formulation.columns] ) # check that each gage has a unique formulation assigned df_form_gage, df_formulation = check_gage_formulation_uniqueness( df_formulation, config ) # save the selected formulations to the output file save_formulation_results(df_formulation, df_form_gage, config, vpu) # plot the selected formulations plot_formulation_results(df_formulation, config, vpu, gdf_vpu) return df_formulation