multitask.base.BaseTask

multitask.base.BaseTask(
    config=None,
    *,
    dataframe=None,
    data_test=None,
    cache_home=None,
    log_level=logging.INFO,
    **overrides,
)

Shared base for all multi-target forecasting pipeline tasks.

BaseTask encapsulates the data-preparation pipeline (steps 1-7) and all helper methods shared across the task modes (lazy, defaults, predict, clean). Subclasses implement the run method with task-specific training or prediction logic.

The constructor takes a single config object satisfying the PipelineConfig protocol — typically a ConfigMulti. All pipeline parameters (forecast horizon, training window, outlier policy, weather/ holiday hooks, cross-validation fold count, persistence policy, …) live on that object. Only genuinely call-time state (the dataframes, the cache directory override, the logging level) is passed as separate kwargs. Extra **overrides are forwarded to config.set_params and mutate the passed-in config in place.

Plotting is not available in spotforecast2-safe. The _show_prediction_figure and _show_prediction_figure_agg hook methods are no-ops; override them in a subclass or use the spotforecast2 sibling package for interactive visualisation.

Parameters

Name Type Description Default
config Optional[PipelineConfig] A PipelineConfig-conforming object owning every pipeline parameter. ConfigMulti satisfies the protocol. None
dataframe Optional[pd.DataFrame] Pre-loaded input DataFrame with training data. Must contain a datetime column matching config.index_name plus at least one numeric target column. None
data_test Optional[pd.DataFrame] Pre-loaded test DataFrame (ground truth for the forecast horizon). Optional. None
cache_home Optional[Path] Cache directory override. When not None, replaces config.cache_home for this task instance. None
log_level int Logging level for the pipeline logger. logging.INFO
**overrides Any Forwarded to config.set_params(**overrides) — a convenience for one-line tweaks without building a fresh config. Mutates the caller’s config object. {}

Attributes

