manager.multitask.BaseTask

manager.multitask.BaseTask(
    dataframe=None,
    data_test=None,
    data_frame_name='default',
    cache_home=None,
    agg_weights=None,
    index_name='DateTime',
    number_folds=10,
    predict_size=24,
    bounds=None,
    contamination=0.03,
    imputation_method='weighted',
    use_exogenous_features=True,
    n_trials_optuna=15,
    n_trials_spotoptim=10,
    n_initial_spotoptim=5,
    auto_save_models=True,
    train_days=365 * 2,
    val_days=7 * 2,
    log_level=logging.INFO,
    verbose=False,
    **config_overrides,
)

Shared base for all multi-target forecasting pipeline tasks.

BaseTask encapsulates the data-preparation pipeline (steps 1–7) and all helper methods shared across the five task modes (lazy, optuna, spotoptim, predict, clean). Subclasses implement the run method with task-specific training, tuning, or prediction logic.

Parameters

Name Type Description Default
dataframe Optional[pd.DataFrame] Pre-loaded input DataFrame with training data. The DataFrame must contain a datetime column matching index_name plus at least one numeric target column. None
data_test Optional[pd.DataFrame] Pre-loaded input DataFrame with test data (ground truth for the forecast horizon). When provided, it must contain a datetime column matching index_name plus at least one numeric target column. None
data_frame_name str Identifier for the active dataset, used for cache-directory naming and model file naming. 'default'
cache_home Optional[Path] Cache directory, given as a string or Path. None
agg_weights Optional[List[float]] Per-target aggregation weights. None
index_name str Name of the datetime column in the input data. 'DateTime'
number_folds int Number of validation folds for hyperparameter tuning. 10
predict_size int Forecast horizon in hours. 24
bounds Optional[List[tuple]] Per-column hard outlier bounds (lower, upper). None
contamination float IsolationForest contamination fraction. 0.03
imputation_method str Gap-filling strategy — "weighted" or "linear". 'weighted'
use_exogenous_features bool Whether to build exogenous features. True
train_days int Number of days in the training window. 365 * 2
val_days int Number of days in each validation fold. Folds are contiguous, non-overlapping blocks of val_days days that follow one another immediately after the training window, so the total validation window spans val_days * number_folds days. 7 * 2
n_trials_optuna int Number of Optuna Bayesian-search trials. 15
n_trials_spotoptim int Number of SpotOptim surrogate-search trials. 10
n_initial_spotoptim int Initial random evaluations for SpotOptim. 5
auto_save_models bool Whether to automatically save fitted models to disk after each training run. Defaults to True so that saved models are immediately available for PredictTask without any manual call to save_models(). True
log_level int Logging level for the pipeline logger. logging.INFO
verbose bool Whether to print verbose messages during data preparation and outlier detection. False
config_overrides Any Extra keyword arguments forwarded to ConfigMulti. {}
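
The relationship between train_days, val_days, and number_folds can be sketched with plain date arithmetic. This is a minimal illustration only: the helper fold_windows and the chosen end-of-training date are assumptions made for the example and are not part of the library.

```python
from datetime import datetime, timedelta


def fold_windows(end_train, val_days=14, number_folds=10):
    """Return (start, end) pairs for sequential, non-overlapping folds.

    Hypothetical helper: the first fold starts right after the training
    window and each later fold starts where the previous one ended.
    """
    windows = []
    start = end_train
    for _ in range(number_folds):
        end = start + timedelta(days=val_days)
        windows.append((start, end))
        start = end  # next fold begins where this one ends
    return windows


end_train = datetime(2024, 1, 1)  # assumed end of the training window
train_start = end_train - timedelta(days=365 * 2)  # default train_days
windows = fold_windows(end_train)  # default val_days and number_folds
total_val_days = (windows[-1][1] - windows[0][0]).days

print(f"training window starts: {train_start.date()}")
print(f"folds: {len(windows)}, total validation days: {total_val_days}")
```

With the defaults shown in the signature, ten folds of 14 days add up to a validation window of 140 days.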

Attributes

Name Type Description
config ConfigMulti Centralised pipeline configuration.
df_pipeline pd.DataFrame Pipeline DataFrame after preparation.
df_test pd.DataFrame Test DataFrame (ground truth).
weight_func Optional[Any] Sample-weight function from imputation.
exogenous_features pd.DataFrame Combined exogenous feature matrix.
exog_feature_names List[str] Selected exogenous feature names.
data_with_exog pd.DataFrame Merged target + exogenous data.
exo_pred pd.DataFrame Exogenous covariates for the forecast horizon.
results Dict[str, Dict] Per-task mapping of target name to prediction package.
agg_results Dict Mapping of task name to aggregated prediction package.

Methods

Name Description
agg_predictor Aggregate per-target prediction packages into a weighted forecast.
build_exogenous_features Build, combine, encode, and merge exogenous feature covariates.
create_forecaster Create a fresh ForecasterRecursive with shared configuration.
cv_ts Build a TimeSeriesFold for cross-validation.
detect_outliers Apply hard-bound filtering and IsolationForest outlier detection.
impute Fill missing values using the configured imputation strategy.
load_models Load the most recent fitted models from the cache directory.
load_tuning_results Load the most recent tuning results for a target from cache.
log_summary Log a summary of the current pipeline configuration.
plot_with_outliers Visualise original vs. cleaned data with outlier markers.
prepare_data Load, resample, validate, and configure the pipeline data.
run Execute the task-specific training / tuning pipeline.
save_models Save fitted forecaster models to the cache directory.
save_tuning_results Save tuning results (best parameters and lags) to a JSON file.

agg_predictor

manager.multitask.BaseTask.agg_predictor(results, targets, weights)

Aggregate per-target prediction packages into a weighted forecast.

Delegates to the module-level agg_predictor function. Available as an instance method so that subclasses can override the aggregation strategy when needed.

Parameters

Name Type Description Default
results Dict[str, Dict[str, Any]] Mapping of target name to prediction package (as returned by build_prediction_package). required
targets List[str] Ordered list of target names to include. required
weights List[float] Per-target aggregation weights aligned with targets. required

Returns

Name Type Description
Dict[str, Any] Aggregated prediction package dict.
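
A weighted aggregation of this kind can be sketched as follows. The sketch assumes that the aggregate is a weighted sum and that each prediction package stores its forecast under a "predictions" key; both the key name and the function weighted_aggregate are hypothetical and may differ from the real package layout.

```python
def weighted_aggregate(results, targets, weights):
    """Combine per-target forecasts into one weighted series.

    Assumes each package holds an equal-length list of floats under the
    hypothetical "predictions" key.
    """
    if len(targets) != len(weights):
        raise ValueError("targets and weights must have the same length")
    horizon = len(results[targets[0]]["predictions"])
    combined = [0.0] * horizon
    for target, weight in zip(targets, weights):
        for i, value in enumerate(results[target]["predictions"]):
            combined[i] += weight * value
    return {"targets": list(targets), "predictions": combined}


results = {
    "A": {"predictions": [1.0, 2.0, 3.0]},
    "B": {"predictions": [10.0, 20.0, 30.0]},
}
agg = weighted_aggregate(results, targets=["A", "B"], weights=[1.0, -0.5])
print(agg["predictions"])
```

Passing targets as an ordered list keeps each weight aligned with its target, which is why the two arguments must have the same length.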

build_exogenous_features

manager.multitask.BaseTask.build_exogenous_features()

Build, combine, encode, and merge exogenous feature covariates.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If prepare_data has not been called.

create_forecaster

manager.multitask.BaseTask.create_forecaster()

Create a fresh ForecasterRecursive with shared configuration.

Returns

Name Type Description
Any A new, unfitted ForecasterRecursive instance.

Examples

from spotforecast2.manager.multitask import LazyTask

task = LazyTask(predict_size=24)
forecaster = task.create_forecaster()
print(f"Type: {type(forecaster).__name__}")
print(f"Lags: {forecaster.lags}")
Type: ForecasterRecursive
Lags: [ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]

cv_ts

manager.multitask.BaseTask.cv_ts(y_train)

Build a TimeSeriesFold for cross-validation.

Constructs the cross-validation splitter used by all tuning tasks (OptunaTask, SpotOptimTask).

Internally uses sklearn.model_selection.TimeSeriesSplit to compute split boundaries that respect temporal ordering and avoid data leakage between folds. Classical cross-validation techniques such as KFold assume i.i.d. samples and yield unreliable estimates on time series data; sklearn.model_selection.TimeSeriesSplit instead ensures every test fold consists only of observations that come after the corresponding training observations.

The validation boundary is determined by config.end_train_ts minus config.delta_val. When config.train_size is set, the sklearn splitter uses a sliding fixed-size training window (max_train_size); otherwise an expanding window is used so that each subsequent fold sees more historical data.
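
The difference between an expanding and a sliding training window can be illustrated without any dependencies. The sketch below is a simplified, pure-Python stand-in for the index arithmetic of a time series splitter. It is not the sklearn or library implementation, and the name time_series_splits and its parameters are chosen here for illustration.

```python
def time_series_splits(n_samples, n_splits, test_size, max_train_size=None):
    """Yield (train_indices, test_indices) pairs in temporal order.

    Every test block follows its training block, so no future
    observation leaks into training.
    """
    first_test_start = n_samples - n_splits * test_size
    if first_test_start <= 0:
        raise ValueError("not enough samples for the requested folds")
    for test_start in range(first_test_start, n_samples, test_size):
        train_start = 0  # expanding window: always start at the beginning
        if max_train_size is not None and max_train_size < test_start:
            train_start = test_start - max_train_size  # sliding window
        train = list(range(train_start, test_start))
        test = list(range(test_start, test_start + test_size))
        yield train, test


expanding = list(time_series_splits(n_samples=12, n_splits=3, test_size=2))
sliding = list(time_series_splits(12, 3, 2, max_train_size=4))

for (train_e, test), (train_s, _) in zip(expanding, sliding):
    print(f"test={test} expanding={len(train_e)} sliding={len(train_s)}")
```

In the expanding case the training set grows with every fold, while the sliding case keeps a constant number of the most recent observations.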

Parameters

Name Type Description Default
y_train pd.Series Training time series for the current target. Used both to determine the validation boundary and as the sequence passed to sklearn.model_selection.TimeSeriesSplit.split to derive initial_train_size. required

Returns

Name Type Description
TimeSeriesFold A configured TimeSeriesFold instance ready to be passed to a model-selection function.

detect_outliers

manager.multitask.BaseTask.detect_outliers()

Apply hard-bound filtering and IsolationForest outlier detection.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If prepare_data has not been called.
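
The hard-bound part of this step can be pictured with a small sketch. It assumes that values outside a column's (lower, upper) bounds are replaced by a missing marker so that a later imputation step can fill them. That behaviour, and the helper apply_hard_bounds, are assumptions for illustration. The IsolationForest stage is not shown.

```python
def apply_hard_bounds(values, lower, upper):
    """Replace values outside [lower, upper] with None (treated as missing).

    Hypothetical helper; existing None entries stay None.
    """
    return [
        v if v is not None and lower <= v <= upper else None
        for v in values
    ]


column = [12.0, -5.0, 48.5, 1e6, 30.2]
cleaned = apply_hard_bounds(column, lower=0.0, upper=100.0)
n_flagged = sum(v is None for v in cleaned)
print(cleaned, n_flagged)
```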

impute

manager.multitask.BaseTask.impute()

Fill missing values using the configured imputation strategy.

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
RuntimeError If prepare_data has not been called.
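
Because the preparation steps return self and raise RuntimeError when prepare_data has not run, they can be chained. The toy class below only mimics that calling pattern: ChainedPipeline and its _prepared flag are hypothetical stand-ins and do not reproduce any of the real data processing.

```python
class ChainedPipeline:
    """Toy stand-in, not the library class: each step returns self."""

    def __init__(self):
        self.steps = []
        self._prepared = False  # hypothetical guard flag

    def _require_prepared(self):
        if not self._prepared:
            raise RuntimeError("call prepare_data() first")

    def prepare_data(self):
        self._prepared = True
        self.steps.append("prepare_data")
        return self

    def detect_outliers(self):
        self._require_prepared()
        self.steps.append("detect_outliers")
        return self

    def impute(self):
        self._require_prepared()
        self.steps.append("impute")
        return self


pipeline = ChainedPipeline().prepare_data().detect_outliers().impute()
print(pipeline.steps)

# Skipping prepare_data triggers the guard.
try:
    ChainedPipeline().impute()
except RuntimeError as exc:
    error_message = str(exc)
    print(f"RuntimeError: {error_message}")
```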

load_models

manager.multitask.BaseTask.load_models(
    task_name=None,
    target=None,
    max_age_days=None,
)

Load the most recent fitted models from the cache directory.

Scans <cache_home>/models/<data_frame_name>/ for .joblib files matching the current data_frame_name. Optionally filters by task_name, target, and max_age_days.

Parameters

Name Type Description Default
task_name Optional[str] If given, only load models from this task ("lazy", "optuna", or "spotoptim"). None accepts any task. None
target Optional[str] If given, only load the model for this target column. None loads the most recent model for every target found. None
max_age_days Optional[float] Maximum age in days. Models older than this are ignored. None accepts any age. None

Returns

Name Type Description
Dict[str, Any] Mapping {target: forecaster} of loaded model objects. Empty dict if no matching models were found.
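
The selection logic described above (filter by task, target, and age, then keep the newest file per target) can be sketched on plain filenames. The sketch relies on the filename format documented for save_models. The helpers parse_model_name and newest_per_target are hypothetical, and the actual loader may work differently.

```python
from datetime import datetime, timedelta


def parse_model_name(filename, data_frame_name):
    """Split '<name>_<target>_<task>_<YYYYMMDD_HHMMSS>.joblib' into parts.

    Parses from the right because target names may contain underscores.
    Returns None for files that do not match.
    """
    prefix, suffix = data_frame_name + "_", ".joblib"
    if not (filename.startswith(prefix) and filename.endswith(suffix)):
        return None
    stem = filename[len(prefix):-len(suffix)]
    parts = stem.rsplit("_", 3)  # target, task, date, time
    if len(parts) != 4:
        return None
    target, task, date, time = parts
    try:
        stamp = datetime.strptime(f"{date}_{time}", "%Y%m%d_%H%M%S")
    except ValueError:
        return None
    return target, task, stamp


def newest_per_target(filenames, data_frame_name, task_name=None,
                      target=None, max_age_days=None, now=None):
    """Pick the most recent file per target, honouring optional filters."""
    now = now or datetime.now()
    best = {}
    for name in filenames:
        parsed = parse_model_name(name, data_frame_name)
        if parsed is None:
            continue
        tgt, task, stamp = parsed
        if task_name is not None and task != task_name:
            continue
        if target is not None and tgt != target:
            continue
        if max_age_days is not None and now - stamp > timedelta(days=max_age_days):
            continue
        if tgt not in best or stamp > best[tgt][0]:
            best[tgt] = (stamp, name)
    return {tgt: name for tgt, (_, name) in best.items()}


files = [
    "demo10_target_0_lazy_20260101_120000.joblib",
    "demo10_target_0_optuna_20260320_080000.joblib",
    "demo10_target_1_lazy_20260310_090000.joblib",
    "other_target_0_lazy_20260321_000000.joblib",
]
now = datetime(2026, 3, 25)  # fixed reference time for a reproducible demo
latest = newest_per_target(files, "demo10", now=now)
recent_lazy = newest_per_target(
    files, "demo10", task_name="lazy", max_age_days=30, now=now
)
print(latest)
print(recent_lazy)
```

A real loader would then deserialise each selected file; that step is left out here so the sketch stays free of external dependencies.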

load_tuning_results

manager.multitask.BaseTask.load_tuning_results(
    target,
    task_name=None,
    max_age_days=None,
)

Load the most recent tuning results for a target from cache.

Scans <cache_home>/tuning_results/ for files matching the current data_frame_name and target. Optionally filters by task_name and discards results older than max_age_days.

Parameters

Name Type Description Default
target str Name of the forecast target column. required
task_name Optional[str] If given, only consider results from this tuning algorithm (e.g. "optuna" or "spotoptim"). None accepts any algorithm. None
max_age_days Optional[float] Maximum age in days. Results older than this are ignored. None accepts any age. None

Returns

Name Type Description
Optional[Dict[str, Any]] A dictionary with keys best_params, best_lags, task_name, target, data_frame_name, and timestamp; or None if no matching file was found.

Examples

from spotforecast2.manager.multitask import LazyTask

task = LazyTask(data_frame_name="demo10")
# Save first so there is something to load
task.save_tuning_results(
    target="target_0",
    task_name="optuna",
    best_params={"n_estimators": 100},
    best_lags=24,
)
result = task.load_tuning_results(target="target_0")
print(result["best_params"])
{'n_estimators': 100}

log_summary

manager.multitask.BaseTask.log_summary()

Log a summary of the current pipeline configuration.

plot_with_outliers

manager.multitask.BaseTask.plot_with_outliers()

Visualise original vs. cleaned data with outlier markers.

Raises

Name Type Description
RuntimeError If detect_outliers has not been called.

prepare_data

manager.multitask.BaseTask.prepare_data(demo_data=None, df_test=None)

Load, resample, validate, and configure the pipeline data.

Uses the following precedence for the training data:

  1. demo_data argument (if provided).
  2. self._dataframe set via the constructor.

Similarly for test data:

  1. df_test argument (if provided).
  2. self.data_test set via the constructor.
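
The precedence rules above amount to "call-time argument first, constructor value second". A minimal sketch, using a hypothetical helper resolve_source and strings in place of real DataFrames:

```python
def resolve_source(argument, constructor_value, required=True):
    """Return the call-time argument if given, else the constructor value.

    Uses `is not None` checks because a DataFrame has no unambiguous
    truth value. Hypothetical helper for illustration.
    """
    if argument is not None:
        return argument
    if constructor_value is not None or not required:
        return constructor_value
    raise ValueError("no data source is available")


# Strings stand in for DataFrames to keep the sketch dependency-free.
train = resolve_source("call_time_df", "constructor_df")
fallback = resolve_source(None, "constructor_df")
test = resolve_source(None, None, required=False)  # test data is optional
print(train, fallback, test)

try:
    resolve_source(None, None)
except ValueError as exc:
    missing_message = str(exc)
    print(f"ValueError: {missing_message}")
```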

Parameters

Name Type Description Default
demo_data Optional[pd.DataFrame] Pre-loaded input DataFrame. When None, the constructor dataframe is used. None
df_test Optional[pd.DataFrame] Pre-loaded test DataFrame. When None, the constructor data_test is used. None

Returns

Name Type Description
BaseTask self (for method chaining).

Raises

Name Type Description
ValueError If no data source is available (no demo_data, no constructor dataframe).

Examples

import pandas as pd
from spotforecast2.manager.multitask import MultiTask
from spotforecast2_safe.data.fetch_data import (
    fetch_data, get_package_data_home,
)

data_home = get_package_data_home()
df = fetch_data(filename=str(data_home / "demo10.csv"))

mt = MultiTask(dataframe=df, predict_size=24)
mt.prepare_data()
print(f"Pipeline shape: {mt.df_pipeline.shape}")
print(f"Targets: {mt.config.targets}")
Pipeline shape: (18118, 11)
Targets: ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K']

run

manager.multitask.BaseTask.run(
    show=True,
    task=None,
    task_name=None,
    use_tuned_params=True,
    max_age_days=None,
    search_space=None,
    dry_run=False,
    cache_home=None,
    **kwargs,
)

Execute the task-specific training / tuning pipeline.

Subclasses must override this method.

Parameters

Name Type Description Default
show bool If True, display prediction figures. True
task Optional[str] Task mode override (used by MultiTask). None
task_name Optional[str] Restrict model loading to a specific source task (used by PredictTask). None
use_tuned_params bool Load cached tuning results when available (used by LazyTask). True
max_age_days Optional[float] Maximum age in days for cached results (used by LazyTask and PredictTask). None
search_space Optional[Any] Hyperparameter search-space definition (used by OptunaTask and SpotOptimTask). None
dry_run bool Report what would be deleted without removing anything (used by CleanTask). False
cache_home Optional[Path] Override the cache directory (used by CleanTask). None
**kwargs Any Additional task-specific arguments. {}

Returns

Name Type Description
Dict[str, Any] Aggregated prediction package for the task.

Raises

Name Type Description
NotImplementedError Always, unless overridden by a subclass.

save_models

manager.multitask.BaseTask.save_models(task_name, forecasters=None)

Save fitted forecaster models to the cache directory.

Each model is serialised with joblib (compress=3) into <cache_home>/models/<data_frame_name>/ using a datetime-stamped filename so that multiple snapshots can coexist.

Filename format:

<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.joblib

If forecasters is None the method collects fitted models from self.results[task_name], where each prediction package is expected to contain a "forecaster" key.

Parameters

Name Type Description Default
task_name str Task identifier ("lazy", "optuna", or "spotoptim"). required
forecasters Optional[Dict[str, Any]] Optional mapping {target: fitted_forecaster}. When None, models are taken from the prediction packages stored in self.results. None

Returns

Name Type Description
Dict[str, Path] Mapping {target: Path} of saved model file paths.

Raises

Name Type Description
ValueError If task_name is not one of "lazy", "optuna", "spotoptim".
RuntimeError If no fitted models are available for the requested task.
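
The fallback to self.results and the two error conditions can be sketched as below. The function collect_forecasters is a hypothetical stand-in that follows the description above. It only gathers the models and does not write any files.

```python
VALID_TASKS = ("lazy", "optuna", "spotoptim")


def collect_forecasters(results, task_name, forecasters=None):
    """Return {target: forecaster}, falling back to results[task_name]."""
    if task_name not in VALID_TASKS:
        raise ValueError(f"task_name must be one of {VALID_TASKS}")
    if forecasters is None:
        packages = results.get(task_name, {})
        forecasters = {
            target: package["forecaster"]
            for target, package in packages.items()
            if "forecaster" in package
        }
    if not forecasters:
        raise RuntimeError(f"no fitted models available for {task_name!r}")
    return forecasters


model_a = object()  # placeholder for a fitted forecaster
results = {"lazy": {"A": {"forecaster": model_a}, "B": {"predictions": []}}}
collected = collect_forecasters(results, "lazy")
print(list(collected))

# An unknown task and a task without models raise different errors.
errors = []
for bad_call in (
    lambda: collect_forecasters(results, "predict"),
    lambda: collect_forecasters(results, "optuna"),
):
    try:
        bad_call()
    except (ValueError, RuntimeError) as exc:
        errors.append(type(exc).__name__)
print(errors)
```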

save_tuning_results

manager.multitask.BaseTask.save_tuning_results(
    target,
    task_name,
    best_params,
    best_lags,
)

Save tuning results (best parameters and lags) to a JSON file.

The file is stored under <cache_home>/tuning_results/ with a datetime-stamped filename so that loaders can determine freshness.

Filename format:

<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.json

Parameters

Name Type Description Default
target str Name of the forecast target column. required
task_name str Tuning algorithm identifier (e.g. "optuna", "spotoptim"). required
best_params Dict[str, Any] Best hyperparameters discovered during tuning. required
best_lags Any Best lag configuration (int, list, or nested list). required

Returns

Name Type Description
Path Path to the saved JSON file.

Examples

from spotforecast2.manager.multitask import LazyTask

task = LazyTask(data_frame_name="demo10")
path = task.save_tuning_results(
    target="target_0",
    task_name="optuna",
    best_params={"n_estimators": 100, "learning_rate": 0.05},
    best_lags=[1, 2, 24],
)
print(path.name)
demo10_target_0_optuna_20260325_204034.json
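
For readers who want to see what such a file might look like on disk, the following sketch writes and reads a timestamped JSON file using only the standard library. The function save_result is hypothetical. The payload keys follow the dictionary documented for load_tuning_results, but the exact file contents produced by the library are an assumption.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def save_result(cache_home, data_frame_name, target, task_name,
                best_params, best_lags):
    """Write one tuning result to a timestamped JSON file; return its path."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = Path(cache_home) / "tuning_results"
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"{data_frame_name}_{target}_{task_name}_{stamp}.json"
    payload = {
        "best_params": best_params,
        "best_lags": best_lags,
        "task_name": task_name,
        "target": target,
        "data_frame_name": data_frame_name,
        "timestamp": stamp,
    }
    path.write_text(json.dumps(payload, indent=2))
    return path


# A temporary directory stands in for cache_home.
with tempfile.TemporaryDirectory() as tmp:
    path = save_result(tmp, "demo10", "target_0", "optuna",
                       {"n_estimators": 100}, [1, 2, 24])
    loaded = json.loads(path.read_text())

print(path.name)
print(loaded["best_params"], loaded["best_lags"])
```

Embedding the timestamp in both the filename and the payload lets a loader judge freshness either from a directory listing or from the file contents.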