Skip to content

API Reference#

This page provides detailed documentation for the NWM Coastal Python API.

Configuration Classes#

CoastalCalibConfig#

CoastalCalibConfig dataclass #

CoastalCalibConfig(
    simulation,
    boundary,
    paths,
    model_config,
    monitoring=MonitoringConfig(),
    download=DownloadConfig(),
    _base_config=None,
)

Complete coastal calibration workflow configuration.

Supports both SCHISM and SFINCS models via the polymorphic `model_config` field. The concrete type is selected by the `model` key in the YAML file and resolved through `MODEL_REGISTRY`.

model property #

model

Model identifier string (convenience accessor).

from_yaml classmethod #

from_yaml(config_path)

Load configuration from YAML file with optional inheritance.

Supports variable interpolation using ${section.key} syntax. Variables are resolved from other config values, e.g.:

  • ${user} -> value of $USER environment variable
  • ${simulation.coastal_domain} -> value of simulation.coastal_domain
  • ${model} -> the model type string ("schism" or "sfincs")
PARAMETER DESCRIPTION
config_path

Path to YAML configuration file.

TYPE: Path or str

RETURNS DESCRIPTION
CoastalCalibConfig

Loaded configuration.

RAISES DESCRIPTION
FileNotFoundError

If the configuration file does not exist.

YAMLError

If the YAML file is malformed.

Source code in src/coastal_calibration/config/schema.py
@classmethod
def from_yaml(cls, config_path: Path | str) -> CoastalCalibConfig:
    """Build a :class:`CoastalCalibConfig` from a YAML file.

    Handles YAML inheritance via the ``_base`` key and variable
    interpolation using ``${section.key}`` syntax, e.g.:

    - ``${user}`` -> value of ``$USER`` environment variable
    - ``${simulation.coastal_domain}`` -> value of ``simulation.coastal_domain``
    - ``${model}`` -> the model type string (``"schism"`` or ``"sfincs"``)

    Parameters
    ----------
    config_path : Path or str
        YAML configuration file to read.

    Returns
    -------
    CoastalCalibConfig
        The fully resolved configuration.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    yaml.YAMLError
        If the YAML file is malformed.
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    try:
        raw = yaml.safe_load(path.read_text())
    except yaml.YAMLError as exc:
        raise yaml.YAMLError(f"Invalid YAML in {path}: {exc}") from exc

    if raw is None:
        raise ValueError(f"Configuration file is empty: {path}")

    # Resolve inheritance: load the parent config first, then overlay
    # this file's values on top of the parent's serialized form.
    parent_cfg = None
    if "_base" in raw:
        base_path = Path(raw.pop("_base"))
        if not base_path.is_absolute():
            base_path = path.parent / base_path
        parent_cfg = cls.from_yaml(base_path)
        raw = _deep_merge(parent_cfg.to_dict(), raw)

    # The model key must exist before interpolation so ``${model}`` resolves.
    raw.setdefault("model", "schism")

    resolved = _interpolate_config(raw)

    return cls.from_dict(resolved, base_config_path=path if parent_cfg else None)

from_dict classmethod #

from_dict(data, base_config_path=None)

Create config from a plain dictionary.

PARAMETER DESCRIPTION
data

Configuration dictionary with the same structure as the YAML file (see :meth:to_dict for the expected keys).

TYPE: dict

base_config_path

Path to a base configuration file (for YAML inheritance). Only needed when the config was loaded via _base key.

TYPE: Path DEFAULT: None

RETURNS DESCRIPTION
CoastalCalibConfig
Source code in src/coastal_calibration/config/schema.py
@classmethod
def from_dict(
    cls, data: dict[str, Any], base_config_path: Path | None = None
) -> CoastalCalibConfig:
    """Create config from a plain dictionary.

    The input dictionary is not modified: nested sections are
    shallow-copied before normalization (previously ``model_config`` was
    popped and date/path fields were rewritten in the caller's dict).

    Parameters
    ----------
    data : dict
        Configuration dictionary with the same structure as the YAML
        file (see :meth:`to_dict` for the expected keys).
    base_config_path : Path, optional
        Path to a base configuration file (for YAML inheritance).
        Only needed when the config was loaded via ``_base`` key.

    Returns
    -------
    CoastalCalibConfig

    Raises
    ------
    ValueError
        If ``model`` is missing or not registered in ``MODEL_REGISTRY``.
    """
    if "model" not in data:
        raise ValueError("'model' is required (e.g., model: schism or model: sfincs)")
    model_type: str = data["model"]

    # Fail fast on an unknown model before building any sub-config, so the
    # error points at the actual problem rather than an unrelated section.
    if model_type not in MODEL_REGISTRY:
        msg = (
            f"Unknown model type: {model_type!r}. Supported models: {', '.join(MODEL_REGISTRY)}"
        )
        raise ValueError(msg)

    # ``model_config: null`` in YAML yields None; treat it as empty.
    model_config_data = dict(data.get("model_config") or {})

    sim_data = dict(data.get("simulation", {}))
    if "start_date" in sim_data:
        # Accept any pandas-parsable date representation.
        sim_data["start_date"] = pd.to_datetime(sim_data["start_date"]).to_pydatetime()
    simulation = SimulationConfig(**sim_data)

    boundary_data = dict(data.get("boundary", {}))
    if boundary_data.get("stofs_file"):
        boundary_data["stofs_file"] = Path(boundary_data["stofs_file"])
    boundary = BoundaryConfig(**boundary_data)

    paths = PathConfig(**data.get("paths", {}))

    monitoring_data = dict(data.get("monitoring", {}))
    if monitoring_data.get("log_file"):
        monitoring_data["log_file"] = Path(monitoring_data["log_file"])
    monitoring = MonitoringConfig(**monitoring_data)

    download = DownloadConfig(**data.get("download", {}))

    model_config = MODEL_REGISTRY[model_type](**model_config_data)

    return cls(
        simulation=simulation,
        boundary=boundary,
        paths=paths,
        model_config=model_config,  # pyright: ignore[reportArgumentType]
        monitoring=monitoring,
        download=download,
        _base_config=base_config_path,
    )

to_yaml #

to_yaml(path)

Write configuration to YAML file.

PARAMETER DESCRIPTION
path

Path to YAML output file. Parent directories will be created if they don't exist.

TYPE: Path or str

Source code in src/coastal_calibration/config/schema.py
def to_yaml(self, path: Path | str) -> None:
    """Serialize this configuration to a YAML file.

    Parameters
    ----------
    path : Path or str
        Destination file; missing parent directories are created first.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    rendered = yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False)
    target.write_text(rendered)

to_dict #

to_dict()

Convert config to dictionary.

Source code in src/coastal_calibration/config/schema.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the configuration to a plain, YAML-friendly dictionary."""
    paths = self.paths
    paths_section: dict[str, Any] = {
        "work_dir": str(paths.work_dir),
        "raw_download_dir": str(paths.raw_download_dir) if paths.raw_download_dir else None,
        "hot_start_file": str(paths.hot_start_file) if paths.hot_start_file else None,
    }
    # Optional directories are emitted only when set (keys omitted otherwise).
    for key in ("parm_dir", "nwm_dir", "otps_dir"):
        value = getattr(paths, key)
        if value:
            paths_section[key] = str(value)

    return {
        "model": self.model,
        "simulation": {
            "start_date": self.simulation.start_date.isoformat(),
            "duration_hours": self.simulation.duration_hours,
            "coastal_domain": self.simulation.coastal_domain,
            "meteo_source": self.simulation.meteo_source,
            "timestep_seconds": self.simulation.timestep_seconds,
        },
        "boundary": {
            "source": self.boundary.source,
            "stofs_file": str(self.boundary.stofs_file) if self.boundary.stofs_file else None,
        },
        "paths": paths_section,
        "model_config": self.model_config.to_dict(),
        "monitoring": {
            "log_level": self.monitoring.log_level,
            "log_file": str(self.monitoring.log_file) if self.monitoring.log_file else None,
            "enable_progress_tracking": self.monitoring.enable_progress_tracking,
            "enable_timing": self.monitoring.enable_timing,
        },
        "download": {
            "enabled": self.download.enabled,
            "timeout": self.download.timeout,
            "raise_on_error": self.download.raise_on_error,
            "limit_per_host": self.download.limit_per_host,
        },
    }

validate #

validate()

Validate configuration and return list of errors.

Source code in src/coastal_calibration/config/schema.py
def validate(self) -> list[str]:
    """Validate the configuration.

    Returns
    -------
    list of str
        Human-readable error messages; empty when the config is valid.
    """
    errors: list[str] = []

    if self.simulation.duration_hours <= 0:
        errors.append("simulation.duration_hours must be positive")

    # Model-specific validation (delegated to the SCHISM/SFINCS subclass)
    errors.extend(self.model_config.validate(self))

    # Shared boundary validation
    errors.extend(self._validate_boundary_source())

    # Date range validation -- only relevant when downloads are enabled.
    # The import is deferred into this branch (it was already local,
    # presumably to avoid an import cycle) so validating a config with
    # downloads disabled never touches the downloader module.
    if self.download.enabled:
        from coastal_calibration.data.downloader import validate_date_ranges

        sim = self.simulation
        start_time = sim.start_date
        end_time = start_time + timedelta(hours=sim.duration_hours)
        errors.extend(
            validate_date_ranges(
                start_time,
                end_time,
                sim.meteo_source,
                self.boundary.source,
                sim.coastal_domain,
            )
        )

    return errors

SimulationConfig#

SimulationConfig dataclass #

SimulationConfig(
    start_date,
    duration_hours,
    coastal_domain,
    meteo_source,
    timestep_seconds=3600,
)

Simulation time and domain configuration.

start_pdy property #

start_pdy

Return start date as YYYYMMDD string.

start_cyc property #

start_cyc

Return start cycle (hour) as HH string.

inland_domain property #

inland_domain

Inland domain directory name for this coastal domain.

nwm_domain property #

nwm_domain

NWM domain identifier for this coastal domain.

geo_grid property #

geo_grid

Geogrid filename for this coastal domain.

BoundaryConfig#

BoundaryConfig dataclass #

BoundaryConfig(source='tpxo', stofs_file=None)

Boundary condition configuration.

PathConfig#

PathConfig dataclass #

PathConfig(
    work_dir,
    raw_download_dir=None,
    hot_start_file=None,
    parm_dir=None,
    nwm_dir=None,
    otps_dir=None,
)

Path configuration for data and executables.

Only work_dir is required. All other fields are optional and only needed by specific workflow stages (e.g. the create workflow for SCHISM or SFINCS boundary processing that uses TPXO/OTPSnc).

tpxo_data_dir property #

tpxo_data_dir

TPXO tidal atlas data directory (requires parm_dir).

parm_nwm property #

parm_nwm

Parameter files directory (requires parm_dir).

download_dir property #

download_dir

Effective download directory (fallback to work_dir/downloads).

meteo_dir #

meteo_dir(meteo_source)

Directory for meteorological data.

Source code in src/coastal_calibration/config/schema.py
def meteo_dir(self, meteo_source: str) -> Path:
    """Return the download directory holding *meteo_source* forcing files."""
    return self.download_dir.joinpath(self.METEO_SUBDIR, meteo_source)

streamflow_dir #

streamflow_dir(meteo_source, coastal_domain='conus')

Directory for streamflow/hydro data.

Source code in src/coastal_calibration/config/schema.py
def streamflow_dir(self, meteo_source: str, coastal_domain: str = "conus") -> Path:
    """Return the directory for streamflow/hydro data.

    Retrospective NWM data lives under its own subtree; all other sources
    are organized per NWM domain under the hydro subtree (unknown domains
    fall back to ``conus``).
    """
    root = self.download_dir
    if meteo_source == "nwm_retro":
        return root.joinpath(self.STREAMFLOW_SUBDIR, "nwm_retro")
    domain_dir = self._NWM_DOMAIN_DIR.get(coastal_domain, "conus")
    return root.joinpath(self.HYDRO_SUBDIR, "nwm", domain_dir)

coastal_dir #

coastal_dir(coastal_source)

Directory for coastal boundary data.

Source code in src/coastal_calibration/config/schema.py
def coastal_dir(self, coastal_source: str) -> Path:
    """Return the download directory for *coastal_source* boundary data."""
    return self.download_dir.joinpath(self.COASTAL_SUBDIR, coastal_source)

geogrid_file #

geogrid_file(sim)

Geogrid file path for the given domain (requires parm_dir).

Source code in src/coastal_calibration/config/schema.py
def geogrid_file(self, sim: SimulationConfig) -> Path:
    """Return the geogrid file path for *sim*'s domain (requires ``parm_dir``)."""
    return self.parm_nwm.joinpath(sim.inland_domain, sim.geo_grid)

