Skip to content

API Reference#

This page provides detailed documentation for the NWM Coastal Python API.

Configuration Classes#

CoastalCalibConfig#

CoastalCalibConfig dataclass #

CoastalCalibConfig(
    slurm,
    simulation,
    boundary,
    paths,
    model_config,
    monitoring=MonitoringConfig(),
    download=DownloadConfig(),
    _base_config=None,
)

Complete coastal calibration workflow configuration.

Supports both SCHISM and SFINCS models via the polymorphic :attr:model_config field. The concrete type is selected by the model key in the YAML file and resolved through :data:MODEL_REGISTRY.

model property #

model

Model identifier string (convenience accessor).

from_yaml classmethod #

from_yaml(config_path)

Load configuration from YAML file with optional inheritance.

Supports variable interpolation using ${section.key} syntax. Variables are resolved from other config values, e.g.:

  • ${slurm.user} -> value of slurm.user
  • ${simulation.coastal_domain} -> value of simulation.coastal_domain
  • ${model} -> the model type string ("schism" or "sfincs")
PARAMETER DESCRIPTION
config_path

Path to YAML configuration file.

TYPE: Path or str

RETURNS DESCRIPTION
CoastalCalibConfig

Loaded configuration.

RAISES DESCRIPTION
FileNotFoundError

If the configuration file does not exist.

YAMLError

If the YAML file is malformed.

Source code in src/coastal_calibration/config/schema.py
@classmethod
def from_yaml(cls, config_path: Path | str) -> CoastalCalibConfig:
    """Load configuration from YAML file with optional inheritance.

    Supports variable interpolation using ${section.key} syntax.
    Variables are resolved from other config values, e.g.:

    - ``${slurm.user}`` -> value of ``slurm.user``
    - ``${simulation.coastal_domain}`` -> value of ``simulation.coastal_domain``
    - ``${model}`` -> the model type string (``"schism"`` or ``"sfincs"``)

    Parameters
    ----------
    config_path : Path or str
        Path to YAML configuration file.

    Returns
    -------
    CoastalCalibConfig
        Loaded configuration.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    yaml.YAMLError
        If the YAML file is malformed.
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    try:
        data = yaml.safe_load(config_path.read_text())
    except yaml.YAMLError as e:
        # Re-raise with the offending file path attached for easier debugging.
        raise yaml.YAMLError(f"Invalid YAML in {config_path}: {e}") from e

    if data is None:
        # yaml.safe_load returns None for an empty/whitespace-only document.
        raise ValueError(f"Configuration file is empty: {config_path}")

    # Optional single-parent inheritance: a "_base" key names another YAML
    # file whose values are loaded first and then overridden by this file.
    base_config = None
    if "_base" in data:
        base_path = Path(data.pop("_base"))
        if not base_path.is_absolute():
            # Relative _base paths resolve against this file's directory.
            base_path = config_path.parent / base_path
        # Recursive call supports chained _base inheritance.
        base_config = cls.from_yaml(base_path)
        data = _deep_merge(base_config.to_dict(), data)

    # Ensure model key has a default before interpolation
    data.setdefault("model", "schism")

    # Inject default path templates if not provided (before interpolation)
    if "paths" not in data:
        data["paths"] = {}
    if "work_dir" not in data["paths"]:
        data["paths"]["work_dir"] = DEFAULT_WORK_DIR_TEMPLATE
    if "raw_download_dir" not in data["paths"]:
        data["paths"]["raw_download_dir"] = DEFAULT_RAW_DOWNLOAD_DIR_TEMPLATE

    # Interpolate variables after merging
    data = _interpolate_config(data)

    # NOTE(review): when inheritance was used, base_config_path is set to
    # *this* file's path rather than the base file's — confirm intentional.
    return cls._from_dict(data, base_config_path=config_path if base_config else None)

to_yaml #

to_yaml(path)

Write configuration to YAML file.

PARAMETER DESCRIPTION
path

Path to YAML output file. Parent directories will be created if they don't exist.

TYPE: Path or str

Source code in src/coastal_calibration/config/schema.py
def to_yaml(self, path: Path | str) -> None:
    """Write configuration to YAML file.

    Parameters
    ----------
    path : Path or str
        Path to YAML output file. Parent directories will be created
        if they don't exist.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Preserve field ordering from to_dict() in the emitted YAML.
    serialized = yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False)
    target.write_text(serialized)

to_dict #

to_dict()

Convert config to dictionary.

Source code in src/coastal_calibration/config/schema.py
def to_dict(self) -> dict[str, Any]:
    """Convert config to dictionary."""
    return {
        "model": self.model,
        "slurm": {
            "job_name": self.slurm.job_name,
            "partition": self.slurm.partition,
            "time_limit": self.slurm.time_limit,
            "account": self.slurm.account,
            "qos": self.slurm.qos,
            "user": self.slurm.user,
        },
        "simulation": {
            "start_date": self.simulation.start_date.isoformat(),
            "duration_hours": self.simulation.duration_hours,
            "coastal_domain": self.simulation.coastal_domain,
            "meteo_source": self.simulation.meteo_source,
            "timestep_seconds": self.simulation.timestep_seconds,
        },
        "boundary": {
            "source": self.boundary.source,
            "stofs_file": (str(self.boundary.stofs_file) if self.boundary.stofs_file else None),
        },
        "paths": {
            "work_dir": str(self.paths.work_dir),
            "raw_download_dir": (
                str(self.paths.raw_download_dir) if self.paths.raw_download_dir else None
            ),
            "nfs_mount": str(self.paths.nfs_mount),
            "nwm_dir": str(self.paths.nwm_dir),
            "hot_start_file": (
                str(self.paths.hot_start_file) if self.paths.hot_start_file else None
            ),
            "conda_env_name": self.paths.conda_env_name,
            "parm_dir": str(self.paths.parm_dir),
        },
        "model_config": self.model_config.to_dict(),
        "monitoring": {
            "log_level": self.monitoring.log_level,
            "log_file": (str(self.monitoring.log_file) if self.monitoring.log_file else None),
            "enable_progress_tracking": self.monitoring.enable_progress_tracking,
            "enable_timing": self.monitoring.enable_timing,
        },
        "download": {
            "enabled": self.download.enabled,
            "timeout": self.download.timeout,
            "raise_on_error": self.download.raise_on_error,
            "limit_per_host": self.download.limit_per_host,
        },
    }

validate #

validate()

Validate configuration and return list of errors.