Name Type Description
config PipelineConfig Centralised pipeline configuration.
df_pipeline pd.DataFrame Pipeline DataFrame after preparation.
df_test pd.DataFrame Test DataFrame (ground truth).
weight_func Optional[Any] Sample-weight function from imputation.
exogenous_features pd.DataFrame Combined exogenous feature matrix.
exog_feature_names List[str] Selected exogenous feature names.
data_with_exog pd.DataFrame Merged target + exogenous data.
exo_pred pd.DataFrame Exogenous covariates for the forecast horizon.
results Dict[str, Dict] Per-task mapping of target name to prediction package.
agg_results Dict Mapping of task name to aggregated prediction package.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask.base import BaseTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 7, freq="h", tz="UTC")
df = pd.DataFrame({"load": rng.normal(500, 30, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        auto_save_models=False,
        verbose=False,
    )
    task = BaseTask(cfg, dataframe=df)
    print(f"Task mode: {task.TASK}")
    print(f"Config predict_size: {task.config.predict_size}")
Task mode: lazy
Config predict_size: 6

Methods

Name Description
agg_predictor Aggregate per-target prediction packages into a weighted forecast.
build_exogenous_features Build, combine, encode, and merge exogenous feature covariates.
create_forecaster Create a fresh forecaster for the given target.
cv_ts Build a TimeSeriesFold for cross-validation.
detect_outliers Apply hard-bound filtering and IsolationForest outlier detection.
impute Fill missing values using the configured imputation strategy.
load_models Load the most recent fitted models from the cache directory.
load_tuning_results Load the most recent tuning results for a target from cache.
log_summary Log a summary of the current pipeline configuration.
plot_with_outliers Visualise original vs. cleaned data with outlier markers.
prepare_data Load, resample, validate, and configure the pipeline data.
run Execute the task-specific training / prediction pipeline.
save_models Save fitted forecaster models to the cache directory.
save_tuning_results Save tuning results (best parameters and lags) to a JSON file.

agg_predictor

multitask.base.BaseTask.agg_predictor(results, targets, weights)

Aggregate per-target prediction packages into a weighted forecast.

Delegates to the module-level agg_predictor function. Available as an instance method so that subclasses can override the aggregation strategy when needed.

Parameters

Name Type Description Default
results Dict[str, Dict[str, Any]] Mapping of target name to prediction package (as returned by build_prediction_package). required
targets List[str] Ordered list of target names to include. required
weights List[float] Per-target aggregation weights aligned with targets. required

Returns

Name Type Description
Dict[str, Any] Aggregated prediction package dict.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx_train = pd.date_range("2023-01-01", periods=48, freq="h", tz="UTC")
idx_future = pd.date_range("2023-01-03", periods=6, freq="h", tz="UTC")

def _pkg(train_val, future_val):
    return {
        "train_actual": pd.Series(np.full(48, train_val), index=idx_train),
        "train_pred": pd.Series(np.full(48, train_val * 0.99), index=idx_train),
        "future_pred": pd.Series(np.full(6, future_val), index=idx_future),
        "future_actual": pd.Series(dtype="float64"),
    }

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(cache_home=tmp, verbose=False)
    task = LazyTask(cfg)
    results = {"wind": _pkg(100.0, 110.0), "solar": _pkg(200.0, 210.0)}
    agg = task.agg_predictor(results, ["wind", "solar"], [0.4, 0.6])
    print(f"Weighted future_pred: {agg['future_pred'].iloc[0]:.1f}")
Weighted future_pred: 170.0

build_exogenous_features

multitask.base.BaseTask.build_exogenous_features()

Build, combine, encode, and merge exogenous feature covariates.

This is step 4-7 of the pipeline (run after prepare_data, detect_outliers, and impute). It assembles the full exogenous-covariate matrix that the forecaster consumes, then merges it onto the target data. The orchestration proceeds in order:

  • 4a — Weather, via get_weather_features (Open-Meteo). The response is parquet-cached only when config.cache_home is set. Fetch failures are handled per config.on_weather_failure: "raise" re-raises WeatherFetchError; "skip" logs a warning and continues with an empty weather frame (fail-safe).
  • 4b — Calendar features, via get_calendar_features.
  • 4c — Day/night (solar) features, via get_day_night_features (computed with astral from config.latitude / config.longitude).
  • 4d — Holiday features, via get_holiday_features for config.country_code / config.state.
  • 5 — The four frames are concatenated along the columns and any residual gaps are back- then forward-filled. Provider-based exogenous columns are then appended via build_providers_from_config (requires spotforecast2-safe >= 15.7.0). The active providers are governed by the config flags include_covid_infection_rate, include_entsoe_forecast_load, include_entsoe_renewable_forecast, include_entsoe_net_load, and include_entsoe_day_ahead_price. Cyclical (sine/cosine) encoding is then applied via apply_cyclical_encoding, and degree-config.poly_features_degree interaction terms are added via create_interaction_features. When the degree is at least 2, the polynomial columns are ranked by mutual information with the primary target and capped to config.max_poly_features via select_top_poly_features.
  • 6 — The training feature set is chosen via select_exogenous_features, with provider columns appended (order-preserving, de-duplicated).
  • 7 — Targets and covariates are merged via merge_data_and_covariates into self.data_with_exog and the forecast-horizon covariates self.exo_pred.

When config.use_exogenous_features is False the method is a no-op and returns self immediately, leaving the pipeline target-only.

Attributes

Name Type Description
weather_aligned pd.DataFrame Weather frame aligned to the pipeline index, reused by the interaction and selection steps.
exogenous_features pd.DataFrame Full combined, encoded, and capped exogenous feature matrix.
exog_feature_names List[str] Names of the exogenous features selected for training (including provider columns).
data_with_exog pd.DataFrame Target data merged with the selected exogenous covariates.
exo_pred pd.DataFrame Exogenous covariates spanning the forecast horizon, supplied to the forecaster at predict time.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If prepare_data has not been called.
WeatherFetchError If the Open-Meteo fetch fails and config.on_weather_failure == "raise".

Examples

With exogenous features disabled the method is a no-op, so the example below runs without any network access and leaves the pipeline target-only.

import tempfile
import pandas as pd
import numpy as np
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data().detect_outliers().impute().build_exogenous_features()
    print(f"Exogenous features used: {mt.config.use_exogenous_features}")
    print(f"Selected exog feature names: {mt.exog_feature_names}")
Exogenous features used: False
Selected exog feature names: []

create_forecaster

multitask.base.BaseTask.create_forecaster(target=None)

Create a fresh forecaster for the given target.

Delegates to config.forecaster_factory when set; otherwise falls back to default_lgbm_forecaster_factory. This factory hook lets callers swap the estimator without subclassing BaseTask.

Parameters

Name Type Description Default
target Optional[str] Optional target column name. Forwarded to the factory so that custom factories can specialise per target. None

Returns

Name Type Description
Any A new, unfitted forecaster instance.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        cache_home=Path(tmp),
    )
    task = LazyTask(cfg)
    forecaster = task.create_forecaster()
print(f"Type: {type(forecaster).__name__}")
print(f"Lags: {forecaster.lags}")
Type: ForecasterRecursive
Lags: [ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]

cv_ts

multitask.base.BaseTask.cv_ts(y_train)

Build a TimeSeriesFold for cross-validation.

Constructs the cross-validation splitter used by all tuning tasks. Internally uses sklearn.model_selection.TimeSeriesSplit to compute split boundaries that respect temporal ordering and avoid data leakage between folds.

The validation boundary is determined by run_state.end_train_ts minus config.delta_val. When config.train_size is set, the sklearn splitter uses a sliding fixed-size training window (max_train_size); otherwise an expanding window is used.

Parameters

Name Type Description Default
y_train pd.Series Training time series for the current target. Used both to determine the validation boundary and as the sequence passed to TimeSeriesSplit.split to derive initial_train_size. required

Returns

Name Type Description
TimeSeriesFold A configured TimeSeriesFold instance ready to be passed to
TimeSeriesFold a model-selection function.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        number_folds=2,
        auto_save_models=False,
        verbose=False,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data().detect_outliers().impute().build_exogenous_features()
    y_train = mt.df_pipeline["a"]
    cv = mt.cv_ts(y_train)
    print(f"TimeSeriesFold steps: {cv.steps}")
    print(f"initial_train_size: {cv.initial_train_size}")
TimeSeriesFold steps: 6
initial_train_size: 324

detect_outliers

multitask.base.BaseTask.detect_outliers()

Apply hard-bound filtering and IsolationForest outlier detection.

Hard bounds from config.bounds are applied to the pipeline data (out-of-bound values are removed and later filled by impute()). IsolationForest detection (config.use_outlier_detection) is advisory: detected outliers are logged per column but not removed.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If method prepare_data has not been called.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        auto_save_models=False,
        verbose=False,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data()
    mt.detect_outliers()
    print(f"Pipeline shape: {mt.df_pipeline.shape}")
    assert mt.df_pipeline_original is not None
Pipeline shape: (336, 1)

impute

multitask.base.BaseTask.impute()

Fill missing values using the configured imputation strategy.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If method prepare_data has not been called.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
values = rng.normal(100, 10, len(idx))
values[10:13] = float("nan")  # inject a few gaps
df = pd.DataFrame({"a": values}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        auto_save_models=False,
        verbose=False,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data().detect_outliers().impute()
    missing = mt.df_pipeline["a"].isna().sum()
    print(f"Missing values after imputation: {missing}")
    assert missing == 0
Missing values after imputation: 0

load_models

multitask.base.BaseTask.load_models(
    task_name=None,
    target=None,
    max_age_days=None,
)

Load the most recent fitted models from the cache directory.

Scans <cache_home>/models/<data_frame_name>/ for .joblib files matching the current data_frame_name. Optionally filters by task_name, target, and max_age_days.

Parameters

Name Type Description Default
task_name Optional[str] If given, only load models from this task ("lazy", "defaults", "optuna", or "spotoptim"). None accepts any task. None
target Optional[str] If given, only load the model for this target column. None loads the most recent model for every target found. None
max_age_days Optional[float] Maximum age in days. Models older than this are ignored. None accepts any age. None

Returns

Name Type Description
Dict[str, Any] Mapping {target: forecaster} of loaded model objects.
Dict[str, Any] Empty dict if no matching models were found.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        data_frame_name="demo",
        cache_home=Path(tmp),
        verbose=False,
    )
    task = LazyTask(cfg)
    # Save a dummy object, then load it back.
    dummy_forecaster = {"lags": [1, 2, 24]}
    task.save_models(
        task_name="lazy",
        forecasters={"load": dummy_forecaster},
    )
    loaded = task.load_models(task_name="lazy")
    print(f"Loaded targets: {list(loaded.keys())}")
    assert loaded["load"]["lags"] == [1, 2, 24]
Loaded targets: ['load']

load_tuning_results

multitask.base.BaseTask.load_tuning_results(
    target,
    task_name=None,
    max_age_days=None,
)

Load the most recent tuning results for a target from cache.

Scans <cache_home>/tuning_results/ for files matching the current data_frame_name and target. Optionally filters by task_name and discards results older than max_age_days.

Parameters

Name Type Description Default
target str Name of the forecast target column. required
task_name Optional[str] If given, only consider results from this tuning algorithm (e.g. "optuna" or "spotoptim"). None accepts any algorithm. None
max_age_days Optional[float] Maximum age in days. Results older than this are ignored. None accepts any age. None

Returns

Name Type Description
Optional[Dict[str, Any]] A dictionary with keys best_params, best_lags,
Optional[Dict[str, Any]] task_name, target, data_frame_name, and
Optional[Dict[str, Any]] timestamp; or None if no matching file was found.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(data_frame_name="demo10", cache_home=Path(tmp))
    task = LazyTask(cfg)
    task.save_tuning_results(
        target="target_0",
        task_name="optuna",
        best_params={"n_estimators": 100},
        best_lags=24,
    )
    result = task.load_tuning_results(target="target_0")
    print(result["best_params"])
{'n_estimators': 100}

log_summary

multitask.base.BaseTask.log_summary()

Log a summary of the current pipeline configuration.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        auto_save_models=False,
        verbose=False,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data().detect_outliers().impute().build_exogenous_features()
    # log_summary writes to the pipeline logger; call it to confirm
    # it runs without error.
    mt.log_summary()
    print("log_summary completed without error")
log_summary completed without error

plot_with_outliers

multitask.base.BaseTask.plot_with_outliers()

Visualise original vs. cleaned data with outlier markers.

Raises

Name Type Description
RuntimeError If method detect_outliers has not been called.
NotImplementedError Always — plotting is not available in spotforecast2-safe. Use the spotforecast2 package for visualisation.

Examples

import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
        auto_save_models=False,
        verbose=False,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data().detect_outliers()
    try:
        mt.plot_with_outliers()
    except NotImplementedError as exc:
        print(f"Plotting unavailable in spotforecast2-safe: {exc}")
Plotting unavailable in spotforecast2-safe: Plotting is not available in spotforecast2-safe (no plotly/matplotlib). Use the spotforecast2 package for visualisation.

prepare_data

multitask.base.BaseTask.prepare_data(demo_data=None, df_test=None)

Load, resample, validate, and configure the pipeline data.

Uses the following precedence for the training data:

  1. demo_data argument (if provided).
  2. self._dataframe set via the constructor.

Similarly for test data:

  1. df_test argument (if provided).
  2. self.data_test set via the constructor.
  3. self.config.test_data_loader(self.config) if set.

Parameters

Name Type Description Default
demo_data Optional[pd.DataFrame] Pre-loaded input DataFrame. When None, the constructor dataframe is used. None
df_test Optional[pd.DataFrame] Pre-loaded test DataFrame. When None, the constructor data_test is used, then config.test_data_loader. None

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
ValueError If no data source is available (no demo_data, no constructor dataframe).

Examples

import tempfile
import pandas as pd
import numpy as np
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

rng = np.random.default_rng(0)
idx = pd.date_range("2023-01-01", periods=24 * 14, freq="h", tz="UTC")
df = pd.DataFrame({"a": rng.normal(100, 10, len(idx))}, index=idx)
df.index.name = "DateTime"

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        predict_size=6,
        use_exogenous_features=False,
        use_outlier_detection=False,
        cache_home=tmp,
    )
    mt = MultiTask(cfg, dataframe=df)
    mt.prepare_data()
    print(f"Pipeline shape: {mt.df_pipeline.shape}")
    print(f"Targets: {mt.run_state.targets}")