ModelConfig#

ModelConfig #

Bases: ABC

Abstract base class for model-specific configuration.

Each concrete subclass owns its compute parameters, environment variable construction, stage ordering, validation, and SLURM script generation. This keeps model-specific concerns out of the shared configuration and makes adding new models straightforward: create a new subclass, implement the abstract methods, and register it in `MODEL_REGISTRY`.

ATTRIBUTE DESCRIPTION
omp_num_threads

Number of OpenMP threads per process.

TYPE: int

runtime_env

Extra environment variables for the model run subprocess. Merged last so they can override any auto-detected value. Only used by model run stages (schism_run, sfincs_run).

TYPE: dict[str, str]

model_name abstractmethod property #

model_name

Return the model identifier string (e.g. 'schism', 'sfincs').

stage_order abstractmethod property #

stage_order

Ordered list of stage names for this model's pipeline.

build_environment abstractmethod #

build_environment(env, config)

Add model-specific environment variables to env (mutating).

Called by `WorkflowStage.build_environment` after shared variables (OpenMP pinning, HDF5 file locking) have been populated.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def build_environment(self, env: dict[str, str], config: CoastalCalibConfig) -> dict[str, str]:
    """Populate *env* with model-specific variables (mutates *env*).

    Invoked from :meth:`WorkflowStage.build_environment` once the shared
    variables (OpenMP pinning, HDF5 file locking) are already present.
    """

validate abstractmethod #

validate(config)

Return model-specific validation errors.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def validate(self, config: CoastalCalibConfig) -> list[str]:
    """Return a list of model-specific validation error messages."""

create_stages abstractmethod #

create_stages(config, monitor)

Construct and return the {name: stage} dictionary.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def create_stages(self, config: CoastalCalibConfig, monitor: Any) -> dict[str, Any]:
    """Build and return the pipeline stages as a ``{name: stage}`` mapping."""

to_dict abstractmethod #

to_dict()

Serialize model-specific fields to a dictionary.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def to_dict(self) -> dict[str, Any]:
    """Return the model-specific fields as a serializable dictionary."""

SchismModelConfig#

SchismModelConfig dataclass #

SchismModelConfig(
    prebuilt_dir=Path(),
    geogrid_file=Path(),
    nodes=2,
    ntasks_per_node=18,
    exclusive=True,
    nscribes=2,
    omp_num_threads=2,
    oversubscribe=False,
    schism_exe=None,
    include_noaa_gages=False,
    discharge_file=None,
    runtime_env=dict(),
)

Bases: ModelConfig

SCHISM model configuration.

Contains compute parameters (MPI layout, SCHISM binary), the path to a prebuilt model directory, and the geogrid file used for atmospheric forcing regridding.

PARAMETER DESCRIPTION
prebuilt_dir

Path to the directory containing the pre-built SCHISM model files (hgrid.gr3, vgrid.in, param.nml, etc.).

TYPE: Path DEFAULT: Path()

geogrid_file

Path to the WRF geogrid file (e.g. geo_em_HI.nc) used by the atmospheric forcing regridding stage.

TYPE: Path DEFAULT: Path()

nodes

Number of SLURM nodes.

TYPE: int DEFAULT: 2

ntasks_per_node

MPI tasks per node.

TYPE: int DEFAULT: 18

exclusive

Request exclusive node access.

TYPE: bool DEFAULT: True

nscribes

Number of SCHISM scribe processes.

TYPE: int DEFAULT: 2

omp_num_threads

OpenMP threads per MPI rank.

TYPE: int DEFAULT: 2

oversubscribe

Allow MPI oversubscription.

TYPE: bool DEFAULT: False

schism_exe

Path to a compiled SCHISM executable. When set, the schism_run stage uses this binary instead of discovering pschism on PATH. Normally not needed -- SCHISM is compiled automatically when activating a pixi environment with the schism feature. Set this to a system-compiled binary on WCOSS2 or other clusters where the model is built against system MPI/HDF5/NetCDF.

TYPE: Path DEFAULT: None

include_noaa_gages

When True, automatically query NOAA CO-OPS for water level stations within the model domain (computed from the concave hull of open boundary nodes in hgrid.gr3), write a station.in file, set iout_sta = 1 in param.nml, and generate sim-vs-obs comparison plots after the run. Requires the plot optional dependencies.

TYPE: bool DEFAULT: False

discharge_file

Path to a nwmReaches.csv file mapping NWM reach feature IDs to SCHISM source/sink elements. When None (default), the discharge stage is skipped and no river forcing is generated.

TYPE: Path DEFAULT: None

total_tasks property #

total_tasks

Total number of MPI tasks (nodes * ntasks_per_node).

coastal_parm property #

coastal_parm

Directory containing prebuilt SCHISM model files.

schism_mesh property #

schism_mesh

SCHISM ESMF mesh file path.

SfincsModelConfig#

SfincsModelConfig dataclass #

SfincsModelConfig(
    prebuilt_dir,
    model_root=None,
    discharge_locations_file=None,
    merge_discharge=False,
    include_precip=False,
    include_wind=False,
    include_pressure=False,
    meteo_res=None,
    forcing_to_mesh_offset_m=0.0,
    vdatum_mesh_to_msl_m=0.0,
    sfincs_exe=None,
    omp_num_threads=0,
    inp_overrides=dict(),
    floodmap_dem=None,
    floodmap_hmin=0.05,
    floodmap_enabled=True,
    runtime_env=dict(),
)

Bases: ModelConfig

SFINCS model configuration.

SFINCS runs on a single node using OpenMP (all available cores). There is no MPI or multi-node support.

PARAMETER DESCRIPTION
prebuilt_dir

Path to the directory containing the pre-built model files (sfincs.inp, sfincs.nc, region.geojson, etc.).

TYPE: Path

model_root

Output directory for the built model. Defaults to {work_dir}/sfincs_model.

TYPE: Path DEFAULT: None

discharge_locations_file

Path to a SFINCS .src or GeoJSON with discharge source point locations.

TYPE: Path DEFAULT: None

merge_discharge

Whether to merge with pre-existing discharge source points.

TYPE: bool DEFAULT: False

include_precip

When True, add precipitation forcing from the meteorological data catalog entry (derived from simulation.meteo_source).

TYPE: bool DEFAULT: False

include_wind

When True, add spatially-varying wind forcing (wind10_u, wind10_v) from the meteorological data catalog entry.

TYPE: bool DEFAULT: False

include_pressure

When True, add spatially-varying atmospheric pressure forcing (press_msl) and enable barometric correction (baro=1).

TYPE: bool DEFAULT: False

meteo_res

Output resolution (m) for gridded meteorological forcing (precipitation, wind, pressure). When None (default) the resolution is determined from the SFINCS quadtree grid — it equals the base cell size (coarsest level) so that the meteo grid is never finer than needed. Setting an explicit value (e.g. 2000) overrides the automatic calculation.

.. note::

Without this parameter the HydroMT reproject call retains the source-data resolution (≈ 1 km for NWM), and the LCC → UTM reprojection can inflate the output to the full CONUS extent, producing multi-GB files and very slow simulations.

TYPE: float DEFAULT: None

forcing_to_mesh_offset_m

Vertical offset in meters added to the boundary-condition water levels before they enter SFINCS.

Tidal-only sources such as TPXO provide oscillations centered on zero (MSL) but carry no information about where MSL sits on the mesh's vertical datum. This parameter anchors the forcing signal to the correct geodetic height on the mesh. Set it to the elevation of MSL in the mesh datum obtained from VDatum (e.g. 0.171 for a NAVD88 mesh on the Texas Gulf coast, where MSL is 0.171 m above NAVD88).

For sources that already report water levels in the mesh datum (e.g. STOFS on a NAVD88 mesh) set this to 0.0.

Defaults to 0.0.

TYPE: float DEFAULT: 0.0

vdatum_mesh_to_msl_m

Vertical offset in meters added to the simulated water level before comparison with NOAA CO-OPS observations (which are in MSL). The model output inherits the mesh vertical datum, so this converts it to MSL (e.g. 0.171 for a NAVD88 mesh on the Texas Gulf coast).

Defaults to 0.0.

TYPE: float DEFAULT: 0.0

sfincs_exe

Path to a compiled SFINCS executable. When set, the sfincs_run stage uses this binary instead of discovering sfincs on PATH. Normally not needed -- SFINCS is compiled automatically when activating a pixi environment with the sfincs feature.

TYPE: Path DEFAULT: None

omp_num_threads

Number of OpenMP threads. Defaults to the number of physical CPU cores on the current machine (see :func:~coastal_calibration.utils.get_cpu_count). On HPC nodes this auto-detects correctly; on a local laptop it avoids over-subscribing the system.

TYPE: int DEFAULT: 0

inp_overrides

Arbitrary key/value pairs written to sfincs.inp just before the model is written to disk. Use this to override physics parameters that HydroMT-SFINCS sets by default (e.g. advection: 0, nuvisc: 0.01). Keys must be valid sfincs.inp parameter names.

TYPE: dict DEFAULT: dict()

MonitoringConfig#

MonitoringConfig dataclass #

MonitoringConfig(
    log_level="INFO",
    log_file=None,
    enable_progress_tracking=True,
    enable_timing=True,
)

Workflow monitoring configuration.

DownloadConfig#

DownloadConfig dataclass #

DownloadConfig(
    enabled=True,
    timeout=600,
    raise_on_error=True,
    limit_per_host=4,
)

Data download configuration.

SFINCS Creation Configuration#

SfincsCreateConfig#

SfincsCreateConfig dataclass #

SfincsCreateConfig(
    aoi,
    output_dir,
    download_dir=None,
    grid=GridConfig(),
    elevation=ElevationConfig(),
    mask=MaskConfig(),
    subgrid=SubgridConfig(),
    data_catalog=DataCatalogConfig(),
    monitoring=MonitoringConfig(),
    river_discharge=None,
    add_noaa_gages=False,
    observation_points=list(),
    observation_locations_file=None,
    merge_observations=False,
)

Root configuration for SFINCS model creation workflow.

Loaded from YAML via `from_yaml`. All paths are resolved to absolute paths during construction.

stage_order property #

stage_order

Ordered list of creation stages to execute.

Roughness is embedded in the quadtree subgrid tables, so there is no separate roughness stage. The create_discharge stage is included only when `river_discharge` is configured.

from_yaml classmethod #

from_yaml(config_path)

Load configuration from a YAML file.

