Source code for nwm_region_mgr.utils.validation_utils

"""Define utilities and/or help functions for validation.

validation_utils.py

Functions:
    - check_columns_dataframe: Check if the required columns are present in a DataFrame (CSV or Parquet).
    - check_columns_hydrofabric: Check if the required fields are present in a hydrofabric file (GeoPackage or Shapefile).
    - check_options: Check if the provided option(s) are valid against a list of valid options.

"""

import logging
from pathlib import Path
from typing import Set

import fiona
import pandas as pd
import pyarrow.parquet as pq

logger = logging.getLogger(__name__)


[docs] def check_columns_dataframe(file: Path | str, columns: Set[str]): """Check if the required columns are present in the file. Args: file: Path to the CSV or Parquet file. columns: Set of required column names. Raises: ValueError: If any of the required columns are missing in the file. """ if isinstance(file, str): file = Path(file) if not file.exists(): msg = f"File not found: {file}" logger.error(msg) raise FileNotFoundError(msg) suffix = file.suffix.lower() if suffix == ".csv": # Read full DataFrame with minimal memory usage df = pd.read_csv(file) df.columns = df.columns.str.strip().str.lower() columns_present = list(df.columns) is_empty = df.empty elif suffix == ".parquet": pf = pq.ParquetFile(file) columns_present = [col.strip().lower() for col in pf.schema.names] is_empty = pf.metadata.num_rows == 0 # More efficient than loading into pandas else: msg = f"Unsupported file format: {suffix}. Supported formats are .csv and .parquet." logger.error(msg) raise ValueError(msg) # raise error if the file is empty if is_empty: msg = f"The file {file} is empty. Please provide a file with data." logger.error(msg) raise ValueError(msg) # Check for missing columns (case insensitive) missing_cols = {col.lower() for col in columns} - { col.lower() for col in columns_present } if missing_cols: msg = f"Missing columns (case insensitive) in {file}: {missing_cols}. Available columns: {columns_present}" logger.error(msg) raise ValueError(msg)
[docs] def check_columns_hydrofabric( hydro_file: str | Path, required_fields: list[str], layer_name: str = None ) -> str: """Check if the required fields are present in the hydrofabric file. Args: hydro_file: Path to the hydrofabric file (GeoPackage or Shapefile). required_fields: List of required fields to check. layer_name: Optional layer name for GeoPackage or Geodatabase files. Returns: str: The layer name used for the hydrofabric file. """ # get the file suffix if isinstance(hydro_file, str): hydro_file = Path(hydro_file) suffix = hydro_file.suffix.lower() # check if the file format is supported if suffix not in (".gpkg", ".shp", ".gdb"): msg = f"Unsupported hydrofabric file format: {suffix}. Supported formats are .gpkg, .shp, and .gdb." logger.error(msg) raise ValueError(msg) # Determine if layer is needed use_layer = suffix in (".gpkg", ".gdb") # Automatically select layer if needed and not provided if use_layer and layer_name is None: layers = fiona.listlayers(hydro_file) if len(layers) == 1: layer_name = layers[0] else: raise ValueError( f"Multiple layers found in {hydro_file}. Please specify a layer: {layers}" ) # Open with or without layer open_kwargs = {"layer": layer_name} if use_layer else {} with fiona.open(hydro_file, **open_kwargs) as src: schema_fields = {field.lower() for field in src.schema["properties"]} has_geometry = src.schema.get("geometry") is not None missing = [] for field in required_fields: if field.lower() == "geometry": if not has_geometry: missing.append("geometry") elif field not in schema_fields: missing.append(field) if missing: msg = f"Missing required fields in {hydro_file}: {missing}. Available fields: {schema_fields}" logger.error(msg) raise ValueError(msg) return layer_name
[docs] def check_options(options: str | list[str], valid_options: list[str], var: str): """Check if the provided options are valid against a list of valid options.""" if isinstance(options, str): options = [options] options_unsupported = set(options) - set(valid_options) if options_unsupported: raise ValueError( f"Unsupported options for {var}: {options_unsupported}. Valid options are: {valid_options}" )