Source code in src/coastal_calibration/config/schema.py
def validate(self) -> list[str]:
    """Collect and return all configuration validation errors (empty if valid)."""
    from coastal_calibration.downloader import validate_date_ranges

    problems: list[str] = []

    if self.simulation.duration_hours <= 0:
        problems.append("simulation.duration_hours must be positive")

    # Delegate model-specific checks to the concrete ModelConfig.
    problems += self.model_config.validate(self)

    # Shared boundary-source checks.
    problems += self._validate_boundary_source()

    # Data-availability windows only matter when downloads are enabled.
    if self.download.enabled:
        sim = self.simulation
        window_start = sim.start_date
        window_end = window_start + timedelta(hours=sim.duration_hours)
        problems += validate_date_ranges(
            window_start,
            window_end,
            sim.meteo_source,
            self.boundary.source,
            sim.coastal_domain,
        )

    return problems

SlurmConfig#

SlurmConfig dataclass #

SlurmConfig(
    job_name="coastal_calibration",
    partition=DEFAULT_SLURM_PARTITION,
    time_limit=None,
    account=None,
    qos=None,
    user=(lambda: get("USER"))(),
)

SLURM job scheduling configuration.

Contains only parameters related to job scheduling (partition, account, time limits). Compute resources (nodes, tasks) are model-specific and live on the concrete :class:ModelConfig subclasses.

SimulationConfig#

SimulationConfig dataclass #

SimulationConfig(
    start_date,
    duration_hours,
    coastal_domain,
    meteo_source,
    timestep_seconds=3600,
)

Simulation time and domain configuration.

start_pdy property #

start_pdy

Return start date as YYYYMMDD string.

start_cyc property #

start_cyc

Return start cycle (hour) as HH string.

inland_domain property #

inland_domain

Inland domain directory name for this coastal domain.

nwm_domain property #

nwm_domain

NWM domain identifier for this coastal domain.

geo_grid property #

geo_grid

Geogrid filename for this coastal domain.

BoundaryConfig#

BoundaryConfig dataclass #

BoundaryConfig(source='tpxo', stofs_file=None)

Boundary condition configuration.

PathConfig#

PathConfig dataclass #

PathConfig(
    work_dir,
    raw_download_dir=None,
    nfs_mount=(lambda: DEFAULT_NFS_MOUNT)(),
    nwm_dir=(lambda: DEFAULT_NWM_DIR)(),
    hot_start_file=None,
    conda_env_name=DEFAULT_CONDA_ENV_NAME,
    parm_dir=(lambda: DEFAULT_PARM_DIR)(),
)

Path configuration for data and executables.

otps_dir property #

otps_dir

TPXO binary directory (inside Singularity container, not configurable).

tpxo_data_dir property #

tpxo_data_dir

TPXO tidal atlas data directory.

ush_nwm property #

ush_nwm

USH scripts directory.

exec_nwm property #

exec_nwm

Executables directory.

parm_nwm property #

parm_nwm

Parameter files directory.

conda_envs_path property #

conda_envs_path

Conda environments directory.

download_dir property #

download_dir

Effective download directory (fallback to work_dir/downloads).

meteo_dir #

meteo_dir(meteo_source)

Directory for meteorological data.

Source code in src/coastal_calibration/config/schema.py
def meteo_dir(self, meteo_source: str) -> Path:
    """Return the download subdirectory holding meteorological data for *meteo_source*."""
    return self.download_dir.joinpath(self.METEO_SUBDIR, meteo_source)

streamflow_dir #

streamflow_dir(meteo_source)

Directory for streamflow/hydro data.

Source code in src/coastal_calibration/config/schema.py
def streamflow_dir(self, meteo_source: str) -> Path:
    """Return the download subdirectory for streamflow/hydro data.

    NWM retrospective runs get a dedicated subdirectory; every other
    meteo source shares the operational NWM hydro directory.
    """
    parts = (
        (self.STREAMFLOW_SUBDIR, "nwm_retro")
        if meteo_source == "nwm_retro"
        else (self.HYDRO_SUBDIR, "nwm")
    )
    return self.download_dir.joinpath(*parts)

coastal_dir #

coastal_dir(coastal_source)

Directory for coastal boundary data.

Source code in src/coastal_calibration/config/schema.py
def coastal_dir(self, coastal_source: str) -> Path:
    """Return the download subdirectory for coastal boundary data from *coastal_source*."""
    return self.download_dir.joinpath(self.COASTAL_SUBDIR, coastal_source)

geogrid_file #

geogrid_file(sim)

Geogrid file path for the given domain.

Source code in src/coastal_calibration/config/schema.py
def geogrid_file(self, sim: SimulationConfig) -> Path:
    """Return the geogrid file path for *sim*'s inland domain."""
    return self.parm_nwm.joinpath(sim.inland_domain, sim.geo_grid)

ModelConfig#

ModelConfig #

Bases: ABC

Abstract base class for model-specific configuration.

Each concrete subclass owns its compute parameters, environment variable construction, stage ordering, validation, and SLURM script generation. This keeps model-specific concerns out of the shared configuration and makes adding new models straightforward: create a new subclass, implement the abstract methods, and register it in :data:MODEL_REGISTRY.

model_name abstractmethod property #

model_name

Return the model identifier string (e.g. 'schism', 'sfincs').

stage_order abstractmethod property #

stage_order

Ordered list of stage names for this model's pipeline.

build_environment abstractmethod #

build_environment(env, config)

Add model-specific environment variables to env (mutating).

Called by :meth:WorkflowStage.build_environment after shared variables have been populated.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def build_environment(self, env: dict[str, str], config: CoastalCalibConfig) -> dict[str, str]:
    """Add model-specific environment variables to *env* (mutating).

    Called by :meth:`WorkflowStage.build_environment` after shared
    variables have been populated.

    Parameters
    ----------
    env : dict of str to str
        Environment mapping to extend in place.
    config : CoastalCalibConfig
        Full workflow configuration to draw values from.
    """

validate abstractmethod #

validate(config)

Return model-specific validation errors.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def validate(self, config: CoastalCalibConfig) -> list[str]:
    """Return model-specific validation errors.

    Returns
    -------
    list of str
        Error messages; an empty list signals a valid configuration.
    """

create_stages abstractmethod #

create_stages(config, monitor)

Construct and return the {name: stage} dictionary.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def create_stages(self, config: CoastalCalibConfig, monitor: Any) -> dict[str, Any]:
    """Construct and return the ``{name: stage}`` dictionary.

    NOTE(review): keys are presumably the names listed in
    :attr:`stage_order` — confirm against concrete implementations.
    """

generate_job_script_lines abstractmethod #

generate_job_script_lines(config)

Return #SBATCH directives specific to this model's compute needs.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def generate_job_script_lines(self, config: CoastalCalibConfig) -> list[str]:
    """Return ``#SBATCH`` directives specific to this model's compute needs.

    Each list element is one directive line for the generated SLURM script.
    """