PARAMETER DESCRIPTION
config_path

Path to YAML configuration file.

TYPE: Path or str

RETURNS DESCRIPTION
SfincsCreateConfig

Loaded configuration.

RAISES DESCRIPTION
FileNotFoundError

If the configuration file does not exist.

YAMLError

If the YAML file is malformed.

Source code in src/coastal_calibration/config/create_schema.py
@classmethod
def from_yaml(cls, config_path: Path | str) -> SfincsCreateConfig:
    """Read a :class:`SfincsCreateConfig` from a YAML file.

    Relative paths inside the file are resolved against the file's own
    directory before the config object is built.

    Parameters
    ----------
    config_path : Path or str
        YAML configuration file to read.

    Returns
    -------
    SfincsCreateConfig
        Loaded configuration.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    yaml.YAMLError
        If the YAML file is malformed.
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    try:
        payload = yaml.safe_load(path.read_text())
    except yaml.YAMLError as exc:
        raise yaml.YAMLError(f"Invalid YAML in {path}: {exc}") from exc

    if payload is None:
        raise ValueError(f"Configuration file is empty: {path}")

    # Anchor relative paths to the YAML file's directory.
    cls._resolve_relative_paths(payload, path.parent)
    return cls.from_dict(payload)

from_dict classmethod #

from_dict(data)

Create config from a plain dictionary.

PARAMETER DESCRIPTION
data

Configuration dictionary with the same structure as the YAML file (see :meth:to_dict for the expected keys).

TYPE: dict

RETURNS DESCRIPTION
SfincsCreateConfig
Source code in src/coastal_calibration/config/create_schema.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> SfincsCreateConfig:
    """Create config from a plain dictionary.

    Nested sections are shallow-copied before normalization so the
    caller's dictionary is never mutated (previously ``refinement`` /
    ``datasets`` entries were popped from it and path fields rewritten
    in place).

    Parameters
    ----------
    data : dict
        Configuration dictionary with the same structure as the YAML
        file (see :meth:`to_dict` for the expected keys).

    Returns
    -------
    SfincsCreateConfig

    Raises
    ------
    ValueError
        If the required ``aoi`` or ``output_dir`` keys are missing.
    """
    aoi = data.get("aoi")
    if aoi is None:
        raise ValueError("'aoi' is required (path to AOI polygon file)")
    output_dir = data.get("output_dir")
    if output_dir is None:
        raise ValueError("'output_dir' is required (model output directory)")

    grid_data = dict(data.get("grid", {}))
    refinement_raw = grid_data.pop("refinement", None)
    if refinement_raw is not None:
        grid_data["refinement"] = [RefinementLevel(**r) for r in refinement_raw]
    grid = GridConfig(**grid_data)

    elev_data = dict(data.get("elevation", {}))
    datasets_raw = elev_data.pop("datasets", None)
    if datasets_raw is not None:
        elev_data["datasets"] = [ElevationDataset(**d) for d in datasets_raw]
    elevation = ElevationConfig(**elev_data)

    mask = MaskConfig(**data.get("mask", {}))

    subgrid_data = dict(data.get("subgrid", {}))
    if subgrid_data.get("reclass_table"):
        subgrid_data["reclass_table"] = Path(subgrid_data["reclass_table"])
    subgrid_cfg = SubgridConfig(**subgrid_data)

    data_catalog = DataCatalogConfig(**data.get("data_catalog", {}))

    monitoring_data = dict(data.get("monitoring", {}))
    if monitoring_data.get("log_file"):
        monitoring_data["log_file"] = Path(monitoring_data["log_file"])
    monitoring = MonitoringConfig(**monitoring_data)

    river_discharge: RiverDischargeConfig | None = None
    nwm_data = data.get("river_discharge")
    if nwm_data is not None:
        nwm_data = dict(nwm_data)
        nwm_data["flowlines"] = Path(nwm_data["flowlines"])
        river_discharge = RiverDischargeConfig(**nwm_data)

    download_dir_raw = data.get("download_dir")
    obs_file_raw = data.get("observation_locations_file")

    return cls(
        aoi=Path(aoi),
        output_dir=Path(output_dir),
        download_dir=Path(download_dir_raw) if download_dir_raw else None,
        grid=grid,
        elevation=elevation,
        mask=mask,
        subgrid=subgrid_cfg,
        data_catalog=data_catalog,
        monitoring=monitoring,
        river_discharge=river_discharge,
        add_noaa_gages=data.get("add_noaa_gages", False),
        observation_points=data.get("observation_points", []),
        observation_locations_file=Path(obs_file_raw) if obs_file_raw else None,
        merge_observations=data.get("merge_observations", False),
    )

to_yaml #

to_yaml(path)

Write configuration to a YAML file.

PARAMETER DESCRIPTION
path

Path to YAML output file. Parent directories are created automatically.

TYPE: Path or str

Source code in src/coastal_calibration/config/create_schema.py
def to_yaml(self, path: Path | str) -> None:
    """Serialize this configuration to a YAML file.

    Parameters
    ----------
    path : Path or str
        Destination file; missing parent directories are created
        automatically.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    text = yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False)
    target.write_text(text)

to_dict #

to_dict()

Convert configuration to a plain dictionary.

