Python Reference#

The pyproject.toml of this repository primarily contains requirements for building documentation. The build sequence of the nwm-rte image is dynamic/configurable and involves multiple Python virtual environments.

CLI Help Menus#

These are raw outputs of the --help menu generated by Python's argparse library.

To update these, run: ./docs/update_python_cli_ref.sh

run_default.py --help

run_calibration.py --help

run_forecast.py --help

run_tests.py --help

run_regionalization.py --help

CLI Executable Modules#

run_default.py#

Called by run_default.sh

run_default.py --help

run_default #

Command-line executable to build and run a "default" realization. Supports realtime forcing configurations, e.g. "short_range", as well as historical/retrospective sources, e.g. "aorc".

This runs inside the ngen runtime environment. The CLI structure is mimicked in part by configs.RTEDefaultConfig. For settings that are not exposed by CLI arguments, see primarily consts.py.

See run_default.sh for example calls.

build_default_realization #

build_default_realization(
    cfg: RTEDefaultConfig,
) -> RealizationBuilder

Build and return a non-coldstart forecast realization

Source code in bin_mounted/run_default.py
def build_default_realization(cfg: RTEDefaultConfig) -> RealizationBuilder:
    """Build and return a non-coldstart forecast realization"""
    print("Building default realization...")
    rb = RealizationBuilder(**cfg.realization_builder_kwargs)
    rb.build_default_realization()
    configure_ngen_log(rb.work_dir, "default")  # TODO test this
    return rb

get_ngen_cmd #

get_ngen_cmd(
    cfg: RTEDefaultConfig, rb: RealizationBuilder
) -> list[str]

Build and return the ngen command as a list of strings. rb must already be built, e.g. by a prior call to rb.build_default_realization().

Source code in bin_mounted/run_default.py
def get_ngen_cmd(cfg: RTEDefaultConfig, rb: RealizationBuilder) -> list[str]:
    """Build and return the ngen command as a list of strings.
    rb must have already been built, e.g. rb.build_default_realization() already called."""
    cmd = [
        os.path.join(rb.input_dir, "ngen"),
        rb.cat_file,
        "all",
        rb.nexus_file,
        "all",
        rb.realization_file,
    ]
    if cfg.nprocs > 1:
        cmd = ["mpirun", "-n", f"{cfg.nprocs}"] + cmd + [rb.part_file]
    return cmd
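
To make the two shapes of the command concrete, the following minimal sketch repeats the logic above against a stand-in object. `FakeBuilder`, `sketch_ngen_cmd`, and every path shown are hypothetical placeholders for illustration, not values produced by `RealizationBuilder`.

```python
import os
from dataclasses import dataclass


@dataclass
class FakeBuilder:
    """Hypothetical stand-in for RealizationBuilder; all paths are placeholders."""

    input_dir: str = "/tmp/example/Input"
    cat_file: str = "catchments.gpkg"
    nexus_file: str = "nexus.gpkg"
    realization_file: str = "realization.json"
    part_file: str = "partitions.json"


def sketch_ngen_cmd(nprocs: int, rb: FakeBuilder) -> list[str]:
    """Mirror get_ngen_cmd: a serial command, wrapped in mpirun when nprocs > 1."""
    cmd = [
        os.path.join(rb.input_dir, "ngen"),
        rb.cat_file,
        "all",
        rb.nexus_file,
        "all",
        rb.realization_file,
    ]
    if nprocs > 1:
        # The partition file is appended only for MPI runs.
        cmd = ["mpirun", "-n", str(nprocs)] + cmd + [rb.part_file]
    return cmd


serial_cmd = sketch_ngen_cmd(1, FakeBuilder())
parallel_cmd = sketch_ngen_cmd(4, FakeBuilder())
print(serial_cmd)
print(parallel_cmd)
```

The parallel form is the serial form with an `mpirun -n <nprocs>` prefix and the partition file as the final argument.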

run_calibration.py#

Called by run_calib.sh

run_calibration.py --help

run_calibration #

Command-line executable to build and run a "calibration" realization.

This runs inside the ngen runtime environment. The CLI structure is mimicked in part by configs.RTECalibConfig. For settings that are not exposed by CLI arguments, see primarily consts.py.

See run_calib.sh for example calls.

calibration__build_and_run #

calibration__build_and_run(cfg: RTECalibConfig) -> None

Build calibration realizations and run them as tests.

Source code in bin_mounted/run_calibration.py
def calibration__build_and_run(cfg: RTECalibConfig) -> None:
    """Build calibration realizations and run them as tests."""

    windows = CalibTimeWindows(
        calib_sim_start=cfg.calib_sim_start,
        calib_sim_duration=cfg.calib_sim_duration,
        calib_eval_delayment=cfg.calib_eval_delayment,
        valid_sim_advancement=cfg.valid_sim_advancement,
        valid_eval_curtailment=cfg.valid_eval_curtailment,
    )

    all_config_overrides = get_test_configs__calibration(
        nprocs=cfg.nprocs,
        gage_id=cfg.gage_id,
        gage_vintage=cfg.gage_vintage,
        obj_func=cfg.objective_function,
        optim_algo=cfg.optimization_algorithm,
        forcing_config_types=[cfg.forcing_source],
        global_domain=cfg.global_domain,
        forcing_provider=cfg.forcing_provider,
        forcing_static_dir=cfg.forcing_static_dir,
        windows=windows,
        obs_dir=cfg.obs_dir,
        nwmretro_file=cfg.nwmretro_file,
        run_type="calibration",
    )
    assert (
        len(all_config_overrides) == 1
    )  # Can be > 1 in test runner, not in atomic calibration runner
    config_overrides = all_config_overrides[0]

    rb_kwargs = {"config_overrides": config_overrides}
    rb = RealizationBuilder(**rb_kwargs)

    if cfg.forcing_source not in c.CALIB_FORCING_CONFIGURATION_TYPES:
        raise ValueError(
            f"Run type is calibration, but cfg.forcing_source {cfg.forcing_source!r} is not in c.CALIB_FORCING_CONFIGURATION_TYPES {c.CALIB_FORCING_CONFIGURATION_TYPES}"
        )
    rb.build_calib_realization()
    log_path = get_calibration_log_file_overwrite_path(rb)
    cmd = [
        "calibration",
        str(rb.calib_config_file),
        "--log_path_overwrite",
        log_path,
    ]
    if cfg.worker_name:
        cmd.extend(["--worker_name", cfg.worker_name])
    cwd = None
    msg_suffix = f" Log path: {log_path}"

    configure_ngen_log(rb.work_dir, "cal")
    print(
        f"\n\nStarting calibration with configuration: {cfg.model_dump_json(indent=2)}\n\nvia command args: {cmd} with cwd={cwd}.{msg_suffix}"
    )
    start = time.perf_counter()
    proc = subprocess.run(cmd, check=False, cwd=cwd)
    print(
        f"\nFinished calibration with configuration: {cfg.model_dump_json(indent=2)},\nfinished in {((time.perf_counter() - start) / 60):.1f} minutes.\nReturn code {proc.returncode}.\nCommand was: {cmd}, with cwd={cwd}.{msg_suffix}"
    )
    proc.check_returncode()
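
The function above calls `subprocess.run` with `check=False` and defers `proc.check_returncode()` until after the summary is printed, so the timing and return code are reported even when the calibration fails. A minimal sketch of that pattern follows; `run_and_report` is a hypothetical name, and the child commands are trivial Python one-liners rather than the real `calibration` executable.

```python
import subprocess
import sys
import time


def run_and_report(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run cmd, print a timing summary, then raise if the exit code is nonzero."""
    start = time.perf_counter()
    # check=False: do not raise yet, so the summary below is always printed.
    proc = subprocess.run(cmd, check=False)
    minutes = (time.perf_counter() - start) / 60
    print(f"Finished in {minutes:.1f} minutes. Return code {proc.returncode}.")
    # Raises subprocess.CalledProcessError when the return code is nonzero.
    proc.check_returncode()
    return proc


ok = run_and_report([sys.executable, "-c", "print('hello')"])

try:
    run_and_report([sys.executable, "-c", "import sys; sys.exit(3)"])
    failed_code = None
except subprocess.CalledProcessError as err:
    failed_code = err.returncode
```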

run_forecast.py#

Called by run_fcst.sh

run_forecast.py --help

run_forecast #

Command-line executable to build and run a "forecast" realization, optionally with a coldstart.

This runs inside the ngen runtime environment. The CLI structure is mimicked in part by configs.RTEForecastConfig. For settings that are not exposed by CLI arguments, see primarily consts.py.

See run_fcst.sh for example calls.

build_coldstart_realization #

build_coldstart_realization(
    cfg: RTEForecastConfig,
) -> RealizationBuilder

Build and return a coldstart forecast realization

Source code in bin_mounted/run_forecast.py
def build_coldstart_realization(cfg: RTEForecastConfig) -> RealizationBuilder:
    """Build and return a coldstart forecast realization"""
    print(
        f"Building coldstart realization: {cfg.realization_builder_kwargs}, use_cold_start=True"
    )
    rb_cs = RealizationBuilder(**cfg.realization_builder_kwargs, use_cold_start=True)
    rb_cs.build_fcst_realization()
    print(f"Wrote: {rb_cs.realization_file}")
    # From existing fcst mgr conventions, e.g. ### Set environment variable NGEN_RESULTS_DIR to: /ngwpc/run_ngen/kge_dds/test_bmi/01123000/Output/Forecast_Run/fcst_run1_short_range
    configure_ngen_log(rb_cs.input_dir, "cs")
    if cfg.nprocs > 1 and not rb_cs.part_file:
        raise ValueError(
            f"Expected partition file since cfg.nprocs > 1 ({cfg.nprocs}), but it is {repr(rb_cs.part_file)}"
        )
    return rb_cs

build_forecast_realization #

build_forecast_realization(
    cfg: RTEForecastConfig,
) -> RealizationBuilder

Build and return a non-coldstart forecast realization

Source code in bin_mounted/run_forecast.py
def build_forecast_realization(cfg: RTEForecastConfig) -> RealizationBuilder:
    """Build and return a non-coldstart forecast realization"""
    print(
        f"Building forecast realization: {cfg.realization_builder_kwargs}, use_cold_start=False"
    )
    rb_fcst = RealizationBuilder(**cfg.realization_builder_kwargs, use_cold_start=False)
    rb_fcst.build_fcst_realization()
    # From existing fcst mgr conventions, e.g. ### Set environment variable NGEN_RESULTS_DIR to: /ngwpc/run_ngen/kge_dds/test_bmi/01123000/Output/Forecast_Run/fcst_run1_short_range
    configure_ngen_log(rb_fcst.input_dir, "fcst")
    print(f"Wrote: {rb_fcst.realization_file}")
    if cfg.nprocs > 1 and not rb_fcst.part_file:
        raise ValueError(
            f"Expected partition file since cfg.nprocs > 1 ({cfg.nprocs}), but it is {repr(rb_fcst.part_file)}"
        )
    return rb_fcst

run_tests.py#

Called by run_tests.sh

run_tests.py --help

run_tests #

Command-line executable to build and run a series of "forecast" realizations, optionally with a "calibration" realization preceding them.

This runs inside the ngen runtime environment. The CLI structure is mimicked in part by configs.RTETestConfig. For settings that are not exposed by CLI arguments, see primarily consts.py.

When realizations fail, this program does not halt, but rather moves to the next configuration type in the list, with the goal of "trying" many different realization configurations in one call. The status of each configuration's build step and run step is reported and written to a json file at the end.

This includes options for stopping realizations mid-way through their run, rather than waiting for them to complete.

See run_tests.sh for example calls.
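
The continue-on-failure behavior described above can be sketched as follows. This is an illustration only: `Stat`, `run_all`, the configuration named `hypothetical_config`, and the report layout are assumptions, and the real `TestStat`, `ForecastTest`, and `TestsManager` classes may record results differently.

```python
import json
import os
import tempfile
from enum import Enum
from typing import Callable


class Stat(str, Enum):
    """Hypothetical status values; the real TestStat enum may differ."""

    PASS = "pass"
    FAIL = "fail"
    NOT_RUN = "not_run"


def run_all(tests: dict[str, tuple[Callable[[], None], Callable[[], None]]]) -> dict:
    """Run every (build, run) pair, recording failures instead of halting."""
    report = {}
    for name, (build, run) in tests.items():
        entry = {"build": Stat.NOT_RUN.value, "run": Stat.NOT_RUN.value, "error": None}
        try:
            build()
            entry["build"] = Stat.PASS.value
        except Exception as err:  # trap the failure and move to the next configuration
            entry["build"] = Stat.FAIL.value
            entry["error"] = repr(err)
        if entry["build"] == Stat.PASS.value:
            # The run step is attempted only when the build step passed.
            try:
                run()
                entry["run"] = Stat.PASS.value
            except Exception as err:
                entry["run"] = Stat.FAIL.value
                entry["error"] = repr(err)
        report[name] = entry
    return report


def succeed() -> None:
    pass


def fail() -> None:
    raise RuntimeError("simulated failure")


report = run_all(
    {
        "short_range": (succeed, succeed),
        "aorc": (fail, succeed),
        "hypothetical_config": (succeed, fail),
    }
)

# Write the per-configuration status to a JSON file at the end.
report_path = os.path.join(tempfile.mkdtemp(), "test_report.json")
with open(report_path, "w") as f:
    json.dump(report, f, indent=2)
```

Because each result is stored as soon as its configuration finishes, a partially filled report is still available if the loop is interrupted.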

calibrations__build_and_run #

calibrations__build_and_run(
    cfg: RTETestConfig, tm: TestsManager
) -> None

Build calibration realizations and run them as tests.

Source code in bin_mounted/run_tests.py
def calibrations__build_and_run(cfg: RTETestConfig, tm: TestsManager) -> None:
    """Build calibration realizations and run them as tests."""
    for obj_func, optim_algo, _ in cfg.get_calib_permutations():
        for config_overrides in get_test_configs__calibration(
            nprocs=cfg.nprocs,
            gage_id=cfg.gage_id,
            gage_vintage=cfg.gage_vintage,
            obj_func=obj_func,
            optim_algo=optim_algo,
            global_domain=cfg.global_domain,
            forcing_provider=cfg.forcing_provider,
            forcing_static_dir=cfg.forcing_static_dir,
        ):
            fc = config_overrides.Forcing.forcing_configuration
            msg_prefix = f"Calibration {repr(fc)} with calib obj_func={repr(obj_func.value)}, optim_algo={repr(optim_algo.value)}"
            rb_kwargs = {"config_overrides": config_overrides}
            print(
                f"\n\n##########\n### {msg_prefix}: setting up test with rb_kwargs = \n{json.dumps(rb_kwargs, indent=2, default=pydantic_encoder)}"
            )
            t = ForecastTest(rb_kwargs=rb_kwargs)

            # Build the realization, trapping exceptions into class attrs
            print(f"### {msg_prefix}: building realization")
            t.make_realization_builder__build_realization(
                build_method="build_calib_realization"
            )

            if t.rb_stat == TestStat.PASS:
                configure_ngen_log(t.rb.work_dir, "cal_test")
                # Execute the realization via ngen, trapping exceptions and logs into class attrs
                print(f"### {msg_prefix}: executing calibration realization")
                t.execute_calibration(cfg.quit_calibration_after_duration)

            tm.add_forecast_test(t)

forecasts__build_and_run #

forecasts__build_and_run(
    cfg: RTETestConfig, tm: TestsManager, cs: bool
) -> None

Using ForecastTest, build and execute a list of forecast realizations. tm (the TestsManager) is modified in place, so some test results may be available if this function is interrupted. cs, rather than cfg.do_coldstart, controls whether coldstart is used.

Source code in bin_mounted/run_tests.py
def forecasts__build_and_run(cfg: RTETestConfig, tm: TestsManager, cs: bool) -> None:
    """
    Using ForecastTest, build and execute a list of forecast realizations.
    `tm` is modified in place, so some test results may be available if this function is interrupted.
    `cs` controls whether coldstart is used (not `cfg.do_coldstart`).
    """
    for obj_func, optim_algo, test_paths in cfg.get_calib_permutations():
        test_configs = get_test_configs__forecast(
            cfg.do_all_forcing_configs,
            use_cold_start=cs,
            gage_id=cfg.gage_id,
            global_domain=cfg.global_domain,
            forcing_provider=cfg.forcing_provider,
            forcing_static_dir=cfg.forcing_static_dir,
            nprocs=cfg.nprocs,
        )
        for tc in test_configs:
            if (
                cfg.quit_forecast_after_forcing_running
                and tc.Forcing.forcing_configuration != "short_range"
            ):
                raise NotImplementedError(
                    f"quit_forecast_after_forcing_running not yet tested for forcing_configuration = {repr(tc.Forcing.forcing_configuration)}"
                )

        for config_overrides in test_configs:
            fc = config_overrides.Forcing.forcing_configuration
            msg_prefix = f"Forecast {repr(fc)} with calib obj_func={repr(obj_func.value)}, optim_algo={repr(optim_algo.value)}"
            rb_kwargs = {
                # "input_path": test_paths.dir_input,
                "valid_yaml": test_paths.valid_yaml,
                "fcst_run_name": cfg.fcst_run_name,
                "config_overrides": config_overrides,
                "use_cold_start": cs,
            }
            print(
                f"\n\n##########\n### {msg_prefix}: setting up test with rb_kwargs = {rb_kwargs}"
            )

            run_type = "Cold_Start_Run" if cs else "Forecast_Run"
            t = ForecastTest(
                rb_kwargs=rb_kwargs,
                ### TODO update this to work with new EWTS per-rank logs, and new RTE log paths
                # ngen_log=LogParser(
                #     path=f"{test_paths.dir_output}/{run_type}/{cfg.fcst_run_name}/logs/ngen.log"
                # ),
            )

            # Build the realization, trapping exceptions into class attrs
            print(f"### {msg_prefix}: building realization")
            t.make_realization_builder__build_realization(
                build_method="build_fcst_realization"
            )

            if t.rb_stat == TestStat.PASS:
                # Execute the realization via ngen, trapping exceptions and logs into class attrs
                configure_ngen_log(t.rb.input_dir, "fcst_test")
                print(f"### {msg_prefix}: executing realization via ngen")
                t.execute_forecast(
                    quit_forecast_after_forcing_running=cfg.quit_forecast_after_forcing_running,
                    quit_forecast_after_duration=cfg.quit_forecast_after_duration,
                )

            tm.add_forecast_test(t)

run_noop_mode #

run_noop_mode() -> None

Run noop mode - verify imports and basic setup without executing workflows.

Source code in bin_mounted/run_tests.py
def run_noop_mode() -> None:
    """Run noop mode - verify imports and basic setup without executing workflows."""
    print("\nRunning in noop mode - only checking imports and basic setup.")
    print("Successfully imported all required libraries.")
    print("Noop mode complete - exiting")
    sys.exit(0)  # Exit the program directly

Python Constants#

bin_mounted/consts.py currently contains variables that rarely need editing, with the exception of the MODELS formulation list. The MODELS variable is planned to be exposed as CLI argument(s).

consts #

Constants

MODELS module-attribute #

MODELS = 'noah-owp-modular,cfe-s'

Model formulations. See nwm-msw-mgr for details.

Configuration Classes#

These mimic the CLI interfaces and perform some additional argument parsing and preparation of classes that are passed to other components of the system.

configs #

TestPaths dataclass #

Paths dependent on calibration settings. If iterating over a list of objective functions or optimization algorithms, obj_func and optim_algo may need to be replaced on the fly during the iterations.

Source code in bin_mounted/configs.py
@dataclass
class TestPaths:
    """
    Paths dependent on calibration settings.
    If iterating over a list of objective functions or optimization algorithms,
    obj_func and optim_algo may need to be replaced on the fly during the iterations.
    """

    gage_id: str
    gage_vintage: str
    obj_func: c.CalObjective | None
    optim_algo: c.CalOptimizationAlgo | None
    global_domain: str
    forcing_provider: str
    forcing_static_dir: str

    def update_obj_func_and_optim_algo(
        self, obj_func: c.CalObjective, optim_algo: c.CalOptimizationAlgo
    ) -> None:
        """Informal setter for obj_func and optim_algo"""
        self.obj_func = obj_func
        self.optim_algo = optim_algo

    @property
    def fpp(self):
        """Build and return a ForcingProviderPaths instance to assist with setup."""
        return ForcingProviderPaths(
            global_domain=self.global_domain,
            forcing_provider=self.forcing_provider,
            forcing_static_dir=self.forcing_static_dir,
        )

    @property
    def dir_base(self) -> str:
        """The base directory of the model (can contain calibrations and forecasts)."""
        if not (self.obj_func and self.optim_algo):
            raise ValueError(
                "obj_func and optim_algo must be set before accessing dir_base"
            )
        return f"{c.DEFAULT_MAIN_DIR}/{self.obj_func.value}_{self.optim_algo.value}/{self.fpp.formulation_name}/{self.gage_id}"

    @property
    def dir_input(self) -> str:
        """The Input directory of the model"""
        return f"{self.dir_base}/Input"

    @property
    def dir_output(self) -> str:
        """The Output directory of the model"""
        return f"{self.dir_base}/Output"

    @property
    def ngen_log_file(self) -> str:
        """The ngen.log file of the model"""
        return f"{self.dir_base}/logs/ngen.log"

    @property
    def calib_config_file(self) -> str:
        """Path to example input calibration config file"""
        return (
            f"{self.dir_base}/configs/input_calibration_{self.forcing_provider}.config"
        )
        # return f"{self.dir_base}/configs/input_calibration_{self.forcing_provider}_short.config"

    @property
    def fcst_config_file(self) -> str:
        """Path to example input forecast config file"""
        return f"{self.dir_base}/configs/input_forecast.config"

    @property
    def valid_yaml(self) -> str:
        """Path to validation yaml config file"""
        return f"{self.dir_output}/Validation_Run/{self.gage_id}_config_valid_best.yaml"

fpp property #

fpp

Build and return a ForcingProviderPaths instance to assist with setup.

dir_base property #

dir_base: str

The base directory of the model (can contain calibrations and forecasts).

dir_input property #

dir_input: str

The Input directory of the model

dir_output property #

dir_output: str

The Output directory of the model

ngen_log_file property #

ngen_log_file: str

The ngen.log file of the model

calib_config_file property #

calib_config_file: str

Path to example input calibration config file

fcst_config_file property #

fcst_config_file: str

Path to example input forecast config file

valid_yaml property #

valid_yaml: str

Path to validation yaml config file

update_obj_func_and_optim_algo #

update_obj_func_and_optim_algo(
    obj_func: CalObjective, optim_algo: CalOptimizationAlgo
) -> None

Informal setter for obj_func and optim_algo

Source code in bin_mounted/configs.py
def update_obj_func_and_optim_algo(
    self, obj_func: c.CalObjective, optim_algo: c.CalOptimizationAlgo
) -> None:
    """Informal setter for obj_func and optim_algo"""
    self.obj_func = obj_func
    self.optim_algo = optim_algo
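
The path properties of TestPaths all hang off dir_base, which cannot be resolved until obj_func and optim_algo are set. The sketch below shows that layout with plain strings. `PathsSketch` is a hypothetical simplification: `main_dir` and `formulation_name` stand in for `c.DEFAULT_MAIN_DIR` and `fpp.formulation_name`, and the example values are borrowed from the sample path quoted in the run_forecast.py comments, so they may not match an actual deployment.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PathsSketch:
    """Simplified stand-in for TestPaths using plain strings."""

    main_dir: str  # stands in for c.DEFAULT_MAIN_DIR
    formulation_name: str  # stands in for fpp.formulation_name
    gage_id: str
    obj_func: Optional[str] = None
    optim_algo: Optional[str] = None

    @property
    def dir_base(self) -> str:
        if not (self.obj_func and self.optim_algo):
            raise ValueError("obj_func and optim_algo must be set first")
        return (
            f"{self.main_dir}/{self.obj_func}_{self.optim_algo}"
            f"/{self.formulation_name}/{self.gage_id}"
        )

    @property
    def valid_yaml(self) -> str:
        return f"{self.dir_base}/Output/Validation_Run/{self.gage_id}_config_valid_best.yaml"


paths = PathsSketch(main_dir="/ngwpc/run_ngen", formulation_name="test_bmi", gage_id="01123000")

# Accessing dir_base before the calibration settings are known raises.
try:
    paths.dir_base
    unset_error = None
except ValueError as err:
    unset_error = str(err)

# Set both values on the fly, as update_obj_func_and_optim_algo does.
paths.obj_func, paths.optim_algo = "kge", "dds"
print(paths.dir_base)    # /ngwpc/run_ngen/kge_dds/test_bmi/01123000
print(paths.valid_yaml)  # /ngwpc/run_ngen/kge_dds/test_bmi/01123000/Output/Validation_Run/01123000_config_valid_best.yaml
```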

RTESetup #

Bases: BaseModel

Used to set up an RTE run. Triggers certain setup actions, such as creation of WCOSS-path symlinks. Child classes that define their own model_post_init() method should call super().model_post_init(__context) inside it, so that these setup actions still run.

Source code in bin_mounted/configs.py
class RTESetup(BaseModel):
    """Used to set up an RTE run. Triggers certain setup actions, such as creation of WCOSS-path symlinks.
    Classes that inherit from this should call super().model_post_init(__context) inside their own
    model_post_init() method, if they have that method also defined in the child."""

    def model_post_init(self, __context) -> None:
        make_wcoss_path_symlinks()
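
The reason for the super() call can be shown without pydantic. In the sketch below, `HookBase` is a hypothetical stand-in that only imitates how a post-init hook is invoked after construction; it is not pydantic's `BaseModel`, and `SetupSketch`, `GoodChild`, and `ForgetfulChild` are illustrative names. The parameter is called `context` here purely for readability.

```python
class HookBase:
    """Minimal stand-in for a model base class: calls model_post_init after __init__."""

    def __init__(self, **fields):
        for name, value in fields.items():
            setattr(self, name, value)
        self.model_post_init(None)

    def model_post_init(self, context) -> None:
        pass


setup_calls = []


class SetupSketch(HookBase):
    """Plays the role of RTESetup: performs a setup action in the hook."""

    def model_post_init(self, context) -> None:
        setup_calls.append("symlinks")  # stands in for make_wcoss_path_symlinks()


class GoodChild(SetupSketch):
    def model_post_init(self, context) -> None:
        super().model_post_init(context)  # parent setup still runs
        self.derived = self.nprocs * 2


class ForgetfulChild(SetupSketch):
    def model_post_init(self, context) -> None:
        # No super() call: the parent's setup action is silently skipped.
        self.derived = self.nprocs * 2


good = GoodChild(nprocs=2)
calls_after_good = list(setup_calls)
forgetful = ForgetfulChild(nprocs=2)
calls_after_forgetful = list(setup_calls)
```

Both children compute their derived attribute, but only `GoodChild` triggers the parent's setup action.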

RTEDefaultConfig #

Bases: RTESetup

Configuration class for building and running one default realization (realtime forcing configuration or historical / retrospective forcing configuration).

ATTRIBUTE (TYPE): DESCRIPTION

delete_scratch_and_mesh_first (bool): Causes scratch dir and intermediary mesh to be deleted first
delete_forcing_raw_input_first (bool): Causes realtime forcing data cache dir to be deleted first
gage_id__gage_vintage (list[str] = Field(min_length=2, max_length=2)): Gage ID and vintage
global_domain (str): e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
forcing_static_dir (str): Forcing static directory
forcing_provider (str): Forcing provider, i.e. "bmi" or "csv"
cycle_datetime (datetime): Start time of the realization
historical_sim_duration (timedelta | None): Duration of the simulation (only used for historical / retrospective forcing configurations)
forcing_configuration (str): Forcing configuration, e.g. "aorc" or "short_range"
fcst_run_name (str): Name of the forecast realization run. Affects a directory name.
nprocs (int = Field(ge=1)): Number of processors to use

The following are set after init during self.model_post_init(). Do not provide.

gage_id (str = Field(init=False, default=None)): Gage ID
gage_vintage (str = Field(init=False, default=None)): Gage vintage
realtime_mode (bool = Field(init=False, default=None)): Realtime mode
realization_builder_kwargs (dict = Field(init=False, default=None)): Realization builder kwargs (passed to nwm-msw-mgr)

Source code in bin_mounted/configs.py
class RTEDefaultConfig(RTESetup):
    """Configuration class for building and running one default realization
    (realtime forcing configuration or historical / retrospective forcing configuration).

    Attributes
    ----------
    delete_scratch_and_mesh_first: bool
        Causes scratch dir and intermediary mesh to be deleted first
    delete_forcing_raw_input_first: bool
        Causes realtime forcing data cache dir to be deleted first
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
        Gage ID and vintage
    global_domain: str
        e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
    forcing_static_dir: str
        Forcing static directory
    forcing_provider: str
        Forcing provider, i.e. "bmi" or "csv"
    cycle_datetime: datetime
        Start time of the realization
    historical_sim_duration: timedelta | None
        Duration of the simulation (only used for historical / retrospective forcing configurations)
    forcing_configuration: str
        Forcing configuration, e.g. "aorc" or "short_range"
    fcst_run_name: str
        Name of the forecast realization run. Affects a directory name.
    nprocs: int = Field(ge=1)
        Number of processors to use
    # The following are set after init during self.model_post_init(). Do not provide
    gage_id: str = Field(init=False, default=None)
        Gage ID
    gage_vintage: str = Field(init=False, default=None)
        Gage vintage
    realtime_mode: bool = Field(init=False, default=None)
        Realtime mode
    realization_builder_kwargs: dict = Field(init=False, default=None)
        Realization builder kwargs (passed to `nwm-msw-mgr`)
    """

    model_config = ConfigDict(strict=True, arbitrary_types_allowed=True)

    delete_scratch_and_mesh_first: bool
    delete_forcing_raw_input_first: bool
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
    global_domain: str
    forcing_static_dir: str
    forcing_provider: str
    cycle_datetime: datetime
    historical_sim_duration: timedelta | None
    forcing_configuration: str
    fcst_run_name: str
    nprocs: int = Field(ge=1)

    # Set after init
    gage_id: str = Field(init=False, default=None)
    gage_vintage: str = Field(init=False, default=None)
    realtime_mode: bool = Field(init=False, default=None)

    # Other derived attrs (not passed to __init__)
    realization_builder_kwargs: dict = Field(init=False, default=None)

    def model_post_init(self, __context) -> None:
        super().model_post_init(__context)  # Call RTESetup's post init

        errors = []

        # Realtime mode applies only to the known forecast forcing configurations
        self.realtime_mode = (
            self.forcing_configuration in c.FORECAST_FORCING_CONFIGURATION_TYPES__ALL
        )

        self.gage_id, self.gage_vintage, errors_extend = parse_gage_id__gage_vintage(
            self.gage_id__gage_vintage
        )
        errors.extend(errors_extend)

        if (not self.realtime_mode) and (not self.historical_sim_duration):
            errors.extend(
                [
                    f"Forcing configuration {repr(self.forcing_configuration)} is *not* realtime, and requires that CLI arg -dur aka --historical_sim_duration is provided, but it was not."
                ]
            )
        if self.realtime_mode and self.historical_sim_duration:
            errors.extend(
                [
                    f"Forcing configuration {repr(self.forcing_configuration)} *is* realtime, but CLI arg -dur aka --historical_sim_duration was also provided (it should not be)."
                ]
            )

        if errors:
            raise RuntimeError(errors)

        self.realization_builder_kwargs = self._make_realization_builder_kwargs()

    def _make_realization_builder_kwargs(self) -> dict:
        """Build and return a dictionary for creating a RealizationBuilder instance."""
        fpp = ForcingProviderPaths(
            forcing_provider=self.forcing_provider,
            global_domain=self.global_domain,
            forcing_static_dir=self.forcing_static_dir,
        )

        windows = CalibTimeWindows(
            calib_sim_start=self.cycle_datetime,
            calib_sim_duration=self.historical_sim_duration
            if self.historical_sim_duration
            else c.CALIB_SIM_DURATION_DEFAULT,
            calib_eval_delayment=c.CALIB_EVAL_DELAYMENT_DEFAULT,
            valid_sim_advancement=c.VALID_SIM_ADVANCEMENT_DEFAULT,
            valid_eval_curtailment=c.VALID_EVAL_CURTAILMENT_DEFAULT,
        )

        if self.realtime_mode:
            start_period = None
            end_period = None
        else:
            start_period = windows.calib_eval_start.strftime(DDF)
            end_period = windows.calib_eval_end.strftime(DDF)

        cycle_datetime = self.cycle_datetime.strftime(
            mswm_settings.DEFAULT_DATETIME_FORMAT
        )

        realization_kwargs = {
            # "input_path": forecast_vars.forecast_input_config,
            "fcst_run_name": self.fcst_run_name,
            "config_overrides": InputConfig(
                General=GeneralConfig(
                    basin=self.gage_id,
                    run_type="default",
                    models=c.MODELS,
                    formulation=fpp.formulation_name,
                    main_dir=c.DEFAULT_MAIN_DIR,
                    start_period=start_period,
                    end_period=end_period,
                    output_precip=True,
                    output_swe=True,
                    output_sm=True,
                    domain=self.global_domain.lower(),
                ),
                ModuleProperties=ModulePropertiesConfig(),
                Forcing=ForcingConfig(
                    forcing_provider=fpp.forcing_provider,
                    forcing_dir=fpp.get_forcing_dir(gage_id=self.gage_id),
                    forcing_template_dir=c.FORCING_TEMPLATE_DIR,
                    root_dir=c.FORCING_ROOT_DIR,
                    forcing_configuration=self.forcing_configuration,
                    cycle_datetime=cycle_datetime,
                    cold_start_datetime=None,
                    global_domain=self.global_domain,
                    forcing_static_dir=self.forcing_static_dir,
                    scratch_dir_override=c.SCRATCH_DIR_OVERRIDE,
                    input_forcing_dirs_override_root=c.INPUT_FORCING_DIRS_OVERRIDE_ROOT,
                    forcing_product_versions=c.FORCING_PRODUCT_VERSIONS_DICT,
                ),
                DataFile=DataFileConfig(
                    **(
                        c.DATAFILE_LIBS
                        | {
                            "hydrofab_file": f"{c.HYDROFABRIC_DIR}/2.2/{self.global_domain}/{self.gage_id}/GEOPACKAGE/USGS/{self.gage_vintage}/gauge_{self.gage_id}.gpkg",
                        }
                    ),
                ),
                Parallel=make_parallel_config(self.nprocs),
            ),
        }
        return realization_kwargs
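
The consistency rule enforced in model_post_init above (a historical configuration requires a duration, a realtime configuration forbids one) can be sketched in isolation. `REALTIME_CONFIGS` and `check_duration` are hypothetical names for this illustration; the real membership list is `c.FORECAST_FORCING_CONFIGURATION_TYPES__ALL` in consts.py, whose contents are not shown here.

```python
from datetime import timedelta
from typing import Optional

# Hypothetical set of realtime configurations; the real list lives in consts.py.
REALTIME_CONFIGS = {"short_range"}


def check_duration(
    forcing_configuration: str, historical_sim_duration: Optional[timedelta]
) -> list[str]:
    """Return a list of error messages; an empty list means the combination is valid."""
    errors = []
    realtime_mode = forcing_configuration in REALTIME_CONFIGS
    if not realtime_mode and not historical_sim_duration:
        errors.append(f"{forcing_configuration!r} is not realtime and requires a duration.")
    if realtime_mode and historical_sim_duration:
        errors.append(f"{forcing_configuration!r} is realtime and must not be given a duration.")
    return errors


historical_ok = check_duration("aorc", timedelta(days=30))
historical_missing = check_duration("aorc", None)
realtime_ok = check_duration("short_range", None)
realtime_extra = check_duration("short_range", timedelta(hours=18))
```

As in the original, the check relies on truthiness, so a zero-length `timedelta` is treated the same as a missing duration.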

RTECalibConfig #

Bases: RTESetup

Configuration class for building and running one calibration realization.

ATTRIBUTE (TYPE): DESCRIPTION

delete_scratch_and_mesh_first (bool): Causes scratch dir and intermediary mesh to be deleted first
delete_forcing_raw_input_first (bool): Causes realtime forcing data cache dir to be deleted first
objective_function (CalObjective): Objective function, e.g. "kge"
optimization_algorithm (CalOptimizationAlgo): Optimization algorithm, e.g. "dds"
nprocs (int = Field(ge=1)): Number of processors to use
gage_id__gage_vintage (list[str] = Field(min_length=2, max_length=2)): Gage ID and vintage
calib_sim_start (datetime): Calibration start time
calib_sim_duration (timedelta): Calibration simulation duration
calib_eval_delayment (timedelta): Used for evaluation / validation time windowing
valid_sim_advancement (timedelta): Used for evaluation / validation time windowing
valid_eval_curtailment (timedelta): Used for evaluation / validation time windowing
forcing_source (str): Source of forcing data, e.g. "aorc" or "nwm"
global_domain (str): e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
forcing_provider (str): Forcing provider, i.e. "bmi" or "csv"
forcing_static_dir (str): Forcing static directory
worker_name (str | None): Name of the ngen worker (used to build a directory name)

The following are set after init during self.model_post_init(). Do not provide.

gage_id (str = Field(init=False, default=None)): Gage ID
gage_vintage (str = Field(init=False, default=None)): Gage vintage
obs_dir (str | None = Field(init=False, default=None)): Directory of observed flow data
nwmretro_file (str | None = Field(init=False, default=None)): File containing retrospective NWM flow data

Source code in bin_mounted/configs.py
class RTECalibConfig(RTESetup):
    """Configuration class for building and running one calibration realization.

    Attributes
    ----------
    delete_scratch_and_mesh_first: bool
        Causes scratch dir and intermediary mesh to be deleted first
    delete_forcing_raw_input_first: bool
        Causes realtime forcing data cache dir to be deleted first
    objective_function: c.CalObjective
        Objective function, e.g. "kge"
    optimization_algorithm: c.CalOptimizationAlgo
        Optimization algorithm, e.g. "dds"
    nprocs: int = Field(ge=1)
        Number of processors to use
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
        Gage ID and vintage
    calib_sim_start: datetime
        Calibration start time
    calib_sim_duration: timedelta
        Calibration simulation duration
    calib_eval_delayment: timedelta
        Used for evaluation / validation time windowing
    valid_sim_advancement: timedelta
        Used for evaluation / validation time windowing
    valid_eval_curtailment: timedelta
        Used for evaluation / validation time windowing
    forcing_source: str
        Source of forcing data, e.g. "aorc" or "nwm"
    global_domain: str
        e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
    forcing_provider: str
        Forcing provider, i.e. "bmi" or "csv"
    forcing_static_dir: str
        Forcing static directory
    worker_name: str | None
        Name of the ngen worker (used to build a directory name)
    # The following are set after init during self.model_post_init(). Do not provide.
    gage_id: str = Field(init=False, default=None)
        Gage ID
    gage_vintage: str = Field(init=False, default=None)
        Gage vintage
    obs_dir: str | None = Field(init=False, default=None)
        Directory of observed flow data
    nwmretro_file: str | None = Field(init=False, default=None)
        File containing retrospective NWM flow data
    """

    model_config = ConfigDict(strict=True, arbitrary_types_allowed=True)

    delete_scratch_and_mesh_first: bool
    delete_forcing_raw_input_first: bool
    objective_function: c.CalObjective
    optimization_algorithm: c.CalOptimizationAlgo
    nprocs: int = Field(ge=1)
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
    calib_sim_start: datetime
    calib_sim_duration: timedelta
    calib_eval_delayment: timedelta
    valid_sim_advancement: timedelta
    valid_eval_curtailment: timedelta
    forcing_source: str
    global_domain: str
    forcing_provider: str
    forcing_static_dir: str
    worker_name: str | None

    # Set after init
    gage_id: str = Field(init=False, default=None)
    gage_vintage: str = Field(init=False, default=None)
    obs_dir: str | None = Field(init=False, default=None)
    nwmretro_file: str | None = Field(init=False, default=None)

    def model_post_init(self, __context) -> None:
        super().model_post_init(__context)  # Call RTESetup's post init

        errors = []

        self.gage_id, self.gage_vintage, errors_extend = parse_gage_id__gage_vintage(
            self.gage_id__gage_vintage
        )
        errors.extend(errors_extend)

        self.obs_dir, self.nwmretro_file, errors_extend = get_data_paths_for_lstm(
            self.global_domain,
            self.gage_id,
        )
        errors.extend(errors_extend)

        if errors:
            raise RuntimeError(errors)
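
The post-init hook above collects errors from each helper and raises once at the end, so a caller sees every problem in a single pass rather than one at a time. A minimal sketch of that accumulate-then-raise pattern follows. It uses a standard-library dataclass in place of the pydantic model, and `DemoConfig` and `check_no_edge_whitespace` are hypothetical names that do not exist in nwm-rte.

```python
from dataclasses import dataclass


def check_no_edge_whitespace(name: str, value: str) -> list[Exception]:
    """Return a list of errors; the list is empty when the value is clean."""
    if value != value.strip():
        return [ValueError(f"Whitespace found on end of {name}: {value!r}")]
    return []


@dataclass
class DemoConfig:
    """Hypothetical stand-in for a config class (not part of nwm-rte)."""

    gage_id: str
    fcst_run_name: str

    def __post_init__(self) -> None:
        # Run every check before raising, so all problems are reported together.
        errors: list[Exception] = []
        errors.extend(check_no_edge_whitespace("gage_id", self.gage_id))
        errors.extend(check_no_edge_whitespace("fcst_run_name", self.fcst_run_name))
        if errors:
            raise RuntimeError(errors)


try:
    DemoConfig(gage_id=" 01234567", fcst_run_name="demo ")
except RuntimeError as exc:
    for err in exc.args[0]:
        print(type(err).__name__, err)
```

Both bad fields are reported in the same `RuntimeError`, which is the main reason to return error lists from the helpers instead of raising inside them.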

RTEForecastConfig #

Bases: RTESetup

Configuration class for building and running one forecast realization.

ATTRIBUTE DESCRIPTION
delete_scratch_and_mesh_first

Causes scratch dir and intermediary mesh to be deleted first

TYPE: bool

delete_forcing_raw_input_first

Causes realtime forcing data cache dir to be deleted first

TYPE: bool

objective_function

Affects input realization path. Objective function of a previously run calibration realization, e.g. "kge"

TYPE: CalObjective

optimization_algorithm

Affects input realization path. Optimization algorithm of a previously run calibration realization, e.g. "dds"

TYPE: CalOptimizationAlgo

gage_id

Gage ID

TYPE: str

global_domain

e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"

TYPE: str

forcing_static_dir

Forcing static directory

TYPE: str

forcing_provider

Forcing provider, i.e. "bmi" or "csv"

TYPE: str

cycle_datetime

Start time of the realization (or end time for coldstart, if cold_start_datetime is provided)

TYPE: datetime | None

cold_start_datetime

Start time of the coldstart realization. If None, coldstart is not performed.

TYPE: datetime | None

forcing_configuration

Forcing configuration, e.g. "aorc" or "short_range"

TYPE: str

fcst_run_name

Name of the forecast realization run

TYPE: str

nprocs

Number of processors to use

TYPE: int = Field(ge=1)

# The following are set after init during self.model_post_init(). Do not provide.

run_dir_base

Run directory root

TYPE: str = Field(init=False, default=None)

run_dir_input

Input run directory

TYPE: str = Field(init=False, default=None)

run_dir_output

Output run directory

TYPE: str = Field(init=False, default=None)

ngen_log_file

ngen stdout + stderr stream log file

TYPE: str = Field(init=False, default=None)

valid_best_yaml

Validation YAML file (output from a previously run calibration realization)

TYPE: str = Field(init=False, default=None)

realization_builder_kwargs

Realization builder kwargs (passed to nwm-msw-mgr)

TYPE: dict = Field(init=False, default=None)

Source code in bin_mounted/configs.py
class RTEForecastConfig(RTESetup):
    """Configuration class for building and running one forecast realization.

    Attributes
    ----------
    delete_scratch_and_mesh_first: bool
        Causes scratch dir and intermediary mesh to be deleted first
    delete_forcing_raw_input_first: bool
        Causes realtime forcing data cache dir to be deleted first
    objective_function: c.CalObjective
        Affects input realization path. Objective function of a previously run calibration realization, e.g. "kge"
    optimization_algorithm: c.CalOptimizationAlgo
        Affects input realization path. Optimization algorithm of a previously run calibration realization, e.g. "dds"
    gage_id: str
        Gage ID
    global_domain: str
        e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
    forcing_static_dir: str
        Forcing static directory
    forcing_provider: str
        Forcing provider, i.e. "bmi" or "csv"
    cycle_datetime: datetime | None
        Start time of the realization (or end time for coldstart, if `cold_start_datetime` is provided)
    cold_start_datetime: datetime | None
        Start time of the coldstart realization. If None, coldstart is not performed.
    forcing_configuration: str
        Forcing configuration, e.g. "aorc" or "short_range"
    fcst_run_name: str
        Name of the forecast realization run
    nprocs: int = Field(ge=1)
        Number of processors to use
    # The following are set after init during self.model_post_init(). Do not provide.
    run_dir_base: str = Field(init=False, default=None)
        Run directory root
    run_dir_input: str = Field(init=False, default=None)
        Input run directory
    run_dir_output: str = Field(init=False, default=None)
        Output run directory
    ngen_log_file: str = Field(init=False, default=None)
        ngen stdout + stderr stream log file
    valid_best_yaml: str = Field(init=False, default=None)
        Validation YAML file (output from a previously run calibration realization)
    realization_builder_kwargs: dict = Field(init=False, default=None)
        Realization builder kwargs (passed to `nwm-msw-mgr`)
    """

    model_config = ConfigDict(strict=True, arbitrary_types_allowed=True)

    delete_scratch_and_mesh_first: bool
    delete_forcing_raw_input_first: bool
    ### These calibration parameters affect directory path
    objective_function: c.CalObjective
    optimization_algorithm: c.CalOptimizationAlgo
    gage_id: str
    global_domain: str
    forcing_static_dir: str
    forcing_provider: str
    cycle_datetime: datetime | None
    cold_start_datetime: datetime | None
    forcing_configuration: str
    fcst_run_name: str
    nprocs: int = Field(ge=1)

    # Derived paths (not passed to __init__)
    run_dir_base: str = Field(init=False, default=None)
    run_dir_input: str = Field(init=False, default=None)
    run_dir_output: str = Field(init=False, default=None)
    ngen_log_file: str = Field(init=False, default=None)
    valid_best_yaml: str = Field(init=False, default=None)

    # Other derived attrs (not passed to __init__)
    realization_builder_kwargs: dict = Field(init=False, default=None)

    def model_post_init(self, __context) -> None:
        super().model_post_init(__context)  # Call RTESetup's post init

        self.run_dir_base = f"{c.DEFAULT_MAIN_DIR}/{self.objective_function.value}_{self.optimization_algorithm.value}/test_{self.forcing_provider}/{self.gage_id}"
        if not os.path.isdir(self.run_dir_base):
            msg = f"Not a directory: {repr(self.run_dir_base)}. Please review choices for objective function, optimization algorithm, and gage, which affect this path."
            raise NotADirectoryError(msg)

        self.run_dir_input = f"{self.run_dir_base}/Input"
        self.run_dir_output = f"{self.run_dir_base}/Output"
        self.ngen_log_file = f"{self.run_dir_base}/logs/ngen.log"
        self.valid_best_yaml = f"{self.run_dir_output}/Validation_Run/{self.gage_id}_config_valid_best.yaml"

        self.realization_builder_kwargs = self._make_realization_builder_kwargs()

    def _make_realization_builder_kwargs(self) -> dict:
        """Build and return a dictionary for creating a RealizationBuilder instance."""
        fpp = ForcingProviderPaths(
            forcing_provider=self.forcing_provider,
            global_domain=self.global_domain,
            forcing_static_dir=self.forcing_static_dir,
        )
        realization_kwargs = {
            # "input_path": forecast_vars.forecast_input_config,
            "valid_yaml": self.valid_best_yaml,
            "fcst_run_name": self.fcst_run_name,
            "config_overrides": InputConfig(
                Forcing=ForcingConfig(
                    forcing_provider=fpp.forcing_provider,
                    forcing_dir=fpp.get_forcing_dir(gage_id=self.gage_id),
                    forcing_template_dir=c.FORCING_TEMPLATE_DIR,
                    root_dir=c.FORCING_ROOT_DIR,
                    forcing_configuration=self.forcing_configuration,
                    cycle_datetime=self.cycle_datetime.strftime(
                        mswm_settings.DEFAULT_DATETIME_FORMAT
                    ),
                    cold_start_datetime=self.cold_start_datetime.strftime(
                        mswm_settings.DEFAULT_DATETIME_FORMAT
                    )
                    if self.cold_start_datetime
                    else None,
                    global_domain=self.global_domain,
                    forcing_static_dir=self.forcing_static_dir,
                    scratch_dir_override=c.SCRATCH_DIR_OVERRIDE,
                    input_forcing_dirs_override_root=c.INPUT_FORCING_DIRS_OVERRIDE_ROOT,
                    forcing_product_versions=c.FORCING_PRODUCT_VERSIONS_DICT,
                ),
                Parallel=make_parallel_config(self.nprocs),
            ),
        }
        return realization_kwargs
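
The derived paths set in `model_post_init` all hang off one base directory whose name encodes the objective function, optimization algorithm, forcing provider, and gage ID. The sketch below reproduces that layout as a plain function so the resulting tree is easy to inspect. `derive_run_paths` is a hypothetical helper, and `/demo/main` is a placeholder for `c.DEFAULT_MAIN_DIR`, whose real value is not shown here. The directory-existence check from the original is omitted.

```python
from pathlib import PurePosixPath


def derive_run_paths(
    main_dir: str,
    objective: str,
    algorithm: str,
    forcing_provider: str,
    gage_id: str,
) -> dict[str, str]:
    """Hypothetical helper mirroring the path layout in model_post_init."""
    base = (
        PurePosixPath(main_dir)
        / f"{objective}_{algorithm}"
        / f"test_{forcing_provider}"
        / gage_id
    )
    output = base / "Output"
    return {
        "run_dir_base": str(base),
        "run_dir_input": str(base / "Input"),
        "run_dir_output": str(output),
        "ngen_log_file": str(base / "logs" / "ngen.log"),
        "valid_best_yaml": str(
            output / "Validation_Run" / f"{gage_id}_config_valid_best.yaml"
        ),
    }


# "/demo/main" is a placeholder, not the real c.DEFAULT_MAIN_DIR.
paths = derive_run_paths("/demo/main", "kge", "dds", "csv", "01234567")
for name, value in paths.items():
    print(f"{name}: {value}")
```

Because the base directory depends on the calibration choices, a forecast can only be built for an objective function, algorithm, and gage combination that has already been calibrated, which is why the original raises `NotADirectoryError` when the base is missing.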

RTETestConfig #

Bases: RTESetup

Configuration class for building and running a set of test realizations.

ATTRIBUTE DESCRIPTION
delete_scratch_and_mesh_first

Causes scratch dir and intermediary mesh to be deleted first

TYPE: bool

delete_forcing_raw_input_first

Causes realtime forcing data cache dir to be deleted first

TYPE: bool

skip_forecast

Causes forecast to be skipped (only do calibration)

TYPE: bool

quit_forecast_after_forcing_running

Causes forecasts to be stopped midway once log files indicate that the model is well underway

TYPE: bool

quit_forecast_after_duration

Causes forecasts to be stopped midway after a set duration (seconds of processing time)

TYPE: float | None = Field(ge=0)

do_calibration

Causes calibration to be run before forecasts (needed if a calibration has not yet been run for the gage)

TYPE: bool

quit_calibration_after_duration

Causes calibrations to be stopped midway after a set duration (seconds of processing time)

TYPE: float | None = Field(ge=0)

objective_functions

For calibration, list of objective functions to run, e.g. "kge". Replaced with full list when do_all_objective_functions = True

TYPE: list[CalObjective]

do_all_objective_functions

For calibration, causes all objective functions to be used.

TYPE: bool

optimization_algorithms

For calibration, list of optimization algorithms to run, e.g. "dds". Replaced with full list when do_all_optimization_algorithms = True

TYPE: list[CalOptimizationAlgo]

do_all_optimization_algorithms

For calibration, causes all optimization algorithms to be used.

TYPE: bool

do_all_forcing_configs

Causes all forcing configurations to be used, e.g. "short_range", "standard_ana", "medium_range_blend", "extended_ana", "short_range_hawaii", etc.

TYPE: bool

do_coldstart

Causes coldstart to be run before forecast.

TYPE: bool

fcst_run_name

Name of the forecast realization run. Affects a directory name.

TYPE: str

nprocs

Number of processors to use

TYPE: int = Field(ge=1)

gage_id__gage_vintage

Gage ID and vintage

TYPE: list[str] = Field(min_length=2, max_length=2)

global_domain

e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"

TYPE: str

forcing_provider

Forcing provider, i.e. "bmi" or "csv"

TYPE: str

forcing_static_dir

Forcing static directory

TYPE: str

noop

Causes a noop to occur (for confirming that Python packages are importable).

TYPE: bool

gage_id

Gage ID

TYPE: str = Field(init=False, default=None)

gage_vintage

Gage vintage

TYPE: str = Field(init=False, default=None)

Source code in bin_mounted/configs.py
class RTETestConfig(RTESetup):
    """Configuration class for building and running a set of test realizations.

    Attributes
    ----------
    delete_scratch_and_mesh_first: bool
        Causes scratch dir and intermediary mesh to be deleted first
    delete_forcing_raw_input_first: bool
        Causes realtime forcing data cache dir to be deleted first
    skip_forecast: bool
        Causes forecast to be skipped (only do calibration)
    quit_forecast_after_forcing_running: bool
        Causes forecasts to be stopped midway once log files indicate that the model is well underway
    quit_forecast_after_duration: float | None = Field(ge=0)
        Causes forecasts to be stopped midway after a set duration (seconds of processing time)
    do_calibration: bool
        Causes calibration to be run before forecasts (needed if a calibration has not yet been run for the gage)
    quit_calibration_after_duration: float | None = Field(ge=0)
        Causes calibrations to be stopped midway after a set duration (seconds of processing time)
    objective_functions: list[c.CalObjective]
        For calibration, list of objective functions to run, e.g. "kge". Replaced with full list when do_all_objective_functions = True
    do_all_objective_functions: bool
        For calibration, causes all objective functions to be used.
    optimization_algorithms: list[c.CalOptimizationAlgo]
        For calibration, list of optimization algorithms to run, e.g. "dds". Replaced with full list when do_all_optimization_algorithms = True
    do_all_optimization_algorithms: bool
        For calibration, causes all optimization algorithms to be used.
    do_all_forcing_configs: bool
        Causes all forcing configurations to be used, e.g. "short_range", "standard_ana", "medium_range_blend", "extended_ana", "short_range_hawaii", etc.
    do_coldstart: bool
        Causes coldstart to be run before forecast.
    fcst_run_name: str
        Name of the forecast realization run. Affects a directory name.
    nprocs: int = Field(ge=1)
        Number of processors to use
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
        Gage ID and vintage
    global_domain: str
        e.g. "CONUS", "Hawaii", "Alaska", "PuertoRico"
    forcing_provider: str
        Forcing provider, i.e. "bmi" or "csv"
    forcing_static_dir: str
        Forcing static directory
    noop: bool
        Causes a noop to occur (for confirming that Python packages are importable).
    gage_id: str = Field(init=False, default=None)
        Gage ID
    gage_vintage: str = Field(init=False, default=None)
        Gage vintage
    """

    model_config = ConfigDict(strict=True, arbitrary_types_allowed=True)

    delete_scratch_and_mesh_first: bool
    delete_forcing_raw_input_first: bool
    skip_forecast: bool
    quit_forecast_after_forcing_running: bool
    quit_forecast_after_duration: float | None = Field(ge=0)
    do_calibration: bool
    quit_calibration_after_duration: float | None = Field(ge=0)
    # Replaced with full list when do_all_objective_functions = True
    objective_functions: list[c.CalObjective]
    do_all_objective_functions: bool
    # Replaced with full list when do_all_optimization_algorithms = True
    optimization_algorithms: list[c.CalOptimizationAlgo]
    do_all_optimization_algorithms: bool
    do_all_forcing_configs: bool
    do_coldstart: bool
    fcst_run_name: str
    nprocs: int = Field(ge=1)
    gage_id__gage_vintage: list[str] = Field(min_length=2, max_length=2)
    global_domain: str
    forcing_provider: str
    forcing_static_dir: str
    noop: bool

    # Set after init
    gage_id: str = Field(init=False, default=None)
    gage_vintage: str = Field(init=False, default=None)

    def model_post_init(self, __context) -> None:
        super().model_post_init(__context)  # Call RTESetup's post init

        errors = []

        if self.quit_forecast_after_forcing_running:
            errors.append(
                RuntimeError(
                    "quit_forecast_after_forcing_running is currently not allowed, pending updates."
                )
            )

        self.gage_id, self.gage_vintage, errors_extend = parse_gage_id__gage_vintage(
            self.gage_id__gage_vintage
        )
        errors.extend(errors_extend)

        errors_extend = parse_fcst_run_name(self.fcst_run_name)
        errors.extend(errors_extend)

        if self.do_all_objective_functions:
            self.objective_functions = list(c.CalObjective)
        if self.do_all_optimization_algorithms:
            self.optimization_algorithms = list(c.CalOptimizationAlgo)

        if self.do_all_forcing_configs:
            if self.skip_forecast and (not self.do_coldstart):
                errors.append(
                    ValueError(
                        f"When do_all_forcing_configs={self.do_all_forcing_configs}, must have coldstart and/or forecast enabled."
                    )
                )

        if errors:
            raise RuntimeError(errors)

    def get_calib_permutations(
        self,
    ) -> list[tuple[c.CalObjective, c.CalOptimizationAlgo, TestPaths]]:
        """Return every combination of objective function and optimization algorithm specified in the config, with a TestPaths instance for each.
        Combinations involving a "none" entry are skipped for now (see the TODO comments in the loop)."""
        ret = []
        for obj_func in self.objective_functions:
            if obj_func == c.CalObjective.none:
                # TODO enable objective function "none" for supported circumstances
                continue
            for optim_algo in self.optimization_algorithms:
                if optim_algo == c.CalOptimizationAlgo.none:
                    # TODO enable optimization algo "none" for supported circumstances
                    continue
                ret.append(
                    (
                        obj_func,
                        optim_algo,
                        TestPaths(
                            self.gage_id,
                            self.gage_vintage,
                            obj_func,
                            optim_algo,
                            self.global_domain,
                            self.forcing_provider,
                            self.forcing_static_dir,
                        ),
                    )
                )
        return ret

get_calib_permutations #

get_calib_permutations() -> (
    list[
        tuple[CalObjective, CalOptimizationAlgo, TestPaths]
    ]
)

Returns every combination of objective function and optimization algorithm specified in the config, with a TestPaths instance for each. Combinations involving a "none" entry are skipped for now.

Source code in bin_mounted/configs.py
def get_calib_permutations(
    self,
) -> list[tuple[c.CalObjective, c.CalOptimizationAlgo, TestPaths]]:
    """Return every combination of objective function and optimization algorithm specified in the config, with a TestPaths instance for each.
    Combinations involving a "none" entry are skipped for now (see the TODO comments in the loop)."""
    ret = []
    for obj_func in self.objective_functions:
        if obj_func == c.CalObjective.none:
            # TODO enable objective function "none" for supported circumstances
            continue
        for optim_algo in self.optimization_algorithms:
            if optim_algo == c.CalOptimizationAlgo.none:
                # TODO enable optimization algo "none" for supported circumstances
                continue
            ret.append(
                (
                    obj_func,
                    optim_algo,
                    TestPaths(
                        self.gage_id,
                        self.gage_vintage,
                        obj_func,
                        optim_algo,
                        self.global_domain,
                        self.forcing_provider,
                        self.forcing_static_dir,
                    ),
                )
            )
    return ret
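
The nested loops in this method amount to a Cartesian product of the two configured lists, with the "none" entries filtered out as the TODO comments intend. A compact sketch of the same idea using `itertools.product` is shown below. `DemoObjective` and `DemoAlgo` are hypothetical enums: only "kge", "dds", and "none" are named in this reference, so the other members are invented for illustration, and the `TestPaths` construction is left out.

```python
from enum import Enum
from itertools import product


class DemoObjective(Enum):
    """Hypothetical stand-in for c.CalObjective."""

    kge = "kge"
    nse = "nse"  # invented member, for illustration only
    none = "none"


class DemoAlgo(Enum):
    """Hypothetical stand-in for c.CalOptimizationAlgo."""

    dds = "dds"
    pso = "pso"  # invented member, for illustration only
    none = "none"


def calib_combinations(
    objectives: list[DemoObjective], algorithms: list[DemoAlgo]
) -> list[tuple[DemoObjective, DemoAlgo]]:
    """Return all (objective, algorithm) pairs, skipping any "none" entry."""
    return [
        (obj, algo)
        for obj, algo in product(objectives, algorithms)
        if obj is not DemoObjective.none and algo is not DemoAlgo.none
    ]


combos = calib_combinations(list(DemoObjective), list(DemoAlgo))
for obj, algo in combos:
    print(obj.value, algo.value)
```

Passing `list(DemoObjective)` mirrors what `do_all_objective_functions = True` does to `objective_functions` in the config: the full enum is expanded first, and the "none" member is dropped when the combinations are built.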

ForcingProviderPaths #

Bases: BaseModel

Helper class for managing model paths.

Source code in bin_mounted/configs.py
class ForcingProviderPaths(BaseModel):
    """Helper class for managing model paths."""

    model_config = ConfigDict(strict=True)
    forcing_provider: Literal["csv", "bmi"]
    global_domain: str  # e.g. CONUS. TODO restrict choices
    forcing_static_dir: str

    def get_forcing_dir(self, gage_id: str | None) -> str | None:
        if self.forcing_provider == "csv":
            if not gage_id:
                raise ValueError(
                    "Gage ID must be provided when forcing_provider == 'csv'"
                )
            return c.CSV_FORCING_DIR_FORMAT.format(
                global_domain=self.global_domain, gage_id=gage_id
            )
        elif self.forcing_provider == "bmi":
            return self.forcing_static_dir
        else:
            raise ValueError(f"Unexpected forcing_provider: {self.forcing_provider}")

    @property
    def formulation_name(self) -> str:
        """Formulation name, as a part of the model path."""
        return f"test_{self.forcing_provider}"

formulation_name property #

formulation_name: str

Formulation name, as a part of the model path.

CalibTimeWindows #

Bases: BaseModel

Calibration time windows defined by a start time and some timedelta offsets.

Source code in bin_mounted/configs.py
class CalibTimeWindows(BaseModel):
    """Calibration time windows defined by a start time
    and some timedelta offsets."""

    calib_sim_start: datetime = Field(default=c.CALIB_SIM_START_DEFAULT)
    calib_sim_duration: timedelta = Field(default=c.CALIB_SIM_DURATION_DEFAULT)
    # Delayed start from calibration simulation, for warmup
    calib_eval_delayment: timedelta = Field(default=c.CALIB_EVAL_DELAYMENT_DEFAULT)
    # Validation simulation starts before calibration simulation, by this amount
    valid_sim_advancement: timedelta = Field(default=c.VALID_SIM_ADVANCEMENT_DEFAULT)
    # Valid eval window cut short by this amount
    valid_eval_curtailment: timedelta = Field(default=c.VALID_EVAL_CURTAILMENT_DEFAULT)

    @property
    def calib_sim_end(self) -> datetime:
        """End of the calibration simulation window."""
        return self.calib_sim_start + self.calib_sim_duration

    @property
    def calib_eval_start(self) -> datetime:
        """Start of the calibration evaluation window."""
        return self.calib_sim_start + self.calib_eval_delayment

    @property
    def calib_eval_end(self) -> datetime:
        """End of the calibration evaluation window."""
        return self.calib_sim_end

    @property
    def valid_sim_start(self) -> datetime:
        """Start of the validation simulation window."""
        return self.calib_sim_start - self.valid_sim_advancement

    @property
    def valid_sim_end(self) -> datetime:
        """End of the validation simulation window."""
        return self.calib_sim_end

    @property
    def valid_eval_start(self) -> datetime:
        """Start of the validation evaluation window."""
        return self.calib_sim_start

    @property
    def valid_eval_end(self) -> datetime:
        """End of the validation evaluation window."""
        return self.calib_sim_end - self.valid_eval_curtailment

    @property
    def full_eval_start(self) -> datetime:
        """Start of the full evaluation window"""
        return self.calib_sim_start

    @property
    def full_eval_end(self) -> datetime:
        """End of the full evaluation window"""
        return self.calib_sim_end

calib_sim_end property #

calib_sim_end: datetime

End of the calibration simulation window.

calib_eval_start property #

calib_eval_start: datetime

Start of the calibration evaluation window.

calib_eval_end property #

calib_eval_end: datetime

End of the calibration evaluation window.

valid_sim_start property #

valid_sim_start: datetime

Start of the validation simulation window.

valid_sim_end property #

valid_sim_end: datetime

End of the validation simulation window.

valid_eval_start property #

valid_eval_start: datetime

Start of the validation evaluation window.

valid_eval_end property #

valid_eval_end: datetime

End of the validation evaluation window.

full_eval_start property #

full_eval_start: datetime

Start of the full evaluation window

full_eval_end property #

full_eval_end: datetime

End of the full evaluation window
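
Every window above is derived from `calib_sim_start` plus or minus one of the four timedelta offsets. The worked example below applies the same arithmetic to arbitrary values so the relationships between the windows are visible. The dates and offsets are invented for illustration and are not the defaults from consts.py.

```python
from datetime import datetime, timedelta

# Illustrative inputs only; the real defaults live in consts.py.
calib_sim_start = datetime(2015, 10, 1)
calib_sim_duration = timedelta(days=365)
calib_eval_delayment = timedelta(days=30)    # warmup before calibration evaluation
valid_sim_advancement = timedelta(days=60)   # validation sim starts this much earlier
valid_eval_curtailment = timedelta(days=10)  # validation evaluation ends this much earlier

calib_sim_end = calib_sim_start + calib_sim_duration

# Same formulas as the CalibTimeWindows properties, as (start, end) pairs.
windows = {
    "calib_sim": (calib_sim_start, calib_sim_end),
    "calib_eval": (calib_sim_start + calib_eval_delayment, calib_sim_end),
    "valid_sim": (calib_sim_start - valid_sim_advancement, calib_sim_end),
    "valid_eval": (calib_sim_start, calib_sim_end - valid_eval_curtailment),
    "full_eval": (calib_sim_start, calib_sim_end),
}

for name, (start, end) in windows.items():
    print(f"{name:10s} {start:%Y-%m-%d} -> {end:%Y-%m-%d}")
```

With these inputs, the validation simulation starts before the calibration simulation and both end together, while the two evaluation windows trim the start (calibration) or the end (validation) of the shared simulation period.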

make_parallel_config #

make_parallel_config(nprocs: int) -> ParallelConfig

Build and return the ParallelConfig instance.

Source code in bin_mounted/configs.py
def make_parallel_config(nprocs: int) -> ParallelConfig:
    """Build and return the ParallelConfig instance."""
    if nprocs and nprocs > 1:
        parallel = ParallelConfig(
            parallel_ngen_exe=c.NGEN_BIN__LINK,
            partition_generator_exe=c.PARTITION_GENERATOR_BIN__LINK,
            nprocs=nprocs,
        )
    else:
        parallel = ParallelConfig(nprocs=nprocs)
    return parallel

parse_gage_id__gage_vintage #

parse_gage_id__gage_vintage(
    gage_id__gage_vintage: tuple[str, str],
) -> tuple[str | None, str | None, list[Exception]]

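Unpack the provided (gage_id, gage_vintage) pair and check that neither value has leading or trailing whitespace. A value that fails the check is returned as None, and the error is added to the returned list.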

Source code in bin_mounted/configs.py
def parse_gage_id__gage_vintage(
    gage_id__gage_vintage: tuple[str, str],
) -> tuple[str | None, str | None, list[Exception]]:
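    """Unpack the provided (gage_id, gage_vintage) pair and check that neither value has leading or trailing whitespace.
    A value that fails the check is returned as None, and the error is added to the returned list."""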
    errors: list[Exception] = []
    gage_id, gage_vintage = gage_id__gage_vintage

    if gage_id != gage_id.strip():
        errors.append(
            ValueError(f"Whitespace found on end of gage_id: {repr(gage_id)}")
        )
        gage_id = None

    if gage_vintage != gage_vintage.strip():
        errors.append(
            ValueError(f"Whitespace found on end of gage_vintage: {repr(gage_vintage)}")
        )
        gage_vintage = None

    return gage_id, gage_vintage, errors

parse_fcst_run_name #

parse_fcst_run_name(fcst_run_name: str) -> list[Exception]

Validate the provided forecast run name, and return a list of errors.

Source code in bin_mounted/configs.py
def parse_fcst_run_name(fcst_run_name: str) -> list[Exception]:
    """Validate the provided forecast run name, and return a list of errors."""
    errors: list[Exception] = []
    if fcst_run_name != fcst_run_name.strip():
        errors.append(
            ValueError(
                f"Whitespace found on end of fcst_run_name: {repr(fcst_run_name)}"
            )
        )
    return errors

get_data_paths_for_lstm #

get_data_paths_for_lstm(
    global_domain: str, gage_id: str
) -> tuple[str | None, str | None, list[Exception]]

Build and return the two data paths needed for the LSTM model, as well as a list of errors encountered along the way. Both obs_dir and nwmretro_file are returned as None if the model is not LSTM.

Source code in bin_mounted/configs.py
def get_data_paths_for_lstm(
    global_domain: str,
    gage_id: str,
) -> tuple[str | None, str | None, list[Exception]]:
    """Build and return the two data paths needed for the LSTM model,
    as well as a list of errors encountered along the way.
    Both obs_dir and nwmretro_file are returned as None if the model is not LSTM."""

    errors: list[Exception] = []

    if "lstm" in c.MODELS.lower():
        obs_dir = find_obs_dir(global_domain, gage_id)
        nwmretro_file = f"{c.NWM_RETRO_STREAMFLOW_DIR}/{gage_id}.csv"
        # Check the specific path type so the check matches the error that is reported
        if not os.path.isdir(obs_dir):
            errors.append(NotADirectoryError(obs_dir))
        if not os.path.isfile(nwmretro_file):
            errors.append(FileNotFoundError(nwmretro_file))
    else:
        obs_dir = None
        nwmretro_file = None

    return obs_dir, nwmretro_file, errors

find_obs_dir #

find_obs_dir(global_domain: str, gage_id: str) -> str

Search the grandparent directory of observed flow csv files to determine the directory that contains one of them. Assert that only one such csv file is found. If multiple are found, then this function needs to be reworked to handle more complex situations, such as multiple vintages of this data existing on disk.

Source code in bin_mounted/configs.py
def find_obs_dir(global_domain: str, gage_id: str) -> str:
    """Search the grandparent directory of observed flow csv files to determine
    the directory that contains one of them.  Assert that only one such csv file is found.
    If multiple are found, then this function needs to be reworked to handle more complex
    situations, such as multiple vintages of this data existing on disk."""
    grandparent = f"{c.DEFAULT_MAIN_DIR}/data/streamflow_observations/{global_domain}"
    print(f"Searching directory for observed flow files: {grandparent}")
    candidate_csvs = []
    for root, dirs, files in os.walk(grandparent):
        dirs.sort()
        files.sort()
        for fn in files:
            # Escape the name so "." and any regex metacharacters in gage_id match literally
            pattern = re.escape(f"{gage_id}_hourly_discharge.csv")
            if re.fullmatch(pattern, fn):
                candidate_csvs.append(os.path.join(root, fn))
    if len(candidate_csvs) != 1:
        raise ValueError(
            f"Expected to find 1 candidate csv for observed flow for global_domain={global_domain}, gage_id={gage_id}, but found {len(candidate_csvs)} when searching from {repr(grandparent)}: {candidate_csvs}"
        )
    obs_dir = os.path.dirname(candidate_csvs[0])
    return obs_dir
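
The search above walks a directory tree, collects every file with the expected name, and insists on exactly one match before returning its parent directory. The sketch below shows the same walk-and-require-unique approach on a temporary directory. `find_unique_parent_dir` is a hypothetical simplification that compares file names directly instead of using a regular expression, and the directory names are invented.

```python
import os
import tempfile


def find_unique_parent_dir(search_root: str, filename: str) -> str:
    """Return the directory holding the single file named `filename`.

    Raises ValueError if zero or multiple matches exist under search_root.
    """
    matches = []
    for root, dirs, files in os.walk(search_root):
        dirs.sort()  # sort in place so traversal order is deterministic
        if filename in files:
            matches.append(os.path.join(root, filename))
    if len(matches) != 1:
        raise ValueError(
            f"Expected 1 match for {filename!r} under {search_root!r}, "
            f"found {len(matches)}: {matches}"
        )
    return os.path.dirname(matches[0])


with tempfile.TemporaryDirectory() as tmp:
    # Invented layout: <tmp>/vintage_a/hourly/<gage_id>_hourly_discharge.csv
    vintage_dir = os.path.join(tmp, "vintage_a", "hourly")
    os.makedirs(vintage_dir)
    target = "01234567_hourly_discharge.csv"
    open(os.path.join(vintage_dir, target), "w").close()

    found = find_unique_parent_dir(tmp, target)
    print(found == vintage_dir)
```

Failing on multiple matches is a deliberate choice: if several vintages of the observation data exist on disk, silently picking the first one could pair a calibration with the wrong observations.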

Regionalization Workflows#

The regionalization workflows are run via command-line interface (CLI) scripts.

Here is the argparse help menu of run_regionalization.py:

run_regionalization.py --help

run_regionalization is called by run_region.sh

For more information, see the nwm-region-mgr repository.

run_regionalization #

RTE regionalization workflow runner script.