to_dict abstractmethod #

to_dict()

Serialize model-specific fields to a dictionary.

Source code in src/coastal_calibration/config/schema.py
@abstractmethod
def to_dict(self) -> dict[str, Any]:
    """Serialize model-specific fields to a dictionary.

    Consumed by :meth:`CoastalCalibConfig.to_dict` for the
    ``model_config`` section.
    """

SchismModelConfig#

SchismModelConfig dataclass #

SchismModelConfig(
    singularity_image=(lambda: DEFAULT_SING_IMAGE_PATH)(),
    nodes=2,
    ntasks_per_node=18,
    exclusive=True,
    nscribes=2,
    omp_num_threads=2,
    oversubscribe=False,
    binary=_DEFAULT_SCHISM_BINARY,
    include_noaa_gages=False,
)

Bases: ModelConfig

SCHISM model configuration.

Contains compute parameters (MPI layout, SCHISM binary) that were previously split across MPIConfig and SlurmConfig.

PARAMETER DESCRIPTION
singularity_image

Path to the Singularity/Apptainer SIF image used to run SCHISM and its pre-/post-processing scripts inside a container. SFINCS manages its own container independently (see :attr:SfincsModelConfig.container_tag).

TYPE: Path DEFAULT: (lambda: DEFAULT_SING_IMAGE_PATH)()

nodes

Number of SLURM nodes.

TYPE: int DEFAULT: 2

ntasks_per_node

MPI tasks per node.

TYPE: int DEFAULT: 18

exclusive

Request exclusive node access.

TYPE: bool DEFAULT: True

nscribes

Number of SCHISM scribe processes.

TYPE: int DEFAULT: 2

omp_num_threads

OpenMP threads per MPI rank.

TYPE: int DEFAULT: 2

oversubscribe

Allow MPI oversubscription.

TYPE: bool DEFAULT: False

binary

SCHISM executable name.

TYPE: str DEFAULT: _DEFAULT_SCHISM_BINARY

include_noaa_gages

When True, automatically query NOAA CO-OPS for water level stations within the model domain (computed from the concave hull of open boundary nodes in hgrid.gr3), write a station.in file, set iout_sta = 1 in param.nml, and generate sim-vs-obs comparison plots after the run. Requires the plot optional dependencies.

TYPE: bool DEFAULT: False

total_tasks property #

total_tasks

Total number of MPI tasks (nodes * ntasks_per_node).

schism_mesh #

schism_mesh(sim, paths)

SCHISM ESMF mesh file path for the given domain.

Source code in src/coastal_calibration/config/schema.py
def schism_mesh(self, sim: SimulationConfig, paths: PathConfig) -> Path:
    """Return the SCHISM ESMF mesh (``hgrid.nc``) path for *sim*'s coastal domain."""
    return paths.parm_nwm.joinpath("coastal", sim.coastal_domain, "hgrid.nc")

SfincsModelConfig#

SfincsModelConfig dataclass #

SfincsModelConfig(
    prebuilt_dir,
    model_root=None,
    include_noaa_gages=False,
    observation_points=list(),
    observation_locations_file=None,
    merge_observations=False,
    discharge_locations_file=None,
    merge_discharge=False,
    include_precip=False,
    include_wind=False,
    include_pressure=False,
    meteo_res=None,
    forcing_to_mesh_offset_m=0.0,
    vdatum_mesh_to_msl_m=0.0,
    sfincs_exe=None,
    container_tag="latest",
    container_image=None,
    omp_num_threads=0,
)

Bases: ModelConfig

SFINCS model configuration.

SFINCS runs on a single node using OpenMP (all available cores). There is no MPI or multi-node support.

PARAMETER DESCRIPTION
prebuilt_dir

Path to the directory containing the pre-built model files (sfincs.inp, sfincs.nc, region.geojson, etc.).

TYPE: Path

model_root

Output directory for the built model. Defaults to {work_dir}/sfincs_model.

TYPE: Path DEFAULT: None

include_noaa_gages

When True, automatically query NOAA CO-OPS for water level stations within the model domain and add them as observation points. Requires the plot optional dependencies.

TYPE: bool DEFAULT: False

observation_points

Observation point specifications as list of dicts with x, y, name keys (coordinates in model CRS).

TYPE: list DEFAULT: list()

observation_locations_file

Path to a GeoJSON file with observation point locations.

TYPE: Path DEFAULT: None

merge_observations

Whether to merge with pre-existing observation points.

TYPE: bool DEFAULT: False

discharge_locations_file

Path to a SFINCS .src or GeoJSON with discharge source point locations.

TYPE: Path DEFAULT: None

merge_discharge

Whether to merge with pre-existing discharge source points.

TYPE: bool DEFAULT: False

include_precip

When True, add precipitation forcing from the meteorological data catalog entry (derived from simulation.meteo_source).

TYPE: bool DEFAULT: False

include_wind

When True, add spatially-varying wind forcing (wind10_u, wind10_v) from the meteorological data catalog entry.

TYPE: bool DEFAULT: False

include_pressure

When True, add spatially-varying atmospheric pressure forcing (press_msl) and enable barometric correction (baro=1).

TYPE: bool DEFAULT: False

meteo_res

Output resolution (m) for gridded meteorological forcing (precipitation, wind, pressure). When None (default) the resolution is determined from the SFINCS quadtree grid — it equals the base cell size (coarsest level) so that the meteo grid is never finer than needed. Setting an explicit value (e.g. 2000) overrides the automatic calculation.

.. note::

Without this parameter the HydroMT reproject call retains the source-data resolution (≈ 1 km for NWM), and the LCC → UTM reprojection can inflate the output to the full CONUS extent, producing multi-GB files and very slow simulations.

TYPE: float DEFAULT: None

forcing_to_mesh_offset_m

Vertical offset in metres added to the boundary-condition water levels before they enter SFINCS.

Tidal-only sources such as TPXO provide oscillations centred on zero (MSL) but carry no information about where MSL sits on the mesh's vertical datum. This parameter anchors the forcing signal to the correct geodetic height on the mesh. Set it to the elevation of MSL in the mesh datum obtained from VDatum (e.g. 0.171 for a NAVD88 mesh on the Texas Gulf coast, where MSL is 0.171 m above NAVD88).

For sources that already report water levels in the mesh datum (e.g. STOFS on a NAVD88 mesh) set this to 0.0.

Defaults to 0.0.

TYPE: float DEFAULT: 0.0

vdatum_mesh_to_msl_m

Vertical offset in metres added to the simulated water level before comparison with NOAA CO-OPS observations (which are in MSL). The model output inherits the mesh vertical datum, so this converts it to MSL (e.g. 0.171 for a NAVD88 mesh on the Texas Gulf coast).