Source code in src/coastal_calibration/config/create_schema.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the configuration to a plain, YAML-friendly dictionary."""
    refinement = [
        {
            "polygon": str(level.polygon),
            "level": level.level,
            # buffer_m is emitted only when non-default (non-zero).
            **({"buffer_m": level.buffer_m} if level.buffer_m != 0.0 else {}),
        }
        for level in self.grid.refinement
    ]
    datasets = [
        {
            "name": ds.name,
            "zmin": ds.zmin,
            **({"source": ds.source} if ds.source else {}),
            **({"noaa_dataset": ds.noaa_dataset} if ds.noaa_dataset else {}),
            **({"coastal_domain": ds.coastal_domain} if ds.coastal_domain else {}),
        }
        for ds in self.elevation.datasets
    ]

    result: dict[str, Any] = {
        "aoi": str(self.aoi),
        "output_dir": str(self.output_dir),
    }
    # Optional key: omitted entirely when unset.
    if self.download_dir:
        result["download_dir"] = str(self.download_dir)

    result["grid"] = {
        "resolution": self.grid.resolution,
        "crs": self.grid.crs,
        "rotated": self.grid.rotated,
        "refinement": refinement,
    }
    result["elevation"] = {
        "datasets": datasets,
        "buffer_cells": self.elevation.buffer_cells,
    }
    result["mask"] = {
        "zmin": self.mask.zmin,
        "boundary_zmax": self.mask.boundary_zmax,
        "reset_bounds": self.mask.reset_bounds,
    }

    subgrid: dict[str, Any] = {
        "nr_subgrid_pixels": self.subgrid.nr_subgrid_pixels,
        "lulc_dataset": self.subgrid.lulc_dataset,
    }
    if self.subgrid.lulc_source:
        subgrid["lulc_source"] = self.subgrid.lulc_source
    subgrid["reclass_table"] = (
        str(self.subgrid.reclass_table) if self.subgrid.reclass_table else None
    )
    subgrid["manning_land"] = self.subgrid.manning_land
    subgrid["manning_sea"] = self.subgrid.manning_sea
    result["subgrid"] = subgrid

    result["data_catalog"] = {"data_libs": self.data_catalog.data_libs}
    result["monitoring"] = {
        "log_level": self.monitoring.log_level,
        "log_file": str(self.monitoring.log_file) if self.monitoring.log_file else None,
        "enable_progress_tracking": self.monitoring.enable_progress_tracking,
        "enable_timing": self.monitoring.enable_timing,
    }
    if self.river_discharge is None:
        result["river_discharge"] = None
    else:
        result["river_discharge"] = {
            "flowlines": str(self.river_discharge.flowlines),
            "nwm_id_column": self.river_discharge.nwm_id_column,
            "max_snap_distance_m": self.river_discharge.max_snap_distance_m,
        }
    result["add_noaa_gages"] = self.add_noaa_gages
    result["observation_points"] = self.observation_points
    result["observation_locations_file"] = (
        str(self.observation_locations_file) if self.observation_locations_file else None
    )
    result["merge_observations"] = self.merge_observations
    return result

validate #

validate()

Validate configuration and return a list of error messages.

RETURNS DESCRIPTION
list of str

Validation errors (empty when the config is valid).

Source code in src/coastal_calibration/config/create_schema.py
def validate(self) -> list[str]:
    """Validate configuration and return a list of error messages.

    Returns
    -------
    list of str
        Validation errors (empty when the config is valid).
    """
    problems: list[str] = []

    # Required input file.
    if not self.aoi.exists():
        problems.append(f"AOI file not found: {self.aoi}")

    # The grid resolution is a cell size and must be strictly positive.
    if self.grid.resolution <= 0:
        problems.append(f"grid.resolution must be positive, got {self.grid.resolution}")

    # Section-specific checks are delegated to dedicated helpers.
    problems += self._validate_elevation()
    problems += self._validate_subgrid()

    # Each refinement entry needs an existing polygon and a sane level.
    for refinement in self.grid.refinement:
        if not refinement.polygon.exists():
            problems.append(f"refinement polygon not found: {refinement.polygon}")
        if refinement.level < 1:
            problems.append(f"refinement level must be >= 1, got {refinement.level}")

    # Optional river discharge section: flowlines file must exist when set.
    discharge = self.river_discharge
    if discharge is not None and not discharge.flowlines.exists():
        problems.append(f"river_discharge.flowlines not found: {discharge.flowlines}")

    return problems

GridConfig#

GridConfig dataclass #

GridConfig(
    resolution=50.0,
    crs="utm",
    rotated=True,
    refinement=list(),
)

Grid generation configuration.

ElevationConfig#

ElevationConfig dataclass #

ElevationConfig(
    datasets=(
        lambda: [
            ElevationDataset(
                name="copdem_30m",
                zmin=0.001,
                source="copdem_30m",
            ),
            ElevationDataset(
                name="gebco_15arcs",
                zmin=-20000,
                source="gebco_15arcs",
            ),
        ]
    )(),
    buffer_cells=1,
)

Elevation and bathymetry configuration.

MaskConfig#

MaskConfig dataclass #

MaskConfig(
    zmin=-5.0, boundary_zmax=-5.0, reset_bounds=True
)

Active-cell mask and boundary configuration.

SubgridConfig#

SubgridConfig dataclass #

SubgridConfig(
    nr_subgrid_pixels=5,
    lulc_dataset="esa_worldcover",
    lulc_source="esa_worldcover",
    reclass_table=None,
    manning_land=0.04,
    manning_sea=0.02,
)

Subgrid table configuration.

Roughness parameters are included here because for quadtree grids the Manning coefficients are embedded directly in the subgrid tables.

RiverDischargeConfig#

RiverDischargeConfig dataclass #

RiverDischargeConfig(
    flowlines, nwm_id_column, max_snap_distance_m=2000.0
)

River discharge source point configuration.

Derives discharge source points from user-provided flowline geometries (e.g. exported from the QGIS plugin). Each flowline's downstream endpoint (closest to the AOI boundary) is registered as a SFINCS discharge source location.

Workflow Runners#

CoastalCalibRunner#

CoastalCalibRunner #

CoastalCalibRunner(config)

Main workflow runner for coastal model calibration.

This class orchestrates the entire calibration workflow, managing stage execution and progress monitoring.

Supports both SCHISM (model="schism", default) and SFINCS (model="sfincs") pipelines. The model type is selected via config.model.

Initialize the workflow runner.

PARAMETER DESCRIPTION
config

Coastal calibration configuration.

TYPE: CoastalCalibConfig

Source code in src/coastal_calibration/runner.py
def __init__(self, config: CoastalCalibConfig) -> None:
    """Initialize the workflow runner.

    Parameters
    ----------
    config : CoastalCalibConfig
        Coastal calibration configuration.
    """
    self.config = config

    # Ensure log directory exists early so file logging can start.
    config.paths.work_dir.mkdir(parents=True, exist_ok=True)

    # Set up file logging *before* creating the monitor so that
    # every message (including third-party) is captured on disk.
    # Only auto-generate a log path when the user did not configure one.
    if not config.monitoring.log_file:
        log_path = generate_log_path(config.paths.work_dir)
        configure_logger(file=str(log_path), file_level="DEBUG")

    # Silence noisy third-party loggers (HydroMT, xarray, ...)
    silence_third_party_loggers()

    self.monitor = WorkflowMonitor(config.monitoring)
    # Stage registry and per-stage results; populated later by the
    # workflow methods, not here.
    self._stages: dict[str, WorkflowStage] = {}
    self._results: dict[str, Any] = {}

validate #

validate()

Validate configuration and prerequisites.

RETURNS DESCRIPTION
list of str

List of validation error messages (empty if valid).

Source code in src/coastal_calibration/runner.py
def validate(self) -> list[str]:
    """Validate configuration and prerequisites.

    Returns
    -------
    list of str
        List of validation error messages (empty if valid).
    """
    # Start with configuration-level problems.
    all_errors: list[str] = list(self.config.validate())

    # Then ask every stage to validate itself, tagging each message
    # with its stage name for easier triage.
    self._init_stages()
    for stage_name, stage in self._stages.items():
        all_errors.extend(f"[{stage_name}] {msg}" for msg in stage.validate())

    return all_errors

run #

run(start_from=None, stop_after=None, dry_run=False)

Execute the calibration workflow.

PARAMETER DESCRIPTION
start_from

Stage name to start from (skip earlier stages).

TYPE: str DEFAULT: None

stop_after

Stage name to stop after (skip later stages).

TYPE: str DEFAULT: None

dry_run

If True, validate but don't execute.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
WorkflowResult

Result with execution details.

Source code in src/coastal_calibration/runner.py
def run(
    self,
    start_from: str | None = None,
    stop_after: str | None = None,
    dry_run: bool = False,
) -> WorkflowResult:
    """Execute the calibration workflow.

    Parameters
    ----------
    start_from : str, optional
        Stage name to start from (skip earlier stages).
    stop_after : str, optional
        Stage name to stop after (skip later stages).
    dry_run : bool, default False
        If True, validate but don't execute.

    Returns
    -------
    WorkflowResult
        Result with execution details.
    """
    start_time = datetime.now()
    stages_completed: list[str] = []
    stages_failed: list[str] = []
    outputs: dict[str, Any] = {}
    errors: list[str] = []

    # Fail fast: any configuration or stage validation error aborts
    # the run before any stage is executed.
    validation_errors = self.validate()
    # When resuming mid-pipeline, verify that earlier stages completed.
    if not validation_errors and start_from:
        validation_errors = self._check_prerequisites(start_from)
    if validation_errors:
        return WorkflowResult(
            success=False,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={},
            errors=validation_errors,
        )

    if dry_run:
        # Dry-run short-circuits after validation: report success
        # without executing any stage.
        self.monitor.info("Dry run mode - validation passed, no execution")
        return WorkflowResult(
            success=True,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={"dry_run": True},
            errors=[],
        )

    self.monitor.register_stages(self.STAGE_ORDER)
    self.monitor.start_workflow()
    self.monitor.info("-" * 40)

    # Clean generated files from previous runs when starting fresh.
    # When resuming (start_from is set), preserve existing outputs.
    if not start_from:
        # Imported lazily; only needed on a fresh (non-resumed) run.
        from coastal_calibration.schism.prep import clean_run_directory

        clean_run_directory(self.config.paths.work_dir)

    stages_to_run = self._get_stages_to_run(start_from, stop_after)

    # ``current_stage`` tracks the stage being executed so the except
    # branch can report which stage raised.
    current_stage = ""
    try:
        for current_stage in stages_to_run:
            stage = self._stages[current_stage]

            with self.monitor.stage_context(current_stage, stage.description):
                result = stage.run()
                self._results[current_stage] = result
                outputs[current_stage] = result
                stages_completed.append(current_stage)
                # Persist per-stage completion so a later run can resume.
                self._save_stage_status(current_stage)

        self.monitor.end_workflow(success=True)
        success = True

    except Exception as e:
        # First failure aborts the remaining stages; the run still
        # returns a WorkflowResult describing what happened.
        self.monitor.error(f"Workflow failed: {e}")
        self.monitor.end_workflow(success=False)
        errors.append(str(e))
        stages_failed.append(current_stage)
        success = False

    result = WorkflowResult(
        success=success,
        job_id=None,
        start_time=start_time,
        end_time=datetime.now(),
        stages_completed=stages_completed,
        stages_failed=stages_failed,
        outputs=outputs,
        errors=errors,
    )

    # Persist machine-readable result/progress summaries in work_dir.
    result_file = self.config.paths.work_dir / "workflow_result.json"
    result.save(result_file)
    self.monitor.save_progress(self.config.paths.work_dir / "workflow_progress.json")

    return result

SfincsCreator#

SfincsCreator #

SfincsCreator(config)

Runner that orchestrates the SFINCS model creation pipeline.

Mirrors :class:~coastal_calibration.runner.CoastalCalibRunner but operates on a :class:SfincsCreateConfig and delegates to :class:~coastal_calibration.sfincs.create.CreateStage instances.

Source code in src/coastal_calibration/sfincs/create.py
def __init__(self, config: SfincsCreateConfig) -> None:
    """Initialize the SFINCS model creation runner.

    Parameters
    ----------
    config : SfincsCreateConfig
        SFINCS model creation configuration.
    """
    self.config = config

    # Ensure the output directory exists early so file logging can start.
    config.output_dir.mkdir(parents=True, exist_ok=True)

    # Set up file logging before creating the monitor.
    # Only auto-generate a log path when the user did not configure one.
    if not config.monitoring.log_file:
        log_path = generate_log_path(config.output_dir, prefix="sfincs-create")
        configure_logger(file=str(log_path), file_level="DEBUG")

    # Suppress noisy third-party loggers.
    silence_third_party_loggers()

    self.monitor = WorkflowMonitor(config.monitoring)
    # Stage registry and per-stage results; populated later by the
    # workflow methods, not here.
    self._stages: dict[str, CreateStage] = {}
    self._results: dict[str, Any] = {}

run #

run(start_from=None, stop_after=None, dry_run=False)

Execute the SFINCS model creation workflow.

PARAMETER DESCRIPTION
start_from

Stage name to start from (skip earlier stages).

TYPE: str DEFAULT: None

stop_after

Stage name to stop after (skip later stages).

TYPE: str DEFAULT: None

dry_run

If True, validate but don't execute.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
WorkflowResult

Result with execution details.

Source code in src/coastal_calibration/sfincs/create.py
def run(
    self,
    start_from: str | None = None,
    stop_after: str | None = None,
    dry_run: bool = False,
) -> WorkflowResult:
    """Execute the SFINCS model creation workflow.

    Parameters
    ----------
    start_from : str, optional
        Stage name to start from (skip earlier stages).
    stop_after : str, optional
        Stage name to stop after (skip later stages).
    dry_run : bool, default False
        If True, validate but don't execute.

    Returns
    -------
    WorkflowResult
        Result with execution details.
    """
    start_time = datetime.now()
    stages_completed: list[str] = []
    stages_failed: list[str] = []
    outputs: dict[str, Any] = {}
    errors: list[str] = []

    # Fail fast: any validation error (and, when resuming, any missing
    # prerequisite stage) aborts the run before anything is executed.
    validation_errors = self.validate()
    if not validation_errors and start_from:
        validation_errors = self._check_prerequisites(start_from)
    if validation_errors:
        return WorkflowResult(
            success=False,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={},
            errors=validation_errors,
        )

    if dry_run:
        # Dry-run short-circuits after validation: report success
        # without executing any stage.
        self.monitor.info("Dry run mode - validation passed, no execution")
        return WorkflowResult(
            success=True,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={"dry_run": True},
            errors=[],
        )

    self.monitor.register_stages(self.stage_order)
    self.monitor.start_workflow()
    self.monitor.info("-" * 40)

    stages_to_run = self._get_stages_to_run(start_from, stop_after)

    # When resuming from a later stage, load the existing model so
    # that stages which reference ``self.sfincs`` can find it.
    if start_from and "create_grid" not in stages_to_run:
        _load_existing_model(self.config)

    # ``current_stage`` tracks the stage being executed so the except
    # branch can report which stage raised.
    current_stage = ""
    try:
        # HydroMT prints verbosely; keep workflow output readable.
        with suppress_hydromt_output():
            for current_stage in stages_to_run:
                stage = self._stages[current_stage]

                with self.monitor.stage_context(current_stage, stage.description):
                    result = stage.run()
                    self._results[current_stage] = result
                    outputs[current_stage] = result
                    stages_completed.append(current_stage)
                    # Persist per-stage completion so a later run can resume.
                    self._save_stage_status(current_stage)

        self.monitor.end_workflow(success=True)
        success = True

    except Exception as e:
        # First failure aborts the remaining stages; the run still
        # returns a WorkflowResult describing what happened.
        self.monitor.error(f"Workflow failed: {e}")
        self.monitor.end_workflow(success=False)
        errors.append(str(e))
        stages_failed.append(current_stage)
        success = False

    result = WorkflowResult(
        success=success,
        job_id=None,
        start_time=start_time,
        end_time=datetime.now(),
        stages_completed=stages_completed,
        stages_failed=stages_failed,
        outputs=outputs,
        errors=errors,
    )

    # Persist machine-readable result/progress summaries in output_dir.
    result_file = self.config.output_dir / "create_result.json"
    result.save(result_file)
    self.monitor.save_progress(self.config.output_dir / "create_progress.json")

    return result

WorkflowResult#

WorkflowResult dataclass #

WorkflowResult(
    success,
    job_id,
    start_time,
    end_time,
    stages_completed,
    stages_failed,
    outputs,
    errors,
)

Result of a workflow execution.

duration_seconds property #

duration_seconds

Get workflow duration in seconds.

to_dict #

to_dict()

Convert to dictionary.

Source code in src/coastal_calibration/runner.py
def to_dict(self) -> dict[str, Any]:
    """Return a JSON-serializable dictionary view of this result."""
    payload: dict[str, Any] = {
        "success": self.success,
        "job_id": self.job_id,
        # Datetimes are serialized as ISO-8601 strings; a missing
        # end_time maps to None.
        "start_time": self.start_time.isoformat(),
        "end_time": self.end_time.isoformat() if self.end_time else None,
        "duration_seconds": self.duration_seconds,
        "stages_completed": self.stages_completed,
        "stages_failed": self.stages_failed,
        "outputs": self.outputs,
        "errors": self.errors,
    }
    return payload

save #

save(path)

Save result to JSON file.

PARAMETER DESCRIPTION
path

Path to output JSON file. Parent directories will be created if they don't exist.

TYPE: Path or str

Source code in src/coastal_calibration/runner.py
def save(self, path: Path | str) -> None:
    """Save result to JSON file.

    Parameters
    ----------
    path : Path or str
        Path to output JSON file. Parent directories will be created
        if they don't exist.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write explicit UTF-8: ``write_text`` otherwise uses the
    # locale-dependent platform default encoding, which can fail on
    # non-ASCII error messages. ``ensure_ascii=False`` keeps such
    # messages human-readable in the JSON output.
    path.write_text(
        json.dumps(self.to_dict(), indent=2, ensure_ascii=False),
        encoding="utf-8",
    )