Pipeline shape: (336, 1)
Targets: ['a']

run

multitask.base.BaseTask.run(
    show=False,
    task=None,
    task_name=None,
    use_tuned_params=True,
    max_age_days=None,
    search_space=None,
    dry_run=False,
    cache_home=None,
    **kwargs,
)

Execute the task-specific training / prediction pipeline.

Subclasses must override this method.

Parameters

Name Type Description Default
show bool If True, invoke the visualisation hooks (no-ops in this package; meaningful only in spotforecast2). False
task Optional[str] Task mode override (used by MultiTask). None
task_name Optional[str] Restrict model loading to a specific source task (used by PredictTask). None
use_tuned_params bool Load cached tuning results when available (used by LazyTask). True
max_age_days Optional[float] Maximum age in days for cached results (used by LazyTask and PredictTask). Freshness is judged against the wall-clock timestamp embedded in the cache filename, so the check is machine-local. None
search_space Optional[Any] Hyperparameter search-space definition (accepted for API compatibility; not used in this package). None
dry_run bool Report what would be deleted without removing anything (used by CleanTask). False
cache_home Optional[Path] Override the cache directory (used by CleanTask). None
**kwargs Any Additional task-specific arguments. {}

Returns

Name Type Description
Dict[str, Any] Aggregated prediction package for the task.