Defaults to 0.0.

TYPE: float DEFAULT: 0.0

sfincs_exe

Path to a locally compiled SFINCS executable. When set, the sfincs_run stage invokes this binary directly instead of using a Singularity container, making it possible to run on systems where Singularity is unavailable (e.g. macOS laptops). The container-related options (container_tag, container_image) are ignored when sfincs_exe is set.

TYPE: Path DEFAULT: None

container_tag

Tag for the deltares/sfincs-cpu Docker/Singularity image.

TYPE: str DEFAULT: 'latest'

container_image

Path to a pre-pulled Singularity SIF file.

TYPE: Path DEFAULT: None

omp_num_threads

Number of OpenMP threads. Defaults to the number of physical CPU cores on the current machine (see :func:~coastal_calibration.utils.system.get_cpu_count). On HPC nodes this auto-detects correctly; on a local laptop it avoids over-subscribing the system.

TYPE: int DEFAULT: 0

MonitoringConfig#

MonitoringConfig dataclass #

MonitoringConfig(
    log_level="INFO",
    log_file=None,
    enable_progress_tracking=True,
    enable_timing=True,
)

Workflow monitoring configuration.

DownloadConfig#

DownloadConfig dataclass #

DownloadConfig(
    enabled=True,
    timeout=600,
    raise_on_error=True,
    limit_per_host=4,
)

Data download configuration.

Workflow Runner#

CoastalCalibRunner#

CoastalCalibRunner #

CoastalCalibRunner(config)

Main workflow runner for coastal model calibration.

This class orchestrates the entire calibration workflow, managing stage execution, SLURM job submission, and progress monitoring.

Supports both SCHISM (model="schism", default) and SFINCS (model="sfincs") pipelines. The model type is selected via config.model.

Initialize the workflow runner.

PARAMETER DESCRIPTION
config

Coastal calibration configuration.

TYPE: CoastalCalibConfig

Source code in src/coastal_calibration/runner.py
def __init__(self, config: CoastalCalibConfig) -> None:
    """Initialize the workflow runner.

    Parameters
    ----------
    config : CoastalCalibConfig
        Coastal calibration configuration.
    """
    self.config = config

    # Ensure log directory exists early so file logging can start.
    config.paths.work_dir.mkdir(parents=True, exist_ok=True)

    # Set up file logging *before* creating the monitor so that
    # every message (including third-party) is captured on disk.
    # Only installed when the user did not configure an explicit log file.
    if not config.monitoring.log_file:
        log_path = generate_log_path(config.paths.work_dir)
        configure_logger(file=str(log_path), file_level="DEBUG")

    # Silence noisy third-party loggers (HydroMT, xarray, ...)
    silence_third_party_loggers()

    self.monitor = WorkflowMonitor(config.monitoring)
    # None until a SLURM manager is needed (see the `slurm` accessor used in submit()).
    self._slurm: SlurmManager | None = None
    # Stage objects keyed by stage name; populated by _init_stages().
    self._stages: dict[str, WorkflowStage] = {}
    # Per-stage results collected during run().
    self._results: dict[str, Any] = {}

validate #

validate()

Validate configuration and prerequisites.

RETURNS DESCRIPTION
list of str

List of validation error messages (empty if valid).

Source code in src/coastal_calibration/runner.py
def validate(self) -> list[str]:
    """Validate configuration and prerequisites.

    Returns
    -------
    list of str
        List of validation error messages (empty if valid).
    """
    all_errors: list[str] = list(self.config.validate())

    # Stage-level checks require the stage objects to exist first.
    self._init_stages()
    for stage_name, stage in self._stages.items():
        # Prefix each stage error with its stage name for attribution.
        all_errors.extend(f"[{stage_name}] {msg}" for msg in stage.validate())

    return all_errors

submit #

submit(
    wait=False,
    log_file=None,
    start_from=None,
    stop_after=None,
)

Submit workflow as a SLURM job.

Executes the same stage pipeline as :meth:run, but Python-only stages run on the login node while container stages are bundled into a SLURM bash script and submitted as a job.

PARAMETER DESCRIPTION
wait

If True, wait for job completion (interactive mode). If False, return immediately after job submission.

TYPE: bool DEFAULT: False

log_file

Custom path for SLURM output log. If not provided, logs are written to <work_dir>/slurm-<job_id>.out.

TYPE: Path DEFAULT: None

start_from

Stage name to start from (skip earlier stages).

TYPE: str DEFAULT: None

stop_after

Stage name to stop after (skip later stages).

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
WorkflowResult

Result with job submission details.

Source code in src/coastal_calibration/runner.py
def submit(
    self,
    wait: bool = False,
    log_file: Path | None = None,
    start_from: str | None = None,
    stop_after: str | None = None,
) -> WorkflowResult:
    """Submit workflow as a SLURM job.

    Executes the same stage pipeline as :meth:`run`, but Python-only
    stages run on the login node while container stages are bundled
    into a SLURM bash script and submitted as a job.

    Parameters
    ----------
    wait : bool, default False
        If True, wait for job completion (interactive mode).
        If False, return immediately after job submission.
    log_file : Path, optional
        Custom path for SLURM output log. If not provided, logs are
        written to <work_dir>/slurm-<job_id>.out.
    start_from : str, optional
        Stage name to start from (skip earlier stages).
    stop_after : str, optional
        Stage name to stop after (skip later stages).

    Returns
    -------
    WorkflowResult
        Result with job submission details.
    """
    start_time = datetime.now()
    stages_completed: list[str] = []
    errors: list[str] = []

    validation_errors = self.validate()
    # When resuming mid-pipeline, verify that earlier stages completed.
    if not validation_errors and start_from:
        validation_errors = self._check_prerequisites(start_from)
    if validation_errors:
        # Abort before touching the work directory or SLURM.
        return WorkflowResult(
            success=False,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={},
            errors=validation_errors,
        )

    stages_to_run = self._get_stages_to_run(start_from, stop_after)
    # Partition into login-node stages, container (SLURM job) stages,
    # and stages that run after the job finishes.
    pre_job, job, post_job = self._split_stages_for_submit(stages_to_run)

    # Register all stages for monitoring
    self.monitor.register_stages(stages_to_run)
    self.monitor.start_workflow()

    self._prepare_work_directory()
    self.monitor.info("-" * 40)

    # --- Run pre-job stages on login node ---
    try:
        pre_completed = self._run_stages_on_login_node(pre_job)
        stages_completed.extend(pre_completed)
    except Exception as e:
        self.monitor.error(f"Pre-job stage failed: {e}")
        self.monitor.end_workflow(success=False)
        errors.append(str(e))
        return WorkflowResult(
            success=False,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=stages_completed,
            # The failed stage is the first pre-job stage not completed;
            # "unknown" is a defensive fallback if the counts disagree.
            stages_failed=[
                pre_job[len(stages_completed)]
                if len(stages_completed) < len(pre_job)
                else "unknown"
            ],
            outputs={},
            errors=errors,
        )

    # --- Submit container stages as SLURM job ---
    if not job:
        # No container stages — everything ran on login node
        # NOTE(review): post_job is presumably empty whenever job is
        # empty (splitter invariant) — confirm, since it isn't run here.
        self.monitor.end_workflow(success=True)
        return WorkflowResult(
            success=True,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=stages_completed,
            stages_failed=[],
            outputs={"slurm_status": "NO_JOB_NEEDED"},
            errors=[],
        )

    self.monitor.info(f"Submitting SLURM job with {len(job)} stage(s):")
    for stage_name in job:
        self.monitor.info(f"  \u2022 {stage_name}")

    # Write the bash runner that executes the container stages in order.
    self._generate_runner_script(job)

    job_script = self.config.paths.work_dir / "submit_job.sh"
    self.slurm.generate_job_script(job_script, log_file=log_file)

    job_id = self.slurm.submit_job(job_script)

    if not wait:
        # Return immediately without waiting for job completion
        # NOTE(review): post-job stages are not run in this branch —
        # presumably the caller resumes with start_from after the job ends.
        self.monitor.end_workflow(success=True)
        return WorkflowResult(
            success=True,
            job_id=job_id,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=stages_completed,
            stages_failed=[],
            outputs={"slurm_status": "SUBMITTED"},
            errors=[],
        )

    # Blocking mode: poll the job, then run post-job stages on completion.
    return self._wait_for_slurm_job(
        job_id=job_id,
        job=job,
        post_job=post_job,
        start_time=start_time,
        stages_completed=stages_completed,
        errors=errors,
    )