Plotting#

SfincsGridInfo#

SfincsGridInfo dataclass #

SfincsGridInfo(
    grid_type,
    crs,
    base_resolution,
    levels,
    n_faces=None,
    n_edges=None,
    shape=None,
    _verts=None,
    _level_per_face=None,
    _mask=None,
    _grid_extent=None,
)

Summary of a SFINCS model grid.

Use :meth:from_model_root to construct from a SFINCS model directory. The instance carries enough pre-computed state to drive :func:plot_mesh without re-loading the model.

Examples:

>>> info = SfincsGridInfo.from_model_root("run/sfincs_model")
>>> print(info)
SfincsGridInfo(quadtree, EPSG:32619)
  Faces:     293,850
  Edges:     596,123
  Level 1:    7,090 cells (512 m)
  ...

from_model_root classmethod #

from_model_root(model_root)

Load grid metadata from a SFINCS model directory.

PARAMETER DESCRIPTION
model_root

Path to the SFINCS model directory (must contain sfincs.inp and, for quadtree models, sfincs.nc).

TYPE: Path | str

Source code in src/coastal_calibration/sfincs/plotting.py
@classmethod
def from_model_root(
    cls,
    model_root: Path | str,
) -> SfincsGridInfo:
    """Load grid metadata from a SFINCS model directory.

    Parameters
    ----------
    model_root:
        Path to the SFINCS model directory (must contain ``sfincs.inp``
        and, for quadtree models, ``sfincs.nc``).
    """
    # Imported lazily so importing this module stays cheap when the
    # hydromt stack is not needed.
    from coastal_calibration.logging import suppress_hydromt_output
    from coastal_calibration.sfincs._hydromt_compat import apply_all_patches

    # Compatibility patches must be applied before hydromt_sfincs is used.
    apply_all_patches()

    with suppress_hydromt_output():
        from hydromt_sfincs import SfincsModel

        # NOTE(review): "r+" opens the model read-write; presumably only
        # read access is needed here -- confirm before changing.
        sf = SfincsModel(root=str(model_root), mode="r+")
        sf.read()
    return cls._from_loaded_model(sf)

plot_mesh#

plot_mesh #

plot_mesh(
    info,
    *,
    ax=None,
    title=None,
    basemap=True,
    basemap_source=None,
    basemap_zoom=11,
    figsize=(11, 7),
)

Plot the SFINCS mesh colored by refinement level.

PARAMETER DESCRIPTION
info

Grid metadata from :meth:SfincsGridInfo.from_model_root.

TYPE: SfincsGridInfo

ax

Existing axes to plot into. A new figure is created when None.

TYPE: Axes | None DEFAULT: None

title

Plot title. Defaults to a description derived from info.

TYPE: str | None DEFAULT: None

basemap

If True (default), overlay satellite imagery via contextily.

TYPE: bool DEFAULT: True

basemap_source

Tile provider passed to contextily.add_basemap. Defaults to cx.providers.Esri.WorldImagery.

TYPE: Any | None DEFAULT: None

basemap_zoom

Zoom level for the basemap tiles.

TYPE: int DEFAULT: 11

figsize

Figure size when ax is None.

TYPE: tuple[float, float] DEFAULT: (11, 7)

RETURNS DESCRIPTION
(Figure, Axes)
Source code in src/coastal_calibration/sfincs/plotting.py
def plot_mesh(
    info: SfincsGridInfo,
    *,
    ax: Axes | None = None,
    title: str | None = None,
    basemap: bool = True,
    basemap_source: Any | None = None,
    basemap_zoom: int = 11,
    figsize: tuple[float, float] = (11, 7),
) -> tuple[Figure, Axes]:
    """Plot the SFINCS mesh colored by refinement level.

    Parameters
    ----------
    info:
        Grid metadata from :meth:`SfincsGridInfo.from_model_root`.
    ax:
        Existing axes to plot into.  A new figure is created when *None*.
    title:
        Plot title.  Defaults to a description derived from *info*.
    basemap:
        If *True* (default), overlay satellite imagery via *contextily*.
    basemap_source:
        Tile provider passed to ``contextily.add_basemap``.  Defaults to
        ``cx.providers.Esri.WorldImagery``.
    basemap_zoom:
        Zoom level for the basemap tiles.
    figsize:
        Figure size when *ax* is *None*.

    Returns
    -------
    (Figure, Axes)
    """
    import matplotlib.pyplot as plt

    # Reuse the caller's axes when provided, otherwise open a new figure.
    if ax is not None:
        fig = ax.get_figure()
        if fig is None:  # pragma: no cover
            msg = "ax must be attached to a Figure"
            raise ValueError(msg)
    else:
        fig, ax = plt.subplots(figsize=figsize)

    # Dispatch on grid type.
    plotter = _plot_quadtree if info.grid_type == "quadtree" else _plot_regular
    plotter(info, ax)

    if title is None:
        title = f"SFINCS {info.grid_type} mesh ({info.base_resolution:.0f} m)"
    ax.set_title(title)

    if basemap:
        _add_basemap(ax, info.crs, basemap_source, basemap_zoom)

    return fig, ax  # pyright: ignore[reportReturnType]

plot_floodmap#

plot_floodmap #

plot_floodmap(
    floodmap_path,
    *,
    ax=None,
    title=None,
    basemap=True,
    basemap_source=None,
    basemap_zoom=12,
    max_display_px=2000,
    vmax_percentile=98,
    figsize=(11, 7),
    color_map="viridis_r",
)

Plot a flood-depth COG with an optional satellite basemap.

Reads at an overview level that keeps the longest axis under max_display_px pixels, masks dry / NaN pixels, and renders with a reverse viridis color map.

PARAMETER DESCRIPTION
floodmap_path

Path to the flood-depth GeoTIFF (e.g. floodmap_hmax.tif).

TYPE: Path or str

ax

Existing axes to plot into. A new figure is created when None.

TYPE: Axes DEFAULT: None

title

Plot title. Defaults to "Flood depth (hmax)".

TYPE: str DEFAULT: None

basemap

If True (default), overlay satellite imagery via contextily.

TYPE: bool DEFAULT: True

basemap_source

Tile provider passed to contextily.add_basemap.

TYPE: optional DEFAULT: None

basemap_zoom

Zoom level for the basemap tiles, by default 12.

TYPE: int DEFAULT: 12

max_display_px

Target maximum dimension (in pixels) for the rendered raster. Controls which overview level is read, by default 2000.

TYPE: int DEFAULT: 2000

vmax_percentile

Upper percentile for the color-map range.

TYPE: float DEFAULT: 98

figsize

Figure size when ax is None, by default (11, 7).

TYPE: tuple DEFAULT: (11, 7)

color_map

Name of the Matplotlib colormap to use for plotting the flood depth, by default "viridis_r".

TYPE: str DEFAULT: 'viridis_r'

RETURNS DESCRIPTION
(Figure, Axes)
Source code in src/coastal_calibration/sfincs/plotting.py
def plot_floodmap(
    floodmap_path: Path | str,
    *,
    ax: Axes | None = None,
    title: str | None = None,
    basemap: bool = True,
    basemap_source: Any | None = None,
    basemap_zoom: int = 12,
    max_display_px: int = 2000,
    vmax_percentile: float = 98,
    figsize: tuple[float, float] = (11, 7),
    color_map: str = "viridis_r",
) -> tuple[Figure | SubFigure, Axes]:
    """Plot a flood-depth COG with an optional satellite basemap.

    Reads at an overview level that keeps the longest axis under
    *max_display_px* pixels, masks dry / NaN pixels, and renders
    with a reverse viridis color map.

    Parameters
    ----------
    floodmap_path : Path or str
        Path to the flood-depth GeoTIFF (e.g. ``floodmap_hmax.tif``).
    ax : Axes, optional
        Existing axes to plot into. A new figure is created when *None*.
    title : str, optional
        Plot title. Defaults to ``"Flood depth (hmax)"``.
    basemap : bool, optional
        If *True* (default), overlay satellite imagery via *contextily*.
    basemap_source : optional
        Tile provider passed to ``contextily.add_basemap``.
    basemap_zoom : int, optional
        Zoom level for the basemap tiles, by default 12.
    max_display_px : int, optional
        Target maximum dimension (in pixels) for the rendered raster.
        Controls which overview level is read, by default 2000.
    vmax_percentile : float, optional
        Upper percentile for the color-map range.
    figsize : tuple, optional
        Figure size when *ax* is *None*, by default (11, 7).
    color_map : str, optional
        Name of the Matplotlib colormap to use for plotting the flood depth,
        by default "viridis_r".

    Returns
    -------
    (Figure, Axes)
    """
    import matplotlib.pyplot as plt
    import rasterio

    floodmap_path = Path(floodmap_path)
    if not floodmap_path.exists():
        raise FileNotFoundError(
            f"Flood map not found: {floodmap_path} — "
            "ensure floodmap_dem is set and sfincs_map.nc contains zsmax."
        )

    if color_map not in plt.colormaps:
        raise ValueError(
            f"Invalid color_map: {color_map}. Must be a valid Matplotlib colormap name."
        )

    # ── Read metadata at full resolution ─────────────────────────
    with rasterio.open(floodmap_path) as src:
        bounds = src.bounds
        raster_crs = src.crs

        # Pick the finest overview that brings the longest axis under
        # ``max_display_px``; fall back to the coarsest one available.
        # ``None`` means the raster has no overviews at all, in which
        # case we must read at full resolution (the previous code
        # produced overview_level=-1 here, which rasterio rejects).
        overviews = src.overviews(1)
        if overviews:
            ovr_idx = next(
                (
                    i
                    for i, f in enumerate(overviews)
                    if max(src.height, src.width) / f <= max_display_px
                ),
                len(overviews) - 1,
            )
        else:
            ovr_idx = None

    # ── Read at overview level (or full resolution) ──────────────
    if ovr_idx is None:
        with rasterio.open(floodmap_path) as src:
            hmax = src.read(1)
    else:
        with rasterio.open(floodmap_path, overview_level=ovr_idx) as src:
            hmax = src.read(1)

    # Dry cells (depth <= 0) and nodata/NaN pixels are masked out.
    hmax_masked = np.ma.masked_where(~np.isfinite(hmax) | (hmax <= 0), hmax)
    if hmax_masked.count() == 0:
        raise ValueError("No valid flood depth values found in the raster.")

    if ax is None:
        fig, ax = plt.subplots(figsize=figsize)
    else:
        fig = ax.get_figure()
        if fig is None:  # pragma: no cover
            msg = "ax must be attached to a Figure"
            raise ValueError(msg)

    extent = (bounds.left, bounds.right, bounds.bottom, bounds.top)
    # Copy the colormap before mutating it so the global registry entry
    # is left untouched; masked (dry) pixels render fully transparent.
    cmap = plt.colormaps[color_map].copy()
    cmap.set_bad(alpha=0)

    im = ax.imshow(
        hmax_masked,
        extent=extent,
        origin="upper",
        cmap=cmap,
        vmin=0,
        # Clip the color range at a percentile so isolated deep pixels
        # don't wash out the rest of the map.
        vmax=np.percentile(hmax_masked.compressed(), vmax_percentile),
        interpolation="nearest",
        zorder=2,
    )
    fig.colorbar(im, ax=ax, label="Flood depth (m)", shrink=0.6, pad=0.02, extend="both")

    if title is None:
        title = "Flood depth (hmax)"
    ax.set_title(title)

    if basemap:
        import contextily as cx

        if basemap_source is None:
            basemap_source = cx.providers.Esri.WorldImagery  # pyright: ignore[reportAttributeAccessIssue]
        cx.add_basemap(ax, crs=raster_crs, source=basemap_source, zoom=basemap_zoom)  # pyright: ignore[reportArgumentType]

    return fig, ax