Raises

Name Type Description
NotImplementedError Always, unless overridden by a subclass.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask.base import BaseTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

# BaseTask.run is abstract and always raises NotImplementedError.
# Concrete subclasses (LazyTask, DefaultsTask, PredictTask, CleanTask)
# provide the real implementation.
with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(cache_home=Path(tmp), verbose=False)
    task = BaseTask(cfg)
    try:
        task.run()
    except NotImplementedError as exc:
        print(f"Expected: {exc}")
Expected: BaseTask must implement run(). Use LazyTask, DefaultsTask, PredictTask, or CleanTask.

save_models

multitask.base.BaseTask.save_models(task_name, forecasters=None)

Save fitted forecaster models to the cache directory.

Each model is serialised with joblib (compress=3) into <cache_home>/models/<data_frame_name>/ using a datetime-stamped filename so that multiple snapshots can coexist.

Filename format::

<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.joblib

If forecasters is None the method collects fitted models from self.results[task_name], where each prediction package is expected to contain a "forecaster" key.

Parameters

Name Type Description Default
task_name str Task identifier ("lazy", "defaults"). The names "optuna" and "spotoptim" are also accepted so that model caches produced by the spotforecast2 sibling package can be saved and loaded; no tuning is performed in this package. required
forecasters Optional[Dict[str, Any]] Optional mapping {target: fitted_forecaster}. When None, models are taken from the prediction packages stored in self.results. None