run #

run(start_from=None, stop_after=None, dry_run=False)

Execute the calibration workflow.

PARAMETER DESCRIPTION
start_from

Stage name to start from (skip earlier stages).

TYPE: str DEFAULT: None

stop_after

Stage name to stop after (skip later stages).

TYPE: str DEFAULT: None

dry_run

If True, validate but don't execute.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
WorkflowResult

Result with execution details.

Source code in src/coastal_calibration/runner.py
def run(
    self,
    start_from: str | None = None,
    stop_after: str | None = None,
    dry_run: bool = False,
) -> WorkflowResult:
    """Execute the calibration workflow.

    Parameters
    ----------
    start_from : str, optional
        Stage name to start from (skip earlier stages).
    stop_after : str, optional
        Stage name to stop after (skip later stages).
    dry_run : bool, default False
        If True, validate but don't execute.

    Returns
    -------
    WorkflowResult
        Result with execution details.
    """
    start_time = datetime.now()
    stages_completed: list[str] = []
    stages_failed: list[str] = []
    outputs: dict[str, Any] = {}
    errors: list[str] = []

    validation_errors = self.validate()
    # When resuming mid-pipeline, verify that earlier stages completed.
    if not validation_errors and start_from:
        validation_errors = self._check_prerequisites(start_from)
    if validation_errors:
        # Abort before any stage executes.
        return WorkflowResult(
            success=False,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={},
            errors=validation_errors,
        )

    if dry_run:
        self.monitor.info("Dry run mode - validation passed, no execution")
        return WorkflowResult(
            success=True,
            job_id=None,
            start_time=start_time,
            end_time=datetime.now(),
            stages_completed=[],
            stages_failed=[],
            outputs={"dry_run": True},
            errors=[],
        )

    # NOTE(review): registers the full STAGE_ORDER here, whereas submit()
    # registers only stages_to_run — confirm this asymmetry is intentional.
    self.monitor.register_stages(self.STAGE_ORDER)
    self.monitor.start_workflow()
    self.monitor.info("-" * 40)

    stages_to_run = self._get_stages_to_run(start_from, stop_after)

    # Tracks the stage being executed so a failure can be attributed;
    # remains "" if the failure happens before the first stage starts.
    current_stage = ""
    try:
        for current_stage in stages_to_run:
            stage = self._stages[current_stage]

            with self.monitor.stage_context(current_stage, stage.description):
                result = stage.run()
                self._results[current_stage] = result
                outputs[current_stage] = result
                stages_completed.append(current_stage)
                # Persist completion so a later run can resume with start_from.
                self._save_stage_status(current_stage)

        self.monitor.end_workflow(success=True)
        success = True

    except Exception as e:
        self.monitor.error(f"Workflow failed: {e}")
        self.monitor.end_workflow(success=False)
        errors.append(str(e))
        stages_failed.append(current_stage)
        success = False

    result = WorkflowResult(
        success=success,
        job_id=None,
        start_time=start_time,
        end_time=datetime.now(),
        stages_completed=stages_completed,
        stages_failed=stages_failed,
        outputs=outputs,
        errors=errors,
    )

    # Persist the result and monitoring progress alongside the work dir.
    result_file = self.config.paths.work_dir / "workflow_result.json"
    result.save(result_file)
    self.monitor.save_progress(self.config.paths.work_dir / "workflow_progress.json")

    return result

WorkflowResult#

WorkflowResult dataclass #

WorkflowResult(
    success,
    job_id,
    start_time,
    end_time,
    stages_completed,
    stages_failed,
    outputs,
    errors,
)

Result of a workflow execution.

duration_seconds property #

duration_seconds

Get workflow duration in seconds.

to_dict #

to_dict()

Convert to dictionary.

Source code in src/coastal_calibration/runner.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the workflow result into a plain dictionary.

    Datetimes are rendered as ISO-8601 strings; a missing ``end_time``
    is emitted as ``None``.
    """
    finished = self.end_time.isoformat() if self.end_time else None
    return {
        "success": self.success,
        "job_id": self.job_id,
        "start_time": self.start_time.isoformat(),
        "end_time": finished,
        "duration_seconds": self.duration_seconds,
        "stages_completed": self.stages_completed,
        "stages_failed": self.stages_failed,
        "outputs": self.outputs,
        "errors": self.errors,
    }

save #

save(path)

Save result to JSON file.

PARAMETER DESCRIPTION
path

Path to output JSON file. Parent directories will be created if they don't exist.

TYPE: Path or str

Source code in src/coastal_calibration/runner.py
def save(self, path: Path | str) -> None:
    """Save result to JSON file.

    Parameters
    ----------
    path : Path or str
        Path to output JSON file. Parent directories will be created
        if they don't exist.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(self.to_dict(), indent=2))

Downloader#

validate_date_ranges#

validate_date_ranges #