plot_station_comparison#

plot_station_comparison #

plot_station_comparison(
    sim_times, sim_elevation, station_ids, obs_ds, figs_dir
)

Create comparison figures of simulated vs observed water levels.

Stations that lack either valid observations or valid simulated data are skipped so that empty panels do not appear.

PARAMETER DESCRIPTION
sim_times

Simulation datetimes.

TYPE: array-like

sim_elevation

Simulated elevation of shape (n_times, n_stations).

TYPE: ndarray

station_ids

NOAA station IDs (one per column in sim_elevation).

TYPE: list[str]

obs_ds

Observed water levels with a water_level variable indexed by station and time.

TYPE: Dataset

figs_dir

Output directory for figures.

TYPE: Path or str

RETURNS DESCRIPTION
list[Path]

Paths to the saved figures.

Source code in src/coastal_calibration/plotting/stations.py
def plot_station_comparison(
    sim_times: Any,
    sim_elevation: NDArray[np.float64],
    station_ids: list[str],
    obs_ds: Any,
    figs_dir: Path | str,
) -> list[Path]:
    """Create comparison figures of simulated vs observed water levels.

    Stations that lack *either* valid observations or valid simulated
    data are skipped so that empty panels do not appear.

    Parameters
    ----------
    sim_times : array-like
        Simulation datetimes.
    sim_elevation : ndarray
        Simulated elevation of shape ``(n_times, n_stations)``.
    station_ids : list[str]
        NOAA station IDs (one per column in *sim_elevation*).
    obs_ds : xr.Dataset
        Observed water levels with a ``water_level`` variable indexed
        by ``station`` and ``time``.
    figs_dir : Path or str
        Output directory for figures.

    Returns
    -------
    list[Path]
        Paths to the saved figures.
    """
    import sys

    import matplotlib

    # Force non-interactive backend except inside Jupyter kernels.
    if "ipykernel" not in sys.modules:
        matplotlib.use("Agg")

    # pyplot must be imported *after* the backend is selected.
    import matplotlib.dates as mdates
    import matplotlib.pyplot as plt

    figs_dir = Path(figs_dir)

    # ── Pre-filter: keep only stations with both obs and sim ──
    stations = plotable_stations(station_ids, sim_elevation, obs_ds)

    if not stations:
        # Nothing to plot; still create the directory so callers can
        # rely on it existing.
        figs_dir.mkdir(parents=True, exist_ok=True)
        return []

    # Stations are batched into figures of at most
    # ``_STATIONS_PER_FIGURE`` panels each.
    n_plotable = len(stations)
    n_figures = math.ceil(n_plotable / _STATIONS_PER_FIGURE)
    figs_dir.mkdir(parents=True, exist_ok=True)

    saved: list[Path] = []
    for fig_idx in range(n_figures):
        start = fig_idx * _STATIONS_PER_FIGURE
        end = min(start + _STATIONS_PER_FIGURE, n_plotable)
        batch = stations[start:end]
        batch_size = len(batch)

        # Panel layout: 1x1, 1x2, or 2x2 depending on batch size.
        nrows = 2 if batch_size > 2 else 1
        ncols = 2 if batch_size > 1 else 1

        # squeeze=False keeps ``axes`` 2-D regardless of layout.
        fig, axes = plt.subplots(
            nrows,
            ncols,
            figsize=(16, 5 * nrows),
            squeeze=False,
        )
        axes_flat = axes.ravel()

        for i, (sid, col_idx) in enumerate(batch):
            ax = axes_flat[i]

            # Simulated
            sim_ts = sim_elevation[:, col_idx]
            has_sim = bool(np.isfinite(sim_ts).any())

            # Observed
            has_obs = False
            if sid in obs_ds.station.values:
                obs_wl = obs_ds.water_level.sel(station=sid)
                has_obs = bool(np.isfinite(obs_wl).any())
                if has_obs:
                    ax.plot(
                        obs_wl.time.values,
                        obs_wl.values,
                        label="Observed",
                        color="k",
                        linewidth=1.0,
                    )

            if has_sim:
                # Dashed line plus "x" markers so sparse simulated
                # points stay visible against the observed series.
                ax.plot(
                    sim_times,
                    sim_ts,
                    color="r",
                    ls="--",
                    alpha=0.5,
                )
                ax.scatter(
                    sim_times,
                    sim_ts,
                    label="Simulated",
                    color="r",
                    marker="x",
                    s=25,
                )

            ax.set_title(f"NOAA {sid}", fontsize=14, fontweight="bold")
            ax.set_ylabel("Water Level (m, MSL)", fontsize=12)
            ax.tick_params(axis="both", labelsize=11)
            ax.legend(fontsize=11, loc="best")

            # Readable date formatting on x-axis
            ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d %H:%M"))
            ax.xaxis.set_major_locator(mdates.AutoDateLocator(minticks=4, maxticks=8))
            for label in ax.get_xticklabels():
                label.set_rotation(30)
                label.set_horizontalalignment("right")

        # Remove unused axes
        for j in range(batch_size, nrows * ncols):
            axes_flat[j].remove()

        fig.tight_layout()
        # Figures are numbered 1-based with zero padding for stable sorting.
        fig_path = figs_dir / f"stations_comparison_{fig_idx + 1:03d}.png"
        fig.savefig(fig_path, dpi=300, bbox_inches="tight")
        plt.close(fig)
        saved.append(fig_path)

    return saved

plotable_stations#

plotable_stations #

plotable_stations(station_ids, sim_elevation, obs_ds)

Return (station_id, column_index) pairs that have data to plot.

A station is plotable only when both its simulated and observed time-series contain finite values — a comparison plot with only one series is not useful.

The returned list is sorted by numeric station ID so that figures are deterministic across runs.

Source code in src/coastal_calibration/plotting/stations.py
def plotable_stations(
    station_ids: list[str],
    sim_elevation: NDArray[np.float64],
    obs_ds: Any,
) -> list[tuple[str, int]]:
    """Select stations that have both simulated and observed data.

    Returns ``(station_id, column_index)`` pairs.  A station qualifies
    only when its simulated column *and* its observed series each hold
    at least one finite value — a comparison plot with a single series
    is not useful.

    Pairs are ordered by numeric station ID (falling back to plain
    string order for non-numeric IDs) so figure layout is deterministic
    across runs.
    """
    observed_ids = set(obs_ds.station.values)

    def _sim_ok(col: int) -> bool:
        # Any finite sample in the station's column counts.
        return bool(np.isfinite(sim_elevation[:, col]).any())

    def _obs_ok(sid: str) -> bool:
        if sid not in observed_ids:
            return False
        return bool(np.isfinite(obs_ds.water_level.sel(station=sid)).any())

    pairs = [
        (sid, col)
        for col, sid in enumerate(station_ids)
        if _sim_ok(col) and _obs_ok(sid)
    ]
    try:
        pairs.sort(key=lambda item: int(item[0]))
    except ValueError:
        # Non-numeric IDs: fall back to lexicographic order.
        pairs.sort(key=lambda item: item[0])
    return pairs

Flood Depth Map#

create_flood_depth_map#

create_flood_depth_map #

create_flood_depth_map(
    model_root,
    dem_path,
    output_path=None,
    *,
    index_path=None,
    create_index=True,
    hmin=0.05,
    reproj_method="nearest",
    nrmax=2000,
    model=None,
    log=None,
)

Create a downscaled flood depth map from SFINCS output.

Reads the maximum water surface elevation (zsmax) from the SFINCS map output, optionally builds an index COG that maps DEM pixels to SFINCS grid cells, then downscales onto a high-resolution DEM to produce a Cloud Optimized GeoTIFF of maximum flood depth.

PARAMETER DESCRIPTION
model_root

Path to the SFINCS model directory (must contain sfincs_map.nc).

TYPE: Path or str

dem_path

Path to a high-resolution DEM GeoTIFF covering the model domain.

TYPE: Path or str

output_path

Output flood depth COG path. Defaults to <model_root>/floodmap_hmax.tif.

TYPE: Path or str DEFAULT: None

index_path

Path for the index COG (DEM pixels -> SFINCS cell mapping). Defaults to <model_root>/floodmap_index.tif.

TYPE: Path or str DEFAULT: None

create_index

If True (default), (re)generate the index COG via :func:make_index_cog. The index speeds up the downscaling significantly for large DEMs.

TYPE: bool DEFAULT: True

hmin

Minimum flood depth (m) to classify a pixel as flooded.

TYPE: float DEFAULT: 0.05

reproj_method

Reprojection method ("nearest" or "bilinear").

TYPE: str DEFAULT: 'nearest'

nrmax

Maximum cells per processing block (controls peak memory).

TYPE: int DEFAULT: 2000

model

An already-loaded :class:SfincsModel instance. When provided, model_root is still used to resolve default output paths but the model is not re-read from disk.

TYPE: SfincsModel DEFAULT: None

log

Logging callback accepting a single message; falls back to the module logger when None.

TYPE: callable DEFAULT: None

RETURNS DESCRIPTION
Path

Path to the generated flood depth COG.

RAISES DESCRIPTION
FileNotFoundError

If the DEM or sfincs_map.nc cannot be found.

KeyError

If zsmax is not present in the SFINCS map output.