Returns

Name Type Description
Dict[str, Path] Mapping {target: Path} of saved model file paths.

Raises

Name Type Description
ValueError If task_name is not one of "lazy", "defaults", "optuna", "spotoptim".
RuntimeError If no fitted models are available for the requested task.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(
        data_frame_name="demo",
        cache_home=Path(tmp),
        verbose=False,
    )
    task = LazyTask(cfg)
    # Supply a tiny in-memory object as a stand-in for a fitted forecaster.
    dummy_forecaster = object()
    saved = task.save_models(
        task_name="lazy",
        forecasters={"load": dummy_forecaster},
    )
    print(f"Saved targets: {list(saved.keys())}")
    assert saved["load"].suffix == ".joblib"
Saved targets: ['load']

save_tuning_results

multitask.base.BaseTask.save_tuning_results(
    target,
    task_name,
    best_params,
    best_lags,
)

Save tuning results (best parameters and lags) to a JSON file.

The file is stored under <cache_home>/tuning_results/ with a datetime-stamped filename so that loaders can determine freshness.

Filename format::

<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.json

Parameters

Name Type Description Default
target str Name of the forecast target column. required
task_name str Tuning algorithm identifier (e.g. "optuna", "spotoptim"). required
best_params Dict[str, Any] Best hyperparameters discovered during tuning. required
best_lags Any Best lag configuration (int, list, or nested list). required

Returns

Name Type Description
Path Path to the saved JSON file.

Examples

import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti

with tempfile.TemporaryDirectory() as tmp:
    cfg = ConfigMulti(data_frame_name="demo10", cache_home=Path(tmp))
    task = LazyTask(cfg)
    path = task.save_tuning_results(
        target="target_0",
        task_name="optuna",
        best_params={"n_estimators": 100, "learning_rate": 0.05},
        best_lags=[1, 2, 24],
    )
    print(path.name[:10])
demo10_tar