validate_date_ranges(
    start_time,
    end_time,
    meteo_source,
    coastal_source,
    domain,
)

Validate that requested dates are within available ranges.

Source code in src/coastal_calibration/downloader.py
def validate_date_ranges(
    start_time: datetime,
    end_time: datetime,
    meteo_source: str,
    coastal_source: str,
    domain: str,
) -> list[str]:
    """Check that the requested period is covered by each data source.

    Returns a list of human-readable error strings; an empty list means
    every applicable source covers ``start_time``..``end_time``.  The
    ``tpxo`` coastal source carries no date restriction and is skipped.
    """
    # Meteo is always checked; the coastal source only when it is dated.
    sources = [meteo_source]
    if coastal_source != "tpxo":
        sources.append(coastal_source)

    issues: list[str] = []
    for source in sources:
        available = get_date_range(source, domain)
        if not available:
            continue
        problem = available.validate(start_time, end_time)
        if problem:
            issues.append(problem)

    return issues

NOAA CO-OPS API#

COOPSAPIClient#

COOPSAPIClient #

COOPSAPIClient(timeout=120)

Client for interacting with NOAA CO-OPS API.

Initialize COOPS API client.

PARAMETER DESCRIPTION
timeout

Request timeout in seconds, by default 120

TYPE: int DEFAULT: 120

RAISES DESCRIPTION
ImportError

If plot optional dependencies are not installed.

Source code in src/coastal_calibration/coops_api.py
def __init__(self, timeout: int = 120) -> None:
    """Create a CO-OPS API client.

    Parameters
    ----------
    timeout : int, optional
        HTTP request timeout in seconds, by default 120.

    Raises
    ------
    ImportError
        If the optional plotting dependencies are not installed.
    """
    # Fail fast if the optional extras this client relies on are absent.
    _check_plot_deps()
    self.timeout = timeout
    # Station metadata is fetched once up front and cached on the instance.
    self._stations_metadata = self._get_stations_metadata()

stations_metadata property #

stations_metadata

Get metadata for all water level stations as a GeoDataFrame.

RETURNS DESCRIPTION
GeoDataFrame

GeoDataFrame with station metadata and Point geometries.

validate_parameters #

validate_parameters(
    product, datum, units, time_zone, interval
)

Validate API parameters.

PARAMETER DESCRIPTION
product

Data product type

TYPE: str

datum

Vertical datum

TYPE: str

units

Unit system

TYPE: str

time_zone

Time zone

TYPE: str

interval

Time interval for predictions

TYPE: str | int | None

RAISES DESCRIPTION
ValueError

If any parameter is invalid

Source code in src/coastal_calibration/coops_api.py
def validate_parameters(
    self,
    product: str,
    datum: str,
    units: str,
    time_zone: str,
    interval: str | int | None,
) -> None:
    """Validate API parameters.

    Parameters
    ----------
    product : str
        Data product type
    datum : str
        Vertical datum
    units : str
        Unit system
    time_zone : str
        Time zone
    interval : str | int | None
        Time interval for predictions

    Raises
    ------
    ValueError
        If any parameter is invalid
    """
    if product not in self.valid_products:
        raise ValueError(
            f"Invalid product '{product}'. Must be one of: {', '.join(self.valid_products)}"
        )

    if datum.upper() not in self.valid_datums:
        raise ValueError(
            f"Invalid datum '{datum}'. Must be one of: {', '.join(self.valid_datums)}"
        )

    if units not in self.valid_units:
        raise ValueError(
            f"Invalid units '{units}'. Must be one of: {', '.join(self.valid_units)}"
        )

    if time_zone not in self.valid_timezones:
        raise ValueError(
            f"Invalid time_zone '{time_zone}'. Must be one of: {', '.join(self.valid_timezones)}"
        )

    if (
        product == "predictions"
        and interval is not None
        and str(interval) not in self.valid_intervals
    ):
        raise ValueError(
            f"Invalid interval '{interval}' for predictions. "
            f"Must be one of: {', '.join(self.valid_intervals)}"
        )

build_url #

build_url(
    station_id,
    begin_date,
    end_date,
    product,
    datum,
    units,
    time_zone,
    interval,
)

Build API request URL for a station.

PARAMETER DESCRIPTION
station_id

Station ID

TYPE: str

begin_date

Start date

TYPE: str

end_date

End date

TYPE: str

product

Data product

TYPE: str

datum

Vertical datum

TYPE: str

units

Unit system

TYPE: str

time_zone

Time zone

TYPE: str

interval

Time interval for predictions

TYPE: str | int | None

RETURNS DESCRIPTION
str

Complete API request URL

Source code in src/coastal_calibration/coops_api.py
def build_url(
    self,
    station_id: str,
    begin_date: str,
    end_date: str,
    product: str,
    datum: str,
    units: str,
    time_zone: str,
    interval: str | int | None,
) -> str:
    """Build API request URL for a station.

    Parameters
    ----------
    station_id : str
        Station ID
    begin_date : str
        Start date
    end_date : str
        End date
    product : str
        Data product
    datum : str
        Vertical datum
    units : str
        Unit system
    time_zone : str
        Time zone
    interval : str | int | None, optional
        Time interval for predictions

    Returns
    -------
    str
        Complete API request URL
    """
    params = {
        "begin_date": begin_date,
        "end_date": end_date,
        "station": station_id,
        "product": product,
        "datum": datum,
        "units": units,
        "time_zone": time_zone,
        "format": "json",
        "application": "coastal_calibration_coops",
    }

    if product == "predictions" and interval is not None:
        params["interval"] = str(interval)

    query_parts = [f"{k}={v}" for k, v in params.items()]
    return f"{self.base_url}?{'&'.join(query_parts)}"

fetch_data #

fetch_data(urls)

Fetch data from API for multiple URLs.

PARAMETER DESCRIPTION
urls

List of API request URLs

TYPE: list[str]

RETURNS DESCRIPTION
list[dict | None]

List of JSON responses (None for failed requests)

Source code in src/coastal_calibration/coops_api.py
def fetch_data(self, urls: list[str]) -> list[dict[str, Any] | None]:
    """Fetch JSON responses for a batch of request URLs.

    Parameters
    ----------
    urls : list[str]
        API request URLs, one per station.

    Returns
    -------
    list[dict | None]
        One decoded JSON payload per URL; ``None`` where a request
        failed (``raise_status=False`` suppresses HTTP errors).
    """
    logger.info("Fetching data from %d station(s)", len(urls))

    responses = fetch(
        urls,
        "json",
        request_method="get",
        timeout=self.timeout,
        raise_status=False,
    )
    return responses

get_datums #

get_datums(station_ids: str) -> StationDatum
get_datums(station_ids: list[str]) -> list[StationDatum]
get_datums(station_ids)