Source code in src/coastal_calibration/sfincs/floodmap.py
def create_flood_depth_map(
    model_root: Path | str,
    dem_path: Path | str,
    output_path: Path | str | None = None,
    *,
    index_path: Path | str | None = None,
    create_index: bool = True,
    hmin: float = 0.05,
    reproj_method: str = "nearest",
    nrmax: int = 2000,
    model: SfincsModel | None = None,
    log: Any = None,
) -> Path:
    """Create a downscaled flood depth map from SFINCS output.

    Reads the maximum water surface elevation (``zsmax``) from the SFINCS
    map output, optionally builds an index COG that maps DEM pixels to
    SFINCS grid cells, then downscales onto a high-resolution DEM to
    produce a Cloud Optimized GeoTIFF of maximum flood depth.

    Parameters
    ----------
    model_root : Path or str
        Path to the SFINCS model directory (must contain ``sfincs_map.nc``).
    dem_path : Path or str
        Path to a high-resolution DEM GeoTIFF covering the model domain.
    output_path : Path or str, optional
        Output flood depth COG path.  Defaults to
        ``<model_root>/floodmap_hmax.tif``.
    index_path : Path or str, optional
        Path for the index COG (DEM pixels -> SFINCS cell mapping).
        Defaults to ``<model_root>/floodmap_index.tif``.
    create_index : bool
        If True (default), (re)generate the index COG via
        :func:`make_index_cog`.  The index speeds up the downscaling
        significantly for large DEMs.
    hmin : float
        Minimum flood depth (m) to classify a pixel as flooded.
    reproj_method : str
        Reprojection method (``"nearest"`` or ``"bilinear"``).
    nrmax : int
        Maximum cells per processing block (controls peak memory).
    model : SfincsModel, optional
        An already-loaded :class:`SfincsModel` instance.  When provided,
        ``model_root`` is still used to resolve default output paths but
        the model is **not** re-read from disk.
    log : callable, optional
        Logging callback accepting a single message; falls back to the module
        logger when *None*.

    Returns
    -------
    Path
        Path to the generated flood depth COG.

    Raises
    ------
    FileNotFoundError
        If the DEM or ``sfincs_map.nc`` cannot be found.
    KeyError
        If ``zsmax`` is not present in the SFINCS map output.
    """
    # -- Ensure patches are applied before any hydromt-sfincs call --
    from coastal_calibration.logging import suppress_hydromt_output
    from coastal_calibration.sfincs._hydromt_compat import apply_all_patches

    apply_all_patches()

    # Import *after* patches so local references pick up the fixed versions.
    with suppress_hydromt_output():
        from hydromt_sfincs.workflows.downscaling import make_index_cog

    model_root = Path(model_root)
    dem_path = Path(dem_path)

    # Validate inputs before any expensive model loading.
    if not dem_path.exists():
        raise FileNotFoundError(f"DEM not found: {dem_path}")

    map_file = model_root / "sfincs_map.nc"
    if not map_file.exists():
        raise FileNotFoundError(f"SFINCS map output not found: {map_file}")

    # Default output locations live inside the model run directory.
    output_path = model_root / "floodmap_hmax.tif" if output_path is None else Path(output_path)
    index_path = model_root / "floodmap_index.tif" if index_path is None else Path(index_path)

    # Route progress messages to the caller-supplied callback when given,
    # otherwise fall back to the module-level logger.
    def _info(msg: str) -> None:
        if log is not None:
            log(msg)
        else:
            _log.info(msg)

    # -- Load model and read output ---------------------------------
    if model is None:
        with suppress_hydromt_output():
            from hydromt_sfincs import SfincsModel as _Sfincs

            # Use "r+" (same as the pipeline) so all components are writable
            # and the quadtree grid loads correctly.
            model = _Sfincs(root=str(model_root), mode="r+")
            model.read()

    # Read map output even for a pre-loaded model so ``zsmax`` reflects
    # the files currently on disk.
    with suppress_hydromt_output():
        model.output.read()

    if "zsmax" not in model.output.data:
        raise KeyError(
            "Variable 'zsmax' not found in SFINCS map output. "
            "Ensure SFINCS was configured to write zsmax (storzsmax = 1 in sfincs.inp)."
        )
    zsmax = model.output.data["zsmax"]
    _info(f"Loaded zsmax from {map_file}")

    # -- (Re)create index COG ---------------------------------------
    # When enabled (the default), regenerate so the index stays
    # consistent with the current grid and the patched
    # ``get_indices_at_points``.
    if create_index:
        _info(f"Creating index COG: {index_path}")
        index_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with suppress_hydromt_output():
            make_index_cog(
                model=model,
                indices_fn=str(index_path),
                topobathy_fn=str(dem_path),
                nrmax=nrmax,
            )
        _ensure_overviews(index_path, _info)
        _info(f"Index COG created ({index_path.stat().st_size / 1e6:.1f} MB)")

    # -- Downscale --------------------------------------------------
    # Read the DEM and index at full resolution.  The upstream
    # ``downscale_floodmap`` defaults to ``overview_level=0`` when
    # reading rasters from disk, which silently halves the resolution
    # and can mismatch with the index.  By loading the rasters into
    # memory first and passing them as DataArrays we bypass that bug.
    _info("Downscaling flood depth map")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    _write_floodmap_cog(
        zsmax=zsmax,
        dem_path=dem_path,
        # Only pass the index when it exists on disk (it may be absent
        # when ``create_index=False`` and no earlier run produced one).
        index_path=index_path if index_path.exists() else None,
        output_path=output_path,
        hmin=hmin,
        reproj_method=reproj_method,
        nrmax=nrmax,
        log_fn=_info,
    )

    _ensure_overviews(output_path, _info)

    size_mb = output_path.stat().st_size / 1e6
    _info(f"Flood depth map written: {output_path} ({size_mb:.1f} MB)")

    return output_path

Downloader#

validate_date_ranges#

validate_date_ranges #

validate_date_ranges(
    start_time,
    end_time,
    meteo_source,
    coastal_source,
    domain,
)

Validate that requested dates are within available ranges.

Source code in src/coastal_calibration/data/downloader.py
def validate_date_ranges(
    start_time: datetime,
    end_time: datetime,
    meteo_source: str,
    coastal_source: str,
    domain: str,
) -> list[str]:
    """Check the requested window against each source's available range.

    Parameters
    ----------
    start_time, end_time : datetime
        Requested simulation window.
    meteo_source, coastal_source : str
        Identifiers of the meteorological and coastal data sources.
    domain : str
        Coastal domain identifier.

    Returns
    -------
    list[str]
        Validation error messages; empty when all checks pass.
    """
    messages: list[str] = []

    # The ``tpxo`` coastal source is exempt from range validation.
    sources = [meteo_source]
    if coastal_source != "tpxo":
        sources.append(coastal_source)

    for source in sources:
        available = get_date_range(source, domain)
        if not available:
            continue
        problem = available.validate(start_time, end_time)
        if problem:
            messages.append(problem)

    return messages

NOAA CO-OPS API#

COOPSAPIClient#

COOPSAPIClient #

COOPSAPIClient(timeout=120)

Client for interacting with NOAA CO-OPS API.

Initialize COOPS API client.

PARAMETER DESCRIPTION
timeout

Request timeout in seconds, by default 120

TYPE: int DEFAULT: 120

RAISES DESCRIPTION
ImportError

If plot optional dependencies are not installed.

Source code in src/coastal_calibration/data/coops_api.py
def __init__(self, timeout: int = 120) -> None:
    """Initialize COOPS API client.

    Parameters
    ----------
    timeout : int, optional
        Request timeout in seconds, by default 120

    Raises
    ------
    ImportError
        If plot optional dependencies are not installed.
    """
    # Fail fast if the optional plotting/geospatial extras are missing.
    _check_plot_deps()
    self.timeout = timeout
    # Station metadata is fetched once at construction and cached on the
    # instance; exposed via the ``stations_metadata`` property.
    self._stations_metadata = self._get_stations_metadata()

stations_metadata property #

stations_metadata

Get metadata for all water level stations as a GeoDataFrame.

RETURNS DESCRIPTION
GeoDataFrame

GeoDataFrame with station metadata and Point geometries.

validate_parameters #

validate_parameters(
    product, datum, units, time_zone, interval
)

Validate API parameters.

PARAMETER DESCRIPTION
product

Data product type

TYPE: str

datum

Vertical datum

TYPE: str

units

Unit system

TYPE: str

time_zone

Time zone

TYPE: str

interval

Time interval for predictions

TYPE: str | int | None

RAISES DESCRIPTION
ValueError

If any parameter is invalid

Source code in src/coastal_calibration/data/coops_api.py
def validate_parameters(
    self,
    product: str,
    datum: str,
    units: str,
    time_zone: str,
    interval: str | int | None,
) -> None:
    """Validate API parameters.

    Parameters
    ----------
    product : str
        Data product type
    datum : str
        Vertical datum
    units : str
        Unit system
    time_zone : str
        Time zone
    interval : str | int | None
        Time interval for predictions

    Raises
    ------
    ValueError
        If any parameter is invalid
    """
    if product not in self.valid_products:
        raise ValueError(
            f"Invalid product '{product}'. Must be one of: {', '.join(self.valid_products)}"
        )

    if datum.upper() not in self.valid_datums:
        raise ValueError(
            f"Invalid datum '{datum}'. Must be one of: {', '.join(self.valid_datums)}"
        )

    if units not in self.valid_units:
        raise ValueError(
            f"Invalid units '{units}'. Must be one of: {', '.join(self.valid_units)}"
        )

    if time_zone not in self.valid_timezones:
        raise ValueError(
            f"Invalid time_zone '{time_zone}'. Must be one of: {', '.join(self.valid_timezones)}"
        )

    if (
        product == "predictions"
        and interval is not None
        and str(interval) not in self.valid_intervals
    ):
        raise ValueError(
            f"Invalid interval '{interval}' for predictions. "
            f"Must be one of: {', '.join(self.valid_intervals)}"
        )

build_url #

build_url(
    station_id,
    begin_date,
    end_date,
    product,
    datum,
    units,
    time_zone,
    interval,
)

Build API request URL for a station.

PARAMETER DESCRIPTION
station_id

Station ID

TYPE: str

begin_date

Start date

TYPE: str

end_date

End date

TYPE: str

product

Data product

TYPE: str

datum

Vertical datum

TYPE: str

units

Unit system

TYPE: str

time_zone

Time zone

TYPE: str

interval

Time interval for predictions

TYPE: str | int | None

RETURNS DESCRIPTION
str

Complete API request URL

Source code in src/coastal_calibration/data/coops_api.py
def build_url(
    self,
    station_id: str,
    begin_date: str,
    end_date: str,
    product: str,
    datum: str,
    units: str,
    time_zone: str,
    interval: str | int | None,
) -> str:
    """Build API request URL for a station.

    Parameters
    ----------
    station_id : str
        Station ID
    begin_date : str
        Start date
    end_date : str
        End date
    product : str
        Data product
    datum : str
        Vertical datum
    units : str
        Unit system
    time_zone : str
        Time zone
    interval : str | int | None, optional
        Time interval for predictions

    Returns
    -------
    str
        Complete API request URL
    """
    params = {
        "begin_date": begin_date,
        "end_date": end_date,
        "station": station_id,
        "product": product,
        "datum": datum,
        "units": units,
        "time_zone": time_zone,
        "format": "json",
        "application": "coastal_calibration_coops",
    }

    if product == "predictions" and interval is not None:
        params["interval"] = str(interval)

    query_parts = [f"{k}={v}" for k, v in params.items()]
    return f"{self.base_url}?{'&'.join(query_parts)}"

fetch_data #

fetch_data(urls)

Fetch data from API for multiple URLs.

PARAMETER DESCRIPTION
urls

List of API request URLs

TYPE: list[str]

RETURNS DESCRIPTION
list[dict | None]

List of JSON responses (None for failed requests)

Source code in src/coastal_calibration/data/coops_api.py
def fetch_data(self, urls: list[str]) -> list[dict[str, Any] | None]:
    """Fetch JSON responses for a batch of API request URLs.

    Parameters
    ----------
    urls : list[str]
        List of API request URLs

    Returns
    -------
    list[dict | None]
        List of JSON responses (None for failed requests)
    """
    logger.info("  Fetching data from %d station(s)", len(urls))

    # ``raise_status=False`` yields ``None`` entries for failed requests
    # instead of raising, so one bad station does not abort the batch.
    responses = fetch(
        urls,
        "json",
        request_method="get",
        raise_status=False,
        timeout=self.timeout,
    )
    return responses

get_datums #

get_datums(station_ids: str) -> StationDatum
get_datums(station_ids: list[str]) -> list[StationDatum]
get_datums(station_ids)

Retrieve datum information for one or more stations.

PARAMETER DESCRIPTION
station_ids

Single station ID or list of station IDs

TYPE: str | list[str]

RETURNS DESCRIPTION
StationDatum | list[StationDatum]

Single StationDatum object if input is str, list of StationDatum if input is list

RAISES DESCRIPTION
ValueError

If no valid datum data is returned for any station.

Source code in src/coastal_calibration/data/coops_api.py
def get_datums(self, station_ids: str | list[str]) -> StationDatum | list[StationDatum]:
    """Retrieve datum information for one or more stations.

    Parameters
    ----------
    station_ids : str | list[str]
        Single station ID or list of station IDs

    Returns
    -------
    StationDatum | list[StationDatum]
        Single StationDatum object if input is str,
        list of StationDatum if input is list

    Raises
    ------
    ValueError
        If no valid datum data is returned for any station.
    """
    import numpy as np

    # Remember the input shape so a str input yields a single object below.
    single_input = isinstance(station_ids, str)
    ids: list[str] = [station_ids] if isinstance(station_ids, str) else list(station_ids)

    # MDAPI "datums" endpoint — one request per station.
    datum_base_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations"
    urls = [f"{datum_base_url}/{sid}/datums.json" for sid in ids]
    logger.info("  Fetching datum information for %d station(s)", len(ids))
    responses = self.fetch_data(urls)
    datum_objects: list[StationDatum] = []
    # NOTE(review): strict=False tolerates a responses list shorter than
    # ``ids`` — presumably fetch_data returns one entry per URL; confirm.
    for station_id, response in zip(ids, responses, strict=False):
        # Failed requests come back as None (see fetch_data); skip them.
        if response is None:
            logger.warning("  No datum data returned for station %s", station_id)
            continue

        if "error" in response:
            logger.warning(
                "  Datum API error for station %s: %s",
                station_id,
                response["error"].get("message", "Unknown error"),
            )
            continue

        # "datums" may be missing or null in the payload; treat as empty.
        raw_datums = response.get("datums") or []
        datum_values = [
            DatumValue(
                name=datum_dict.get("name", ""),
                description=datum_dict.get("description", ""),
                value=np.float64(datum_dict.get("value", np.nan)),
            )
            for datum_dict in raw_datums
        ]

        # Missing scalar fields default to "" (strings) or NaN (numbers).
        station_datum = StationDatum(
            station_id=station_id,
            accepted=response.get("accepted", ""),
            superseded=response.get("superseded", ""),
            epoch=response.get("epoch", ""),
            units=response.get("units", ""),
            orthometric_datum=response.get("OrthometricDatum", ""),
            datums=datum_values,
            # LAT/HAT fields — presumably lowest/highest astronomical
            # tide as reported by MDAPI; confirm against the API docs.
            lat=np.float64(response.get("LAT", np.nan)),
            lat_date=response.get("LATdate", ""),
            lat_time=response.get("LATtime", ""),
            hat=np.float64(response.get("HAT", np.nan)),
            hat_date=response.get("HATdate", ""),
            hat_time=response.get("HATtime", ""),
            min_value=np.float64(response.get("min", np.nan)),
            min_date=response.get("mindate", ""),
            min_time=response.get("mintime", ""),
            max_value=np.float64(response.get("max", np.nan)),
            max_date=response.get("maxdate", ""),
            max_time=response.get("maxtime", ""),
            datum_analysis_period=response.get("DatumAnalysisPeriod") or [],
            ngs_link=response.get("NGSLink", ""),
            ctrl_station=response.get("ctrlStation", ""),
        )

        datum_objects.append(station_datum)

    if not datum_objects:
        raise ValueError("No valid datum data returned for any station")

    # Mirror the caller's input shape: scalar in -> scalar out.
    if single_input:
        return datum_objects[0]
    return datum_objects

query_coops_byids#

query_coops_byids #

query_coops_byids(
    station_ids,
    begin_date,
    end_date,
    *,
    product="water_level",
    datum="MLLW",
    units="metric",
    time_zone="gmt",
    interval=None,
)

Fetch water level data from NOAA CO-OPS API for multiple stations.

PARAMETER DESCRIPTION
station_ids

List of station IDs to retrieve data for.

TYPE: list[str]

begin_date

Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm

TYPE: str

end_date

End date in same format as begin_date.

TYPE: str

product

Data product to retrieve, by default water_level.

TYPE: ('water_level', 'hourly_height', 'high_low', 'predictions') DEFAULT: "water_level"

datum

Vertical datum for water levels, by default "MLLW".

TYPE: str DEFAULT: 'MLLW'

units

Units for data, by default "metric".

TYPE: ('metric', 'english') DEFAULT: "metric"

time_zone

Time zone for returned data, by default "gmt".

TYPE: ('gmt', 'lst', 'lst_ldt') DEFAULT: "gmt"

interval

Time interval for predictions product only, by default None.

TYPE: str | int | None DEFAULT: None

RETURNS DESCRIPTION
Dataset

Dataset containing water level data with dimensions (time, station).

RAISES DESCRIPTION
ValueError

If invalid parameters are provided or if API returns errors.

Source code in src/coastal_calibration/data/coops_api.py
def query_coops_byids(
    station_ids: list[str],
    begin_date: str,
    end_date: str,
    *,
    product: Literal[
        "water_level",
        "hourly_height",
        "high_low",
        "predictions",
    ] = "water_level",
    datum: str = "MLLW",
    units: Literal["metric", "english"] = "metric",
    time_zone: Literal["gmt", "lst", "lst_ldt"] = "gmt",
    interval: str | int | None = None,
) -> xr.Dataset:
    """Fetch water level data from NOAA CO-OPS API for multiple stations.

    Parameters
    ----------
    station_ids : list[str]
        List of station IDs to retrieve data for.
    begin_date : str
        Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm
    end_date : str
        End date in same format as begin_date.
    product : {"water_level", "hourly_height", "high_low", "predictions"}, optional
        Data product to retrieve, by default ``water_level``.
    datum : str, optional
        Vertical datum for water levels, by default "MLLW".
    units : {"metric", "english"}, optional
        Units for data, by default "metric".
    time_zone : {"gmt", "lst", "lst_ldt"}, optional
        Time zone for returned data, by default "gmt".
    interval : str | int | None, optional
        Time interval for predictions product only, by default None.

    Returns
    -------
    xr.Dataset
        Dataset containing water level data with dimensions (time, station).

    Raises
    ------
    ValueError
        If invalid parameters are provided or if API returns errors.
    """
    client = COOPSAPIClient()
    client.validate_parameters(product, datum, units, time_zone, interval)

    # Normalize both endpoints of the window before comparing.
    window_start = client.parse_date(begin_date)
    window_end = client.parse_date(end_date)
    if window_end <= window_start:
        raise ValueError("end_date must be after begin_date")

    # Format dates the way the API expects them.
    start_str = window_start.strftime("%Y%m%d %H:%M")
    end_str = window_end.strftime("%Y%m%d %H:%M")

    logger.info(
        "  Requesting %s data for %d station(s) from %s to %s",
        product,
        len(station_ids),
        start_str,
        end_str,
    )

    # One request URL per station; all share the same query parameters.
    request_urls = []
    for sid in station_ids:
        request_urls.append(
            client.build_url(
                station_id=sid,
                begin_date=start_str,
                end_date=end_str,
                product=product,
                datum=datum,
                units=units,
                time_zone=time_zone,
                interval=interval,
            )
        )

    raw_responses = client.fetch_data(request_urls)
    return _process_responses(
        responses=raw_responses,
        station_ids=station_ids,
        product=product,
        datum=datum,
        units=units,
        time_zone=time_zone,
    )

query_coops_bygeometry#

query_coops_bygeometry #

query_coops_bygeometry(
    geometry,
    begin_date,
    end_date,
    *,
    product="water_level",
    datum="MLLW",
    units="metric",
    time_zone="gmt",
    interval=None,
)

Fetch water level data from NOAA CO-OPS API for stations within a geometry.

PARAMETER DESCRIPTION
geometry

Geometry to select stations within (Point, Polygon, etc.)

TYPE: BaseGeometry

begin_date

Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm

TYPE: str

end_date

End date in same format as begin_date.

TYPE: str

product

Data product to retrieve, by default water_level.

TYPE: ('water_level', 'hourly_height', 'high_low', 'predictions') DEFAULT: "water_level"

datum

Vertical datum for water levels, by default "MLLW".

TYPE: str DEFAULT: 'MLLW'

units

Units for data, by default "metric".

TYPE: ('metric', 'english') DEFAULT: "metric"

time_zone

Time zone for returned data, by default "gmt".

TYPE: ('gmt', 'lst', 'lst_ldt') DEFAULT: "gmt"

interval

Time interval for predictions product only, by default None.

TYPE: str | int | None DEFAULT: None

RETURNS DESCRIPTION
Dataset

Dataset containing water level data for stations within the geometry.

Source code in src/coastal_calibration/data/coops_api.py
def query_coops_bygeometry(
    geometry: BaseGeometry,
    begin_date: str,
    end_date: str,
    *,
    product: Literal[
        "water_level",
        "hourly_height",
        "high_low",
        "predictions",
    ] = "water_level",
    datum: str = "MLLW",
    units: Literal["metric", "english"] = "metric",
    time_zone: Literal["gmt", "lst", "lst_ldt"] = "gmt",
    interval: str | int | None = None,
) -> xr.Dataset:
    """Fetch CO-OPS water levels for all stations intersecting a geometry.

    Station metadata is filtered spatially, then the matching station
    IDs are forwarded to :func:`query_coops_byids`.

    Parameters
    ----------
    geometry : shapely.geometry.base.BaseGeometry
        Geometry to select stations within (Point, Polygon, etc.)
    begin_date : str
        Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm
    end_date : str
        End date in same format as begin_date.
    product : {"water_level", "hourly_height", "high_low", "predictions"}, optional
        Data product to retrieve, by default ``water_level``.
    datum : str, optional
        Vertical datum for water levels, by default "MLLW".
    units : {"metric", "english"}, optional
        Units for data, by default "metric".
    time_zone : {"gmt", "lst", "lst_ldt"}, optional
        Time zone for returned data, by default "gmt".
    interval : str | int | None, optional
        Time interval for predictions product only, by default None.

    Returns
    -------
    xr.Dataset
        Dataset containing water level data for stations within the geometry.
    """
    import numpy as np
    import shapely

    client = COOPSAPIClient()

    # ``shapely.is_valid`` is element-wise, so wrap the single geometry
    # in a 1-d array before checking.
    if not all(shapely.is_valid(np.atleast_1d(geometry))):  # pyright: ignore[reportCallIssue, reportArgumentType]
        raise ValueError("Invalid geometry provided.")

    metadata = client.stations_metadata
    selected = metadata[metadata.intersects(geometry)]
    if selected.empty:
        raise ValueError("No stations found within the specified geometry and buffer.")

    return query_coops_byids(
        selected["station_id"].tolist(),
        begin_date,
        end_date,
        product=product,
        datum=datum,
        units=units,
        time_zone=time_zone,
        interval=interval,
    )

Type Aliases#

# Model type
ModelType = Literal["schism", "sfincs"]

# Meteorological data source
MeteoSource = Literal["nwm_retro", "nwm_ana"]

# Coastal domain identifier
CoastalDomain = Literal["prvi", "hawaii", "atlgulf", "pacific"]

# Boundary condition source
BoundarySource = Literal["tpxo", "stofs"]

# Logging level
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

Constants#

Default Paths#

DEFAULT_NFS_MOUNT = Path("/ngen-test")

Default Path Templates#

DEFAULT_WORK_DIR_TEMPLATE = (
    "/ngen-test/coastal/${user}/"
    "${model}_${simulation.coastal_domain}_${boundary.source}_${simulation.meteo_source}/"
    "${model}_${simulation.start_date}"
)

DEFAULT_RAW_DOWNLOAD_DIR_TEMPLATE = (
    "/ngen-test/coastal/${user}/"
    "${model}_${simulation.coastal_domain}_${boundary.source}_${simulation.meteo_source}/"
    "raw_data"
)

Model Registry#

MODEL_REGISTRY: dict[str, type[ModelConfig]] = {
    "schism": SchismModelConfig,
    "sfincs": SfincsModelConfig,
}