Retrieve datum information for one or more stations.

PARAMETER DESCRIPTION
station_ids

Single station ID or list of station IDs

TYPE: str | list[str]

RETURNS DESCRIPTION
StationDatum | list[StationDatum]

Single StationDatum object if input is str, list of StationDatum if input is list

RAISES DESCRIPTION
ValueError

If no valid datum data is returned for any station.

Source code in src/coastal_calibration/coops_api.py
def get_datums(self, station_ids: str | list[str]) -> StationDatum | list[StationDatum]:
    """Retrieve datum information for one or more stations.

    Parameters
    ----------
    station_ids : str | list[str]
        Single station ID or list of station IDs

    Returns
    -------
    StationDatum | list[StationDatum]
        A single ``StationDatum`` when a string was passed, otherwise a
        list with one entry per station that returned usable data.

    Raises
    ------
    ValueError
        If no valid datum data is returned for any station.
    """
    import numpy as np

    is_single = isinstance(station_ids, str)
    ids = [station_ids] if is_single else station_ids

    datum_base_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations"
    urls = [f"{datum_base_url}/{sid}/datums.json" for sid in ids]
    logger.info("Fetching datum information for %d station(s)", len(ids))
    responses = self.fetch_data(urls)

    results = []
    for sid, payload in zip(ids, responses, strict=False):
        # Failed requests and explicit API errors are logged and skipped.
        if payload is None:
            logger.warning("No datum data returned for station %s", sid)
            continue
        if "error" in payload:
            logger.warning(
                "Datum API error for station %s: %s",
                sid,
                payload["error"].get("message", "Unknown error"),
            )
            continue

        # Individual datum entries; missing numeric values become NaN.
        values = [
            DatumValue(
                name=entry.get("name", ""),
                description=entry.get("description", ""),
                value=np.float64(entry.get("value", np.nan)),
            )
            for entry in (payload.get("datums") or [])
        ]

        results.append(
            StationDatum(
                station_id=sid,
                accepted=payload.get("accepted", ""),
                superseded=payload.get("superseded", ""),
                epoch=payload.get("epoch", ""),
                units=payload.get("units", ""),
                orthometric_datum=payload.get("OrthometricDatum", ""),
                datums=values,
                lat=np.float64(payload.get("LAT", np.nan)),
                lat_date=payload.get("LATdate", ""),
                lat_time=payload.get("LATtime", ""),
                hat=np.float64(payload.get("HAT", np.nan)),
                hat_date=payload.get("HATdate", ""),
                hat_time=payload.get("HATtime", ""),
                min_value=np.float64(payload.get("min", np.nan)),
                min_date=payload.get("mindate", ""),
                min_time=payload.get("mintime", ""),
                max_value=np.float64(payload.get("max", np.nan)),
                max_date=payload.get("maxdate", ""),
                max_time=payload.get("maxtime", ""),
                datum_analysis_period=payload.get("DatumAnalysisPeriod") or [],
                ngs_link=payload.get("NGSLink", ""),
                ctrl_station=payload.get("ctrlStation", ""),
            )
        )

    if not results:
        raise ValueError("No valid datum data returned for any station")

    return results[0] if is_single else results

query_coops_byids#

query_coops_byids #

query_coops_byids(
    station_ids,
    begin_date,
    end_date,
    *,
    product="water_level",
    datum="MLLW",
    units="metric",
    time_zone="gmt",
    interval=None,
)

Fetch water level data from NOAA CO-OPS API for multiple stations.

PARAMETER DESCRIPTION
station_ids

List of station IDs to retrieve data for.

TYPE: list[str]

begin_date

Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm

TYPE: str

end_date

End date in same format as begin_date.

TYPE: str

product

Data product to retrieve, by default water_level.

TYPE: ('water_level', 'hourly_height', 'high_low', 'predictions') DEFAULT: "water_level"

datum

Vertical datum for water levels, by default "MLLW".

TYPE: str DEFAULT: 'MLLW'

units

Units for data, by default "metric".

TYPE: ('metric', 'english') DEFAULT: "metric"

time_zone

Time zone for returned data, by default "gmt".

TYPE: ('gmt', 'lst', 'lst_ldt') DEFAULT: "gmt"

interval

Time interval for predictions product only, by default None.

TYPE: str | int | None DEFAULT: None

RETURNS DESCRIPTION
Dataset

Dataset containing water level data with dimensions (time, station).

RAISES DESCRIPTION
ValueError

If invalid parameters are provided or if API returns errors.

Source code in src/coastal_calibration/coops_api.py
def query_coops_byids(
    station_ids: list[str],
    begin_date: str,
    end_date: str,
    *,
    product: Literal[
        "water_level",
        "hourly_height",
        "high_low",
        "predictions",
    ] = "water_level",
    datum: str = "MLLW",
    units: Literal["metric", "english"] = "metric",
    time_zone: Literal["gmt", "lst", "lst_ldt"] = "gmt",
    interval: str | int | None = None,
) -> xr.Dataset:
    """Fetch water level data from NOAA CO-OPS API for multiple stations.

    Parameters
    ----------
    station_ids : list[str]
        List of station IDs to retrieve data for.
    begin_date : str
        Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm
    end_date : str
        End date in same format as begin_date.
    product : {"water_level", "hourly_height", "high_low", "predictions"}, optional
        Data product to retrieve, by default ``water_level``.
    datum : str, optional
        Vertical datum for water levels, by default "MLLW".
    units : {"metric", "english"}, optional
        Units for data, by default "metric".
    time_zone : {"gmt", "lst", "lst_ldt"}, optional
        Time zone for returned data, by default "gmt".
    interval : str | int | None, optional
        Time interval for predictions product only, by default None.

    Returns
    -------
    xr.Dataset
        Dataset containing water level data with dimensions (time, station).

    Raises
    ------
    ValueError
        If invalid parameters are provided or if API returns errors.
    """
    client = COOPSAPIClient()
    client.validate_parameters(product, datum, units, time_zone, interval)

    begin_dt = client.parse_date(begin_date)
    end_dt = client.parse_date(end_date)
    if end_dt <= begin_dt:
        raise ValueError("end_date must be after begin_date")

    # The CO-OPS API expects "yyyyMMdd HH:mm" timestamps.
    begin_str = begin_dt.strftime("%Y%m%d %H:%M")
    end_str = end_dt.strftime("%Y%m%d %H:%M")

    logger.info(
        "Requesting %s data for %d station(s) from %s to %s",
        product,
        len(station_ids),
        begin_str,
        end_str,
    )

    urls = []
    for sid in station_ids:
        urls.append(
            client.build_url(
                station_id=sid,
                begin_date=begin_str,
                end_date=end_str,
                product=product,
                datum=datum,
                units=units,
                time_zone=time_zone,
                interval=interval,
            )
        )

    responses = client.fetch_data(urls)
    return _process_responses(
        responses=responses,
        station_ids=station_ids,
        product=product,
        datum=datum,
        units=units,
        time_zone=time_zone,
    )

query_coops_bygeometry#

query_coops_bygeometry #

query_coops_bygeometry(
    geometry,
    begin_date,
    end_date,
    *,
    product="water_level",
    datum="MLLW",
    units="metric",
    time_zone="gmt",
    interval=None,
)

Fetch water level data from NOAA CO-OPS API for stations within a geometry.

PARAMETER DESCRIPTION
geometry

Geometry to select stations within (Point, Polygon, etc.)

TYPE: BaseGeometry

begin_date

Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm

TYPE: str

end_date

End date in same format as begin_date.

TYPE: str

product

Data product to retrieve, by default water_level.

TYPE: ('water_level', 'hourly_height', 'high_low', 'predictions') DEFAULT: "water_level"

datum

Vertical datum for water levels, by default "MLLW".

TYPE: str DEFAULT: 'MLLW'

units

Units for data, by default "metric".

TYPE: ('metric', 'english') DEFAULT: "metric"

time_zone

Time zone for returned data, by default "gmt".

TYPE: ('gmt', 'lst', 'lst_ldt') DEFAULT: "gmt"

interval

Time interval for predictions product only, by default None.

TYPE: str | int | None DEFAULT: None

RETURNS DESCRIPTION
Dataset

Dataset containing water level data for stations within the geometry.

Source code in src/coastal_calibration/coops_api.py
def query_coops_bygeometry(
    geometry: shapely.geometry.base.BaseGeometry,
    begin_date: str,
    end_date: str,
    *,
    product: Literal[
        "water_level",
        "hourly_height",
        "high_low",
        "predictions",
    ] = "water_level",
    datum: str = "MLLW",
    units: Literal["metric", "english"] = "metric",
    time_zone: Literal["gmt", "lst", "lst_ldt"] = "gmt",
    interval: str | int | None = None,
) -> xr.Dataset:
    """Fetch water level data from NOAA CO-OPS API for stations within a geometry.

    Parameters
    ----------
    geometry : shapely.geometry.base.BaseGeometry
        Geometry to select stations within (Point, Polygon, etc.)
    begin_date : str
        Start date in format: yyyyMMdd, yyyyMMdd HH:mm, MM/dd/yyyy, or MM/dd/yyyy HH:mm
    end_date : str
        End date in same format as begin_date.
    product : {"water_level", "hourly_height", "high_low", "predictions"}, optional
        Data product to retrieve, by default ``water_level``.
    datum : str, optional
        Vertical datum for water levels, by default "MLLW".
    units : {"metric", "english"}, optional
        Units for data, by default "metric".
    time_zone : {"gmt", "lst", "lst_ldt"}, optional
        Time zone for returned data, by default "gmt".
    interval : str | int | None, optional
        Time interval for predictions product only, by default None.

    Returns
    -------
    xr.Dataset
        Dataset containing water level data for stations within the geometry.

    Raises
    ------
    ValueError
        If the geometry is invalid or no stations intersect it.
    """
    import numpy as np
    import shapely

    client = COOPSAPIClient()
    if not all(shapely.is_valid(np.atleast_1d(geometry))):  # pyright: ignore[reportCallIssue,reportArgumentType]
        raise ValueError("Invalid geometry provided.")

    stations_gdf = client.stations_metadata
    selected_stations = stations_gdf[stations_gdf.intersects(geometry)]

    if selected_stations.empty:
        # BUG FIX: the previous message referred to a "buffer" even though
        # this function takes no buffer argument — misleading for callers.
        raise ValueError("No stations found within the specified geometry.")

    station_ids = selected_stations["station_id"].tolist()
    return query_coops_byids(
        station_ids,
        begin_date,
        end_date,
        product=product,
        datum=datum,
        units=units,
        time_zone=time_zone,
        interval=interval,
    )

Type Aliases#

# Supported model identifiers (keys of MODEL_REGISTRY)
ModelType = Literal["schism", "sfincs"]

# Meteorological data source (NWM retrospective or analysis)
MeteoSource = Literal["nwm_retro", "nwm_ana"]

# Coastal domain identifier
CoastalDomain = Literal["prvi", "hawaii", "atlgulf", "pacific"]

# Boundary condition source (TPXO tides or STOFS)
BoundarySource = Literal["tpxo", "stofs"]

# Logging level (standard ``logging`` module level names)
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

Constants#

Default Paths#

# Default filesystem locations and cluster settings used when the YAML
# configuration does not override them.
DEFAULT_SING_IMAGE_PATH = Path("/ngencerf-app/singularity/ngen-coastal.sif")  # Singularity image
DEFAULT_PARM_DIR = Path("/ngen-test/coastal/ngwpc-coastal")  # parameter/static data directory
DEFAULT_NFS_MOUNT = Path("/ngen-test")  # shared NFS mount point
DEFAULT_CONDA_ENV_NAME = "ngen_forcing_coastal"  # conda environment name
DEFAULT_NWM_DIR = Path("/ngen-app/nwm.v3.0.6")  # NWM installation root
DEFAULT_OTPS_DIR = Path("/ngen-app/OTPSnc")  # OTPS tidal prediction software root
DEFAULT_SLURM_PARTITION = "c5n-18xlarge"  # default SLURM partition name

Default Path Templates#

# Default path templates.  The ``${section.key}`` placeholders are resolved
# against other config values by ``CoastalCalibConfig.from_yaml`` variable
# interpolation (e.g. ``${slurm.user}``, ``${model}``).
DEFAULT_WORK_DIR_TEMPLATE = (
    "/ngen-test/coastal/${slurm.user}/"
    "${model}_${simulation.coastal_domain}_${boundary.source}_${simulation.meteo_source}/"
    "${model}_${simulation.start_date}"
)

# Raw downloads live beside the work directories, shared across runs of the
# same model/domain/source combination.
DEFAULT_RAW_DOWNLOAD_DIR_TEMPLATE = (
    "/ngen-test/coastal/${slurm.user}/"
    "${model}_${simulation.coastal_domain}_${boundary.source}_${simulation.meteo_source}/"
    "raw_data"
)

Model Registry#

# Maps the ``model`` key from the YAML configuration to the concrete
# ModelConfig subclass used by CoastalCalibConfig's polymorphic
# ``model_config`` field.
MODEL_REGISTRY: dict[str, type[ModelConfig]] = {
    "schism": SchismModelConfig,
    "sfincs": SfincsModelConfig,
}