multitask.base.BaseTask(
config= None ,
* ,
dataframe= None ,
data_test= None ,
cache_home= None ,
log_level= logging.INFO,
** overrides,
)
Shared base for all multi-target forecasting pipeline tasks.
BaseTask encapsulates the data-preparation pipeline (steps 1-7) and all helper methods shared across the task modes (lazy, defaults, predict, clean). Subclasses implement the run method with task-specific training or prediction logic.
The constructor takes a single config object satisfying the PipelineConfig protocol — typically a ConfigMulti. All pipeline parameters (forecast horizon, training window, outlier policy, weather/ holiday hooks, cross-validation fold count, persistence policy, …) live on that object. Only genuinely call-time state (the dataframes, the cache directory override, the logging level) is passed as separate kwargs. Extra **overrides are forwarded to config.set_params and mutate the passed-in config in place.
Plotting is not available in spotforecast2-safe. The _show_prediction_figure and _show_prediction_figure_agg hook methods are no-ops; override them in a subclass or use the spotforecast2 sibling package for interactive visualisation.
Parameters
config
Optional [PipelineConfig]
A PipelineConfig-conforming object owning every pipeline parameter. ConfigMulti satisfies the protocol.
None
dataframe
Optional [pd .DataFrame ]
Pre-loaded input DataFrame with training data. Must contain a datetime column matching config.index_name plus at least one numeric target column.
None
data_test
Optional [pd .DataFrame ]
Pre-loaded test DataFrame (ground truth for the forecast horizon). Optional.
None
cache_home
Optional [Path ]
Cache directory override. When not None, replaces config.cache_home for this task instance.
None
log_level
int
Logging level for the pipeline logger.
logging.INFO
**overrides
Any
Forwarded to config.set_params(**overrides) — a convenience for one-line tweaks without building a fresh config. Mutates the caller’s config object.
{}
Attributes
config
PipelineConfig
Centralised pipeline configuration.
df_pipeline
pd .DataFrame
Pipeline DataFrame after preparation.
df_test
pd .DataFrame
Test DataFrame (ground truth).
weight_func
Optional [Any ]
Sample-weight function from imputation.
exogenous_features
pd .DataFrame
Combined exogenous feature matrix.
exog_feature_names
List [str ]
Selected exogenous feature names.
data_with_exog
pd .DataFrame
Merged target + exogenous data.
exo_pred
pd .DataFrame
Exogenous covariates for the forecast horizon.
results
Dict [str , Dict ]
Per-task mapping of target name to prediction package.
agg_results
Dict
Mapping of task name to aggregated prediction package.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask.base import BaseTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 7 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"load" : rng.normal(500 , 30 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
auto_save_models= False ,
verbose= False ,
)
task = BaseTask(cfg, dataframe= df)
print (f"Task mode: { task. TASK} " )
print (f"Config predict_size: { task. config. predict_size} " )
Task mode: lazy
Config predict_size: 6
Methods
agg_predictor
Aggregate per-target prediction packages into a weighted forecast.
build_exogenous_features
Build, combine, encode, and merge exogenous feature covariates.
create_forecaster
Create a fresh forecaster for the given target.
cv_ts
Build a TimeSeriesFold for cross-validation.
detect_outliers
Apply hard-bound filtering and IsolationForest outlier detection.
impute
Fill missing values using the configured imputation strategy.
load_models
Load the most recent fitted models from the cache directory.
load_tuning_results
Load the most recent tuning results for a target from cache.
log_summary
Log a summary of the current pipeline configuration.
plot_with_outliers
Visualise original vs. cleaned data with outlier markers.
prepare_data
Load, resample, validate, and configure the pipeline data.
run
Execute the task-specific training / prediction pipeline.
save_models
Save fitted forecaster models to the cache directory.
save_tuning_results
Save tuning results (best parameters and lags) to a JSON file.
agg_predictor
multitask.base.BaseTask.agg_predictor(results, targets, weights)
Aggregate per-target prediction packages into a weighted forecast.
Delegates to the module-level agg_predictor function. Available as an instance method so that subclasses can override the aggregation strategy when needed.
Parameters
results
Dict [str , Dict [str , Any ]]
Mapping of target name to prediction package (as returned by build_prediction_package).
required
targets
List [str ]
Ordered list of target names to include.
required
weights
List [float ]
Per-target aggregation weights aligned with targets.
required
Returns
Dict [str , Any ]
Aggregated prediction package dict.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx_train = pd.date_range("2023-01-01" , periods= 48 , freq= "h" , tz= "UTC" )
idx_future = pd.date_range("2023-01-03" , periods= 6 , freq= "h" , tz= "UTC" )
def _pkg(train_val, future_val):
return {
"train_actual" : pd.Series(np.full(48 , train_val), index= idx_train),
"train_pred" : pd.Series(np.full(48 , train_val * 0.99 ), index= idx_train),
"future_pred" : pd.Series(np.full(6 , future_val), index= idx_future),
"future_actual" : pd.Series(dtype= "float64" ),
}
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(cache_home= tmp, verbose= False )
task = LazyTask(cfg)
results = {"wind" : _pkg(100.0 , 110.0 ), "solar" : _pkg(200.0 , 210.0 )}
agg = task.agg_predictor(results, ["wind" , "solar" ], [0.4 , 0.6 ])
print (f"Weighted future_pred: { agg['future_pred' ]. iloc[0 ]:.1f} " )
Weighted future_pred: 170.0
build_exogenous_features
multitask.base.BaseTask.build_exogenous_features()
Build, combine, encode, and merge exogenous feature covariates.
This is step 4-7 of the pipeline (run after prepare_data, detect_outliers, and impute). It assembles the full exogenous-covariate matrix that the forecaster consumes, then merges it onto the target data. The orchestration proceeds in order:
4a — Weather, via get_weather_features (Open-Meteo). The response is parquet-cached only when config.cache_home is set. Fetch failures are handled per config.on_weather_failure: "raise" re-raises WeatherFetchError; "skip" logs a warning and continues with an empty weather frame (fail-safe).
4b — Calendar features, via get_calendar_features.
4c — Day/night (solar) features, via get_day_night_features (computed with astral from config.latitude / config.longitude).
4d — Holiday features, via get_holiday_features for config.country_code / config.state.
5 — The four frames are concatenated along the columns and any residual gaps are back- then forward-filled. Provider-based exogenous columns are then appended via build_providers_from_config (requires spotforecast2-safe >= 15.7.0). The active providers are governed by the config flags include_covid_infection_rate, include_entsoe_forecast_load, include_entsoe_renewable_forecast, include_entsoe_net_load, and include_entsoe_day_ahead_price. Cyclical (sine/cosine) encoding is then applied via apply_cyclical_encoding, and degree-config.poly_features_degree interaction terms are added via create_interaction_features. When the degree is at least 2, the polynomial columns are ranked by mutual information with the primary target and capped to config.max_poly_features via select_top_poly_features.
6 — The training feature set is chosen via select_exogenous_features, with provider columns appended (order-preserving, de-duplicated).
7 — Targets and covariates are merged via merge_data_and_covariates into self.data_with_exog and the forecast-horizon covariates self.exo_pred.
When config.use_exogenous_features is False the method is a no-op and returns self immediately, leaving the pipeline target-only.
Attributes
weather_aligned
pd .DataFrame
Weather frame aligned to the pipeline index, reused by the interaction and selection steps.
exogenous_features
pd .DataFrame
Full combined, encoded, and capped exogenous feature matrix.
exog_feature_names
List [str ]
Names of the exogenous features selected for training (including provider columns).
data_with_exog
pd .DataFrame
Target data merged with the selected exogenous covariates.
exo_pred
pd .DataFrame
Exogenous covariates spanning the forecast horizon, supplied to the forecaster at predict time.
Raises
RuntimeError
If prepare_data has not been called.
WeatherFetchError
If the Open-Meteo fetch fails and config.on_weather_failure == "raise".
Examples
With exogenous features disabled the method is a no-op, so the example below runs without any network access and leaves the pipeline target-only.
import tempfile
import pandas as pd
import numpy as np
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data().detect_outliers().impute().build_exogenous_features()
print (f"Exogenous features used: { mt. config. use_exogenous_features} " )
print (f"Selected exog feature names: { mt. exog_feature_names} " )
Exogenous features used: False
Selected exog feature names: []
create_forecaster
multitask.base.BaseTask.create_forecaster(target= None )
Create a fresh forecaster for the given target.
Delegates to config.forecaster_factory when set; otherwise falls back to default_lgbm_forecaster_factory. This factory hook lets callers swap the estimator without subclassing BaseTask.
Parameters
target
Optional [str ]
Optional target column name. Forwarded to the factory so that custom factories can specialise per target.
None
Returns
Any
A new, unfitted forecaster instance.
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
cache_home= Path(tmp),
)
task = LazyTask(cfg)
forecaster = task.create_forecaster()
print (f"Type: { type (forecaster). __name__ } " )
print (f"Lags: { forecaster. lags} " )
Type: ForecasterRecursive
Lags: [ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
cv_ts
multitask.base.BaseTask.cv_ts(y_train)
Build a TimeSeriesFold for cross-validation.
Constructs the cross-validation splitter used by all tuning tasks. Internally uses sklearn.model_selection.TimeSeriesSplit to compute split boundaries that respect temporal ordering and avoid data leakage between folds.
The validation boundary is determined by run_state.end_train_ts minus config.delta_val. When config.train_size is set, the sklearn splitter uses a sliding fixed-size training window (max_train_size); otherwise an expanding window is used.
Parameters
y_train
pd .Series
Training time series for the current target. Used both to determine the validation boundary and as the sequence passed to TimeSeriesSplit.split to derive initial_train_size.
required
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
number_folds= 2 ,
auto_save_models= False ,
verbose= False ,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data().detect_outliers().impute().build_exogenous_features()
y_train = mt.df_pipeline["a" ]
cv = mt.cv_ts(y_train)
print (f"TimeSeriesFold steps: { cv. steps} " )
print (f"initial_train_size: { cv. initial_train_size} " )
TimeSeriesFold steps: 6
initial_train_size: 324
detect_outliers
multitask.base.BaseTask.detect_outliers()
Apply hard-bound filtering and IsolationForest outlier detection.
Hard bounds from config.bounds are applied to the pipeline data (out-of-bound values are removed and later filled by impute()). IsolationForest detection (config.use_outlier_detection) is advisory: detected outliers are logged per column but not removed.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
auto_save_models= False ,
verbose= False ,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data()
mt.detect_outliers()
print (f"Pipeline shape: { mt. df_pipeline. shape} " )
assert mt.df_pipeline_original is not None
impute
multitask.base.BaseTask.impute()
Fill missing values using the configured imputation strategy.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
values = rng.normal(100 , 10 , len (idx))
values[10 :13 ] = float ("nan" ) # inject a few gaps
df = pd.DataFrame({"a" : values}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
auto_save_models= False ,
verbose= False ,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data().detect_outliers().impute()
missing = mt.df_pipeline["a" ].isna().sum ()
print (f"Missing values after imputation: { missing} " )
assert missing == 0
Missing values after imputation: 0
load_models
multitask.base.BaseTask.load_models(
task_name= None ,
target= None ,
max_age_days= None ,
)
Load the most recent fitted models from the cache directory.
Scans <cache_home>/models/<data_frame_name>/ for .joblib files matching the current data_frame_name. Optionally filters by task_name, target, and max_age_days.
Parameters
task_name
Optional [str ]
If given, only load models from this task ("lazy", "defaults", "optuna", or "spotoptim"). None accepts any task.
None
target
Optional [str ]
If given, only load the model for this target column. None loads the most recent model for every target found.
None
max_age_days
Optional [float ]
Maximum age in days. Models older than this are ignored. None accepts any age.
None
Returns
Dict [str , Any ]
Mapping {target: forecaster} of loaded model objects.
Dict [str , Any ]
Empty dict if no matching models were found.
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
data_frame_name= "demo" ,
cache_home= Path(tmp),
verbose= False ,
)
task = LazyTask(cfg)
# Save a dummy object, then load it back.
dummy_forecaster = {"lags" : [1 , 2 , 24 ]}
task.save_models(
task_name= "lazy" ,
forecasters= {"load" : dummy_forecaster},
)
loaded = task.load_models(task_name= "lazy" )
print (f"Loaded targets: { list (loaded.keys())} " )
assert loaded["load" ]["lags" ] == [1 , 2 , 24 ]
load_tuning_results
multitask.base.BaseTask.load_tuning_results(
target,
task_name= None ,
max_age_days= None ,
)
Load the most recent tuning results for a target from cache.
Scans <cache_home>/tuning_results/ for files matching the current data_frame_name and target. Optionally filters by task_name and discards results older than max_age_days.
Parameters
target
str
Name of the forecast target column.
required
task_name
Optional [str ]
If given, only consider results from this tuning algorithm (e.g. "optuna" or "spotoptim"). None accepts any algorithm.
None
max_age_days
Optional [float ]
Maximum age in days. Results older than this are ignored. None accepts any age.
None
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(data_frame_name= "demo10" , cache_home= Path(tmp))
task = LazyTask(cfg)
task.save_tuning_results(
target= "target_0" ,
task_name= "optuna" ,
best_params= {"n_estimators" : 100 },
best_lags= 24 ,
)
result = task.load_tuning_results(target= "target_0" )
print (result["best_params" ])
log_summary
multitask.base.BaseTask.log_summary()
Log a summary of the current pipeline configuration.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
auto_save_models= False ,
verbose= False ,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data().detect_outliers().impute().build_exogenous_features()
# log_summary writes to the pipeline logger; call it to confirm
# it runs without error.
mt.log_summary()
print ("log_summary completed without error" )
log_summary completed without error
plot_with_outliers
multitask.base.BaseTask.plot_with_outliers()
Visualise original vs. cleaned data with outlier markers.
Raises
RuntimeError
If method detect_outliers has not been called.
NotImplementedError
Always — plotting is not available in spotforecast2-safe. Use the spotforecast2 package for visualisation.
Examples
import tempfile
import numpy as np
import pandas as pd
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
auto_save_models= False ,
verbose= False ,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data().detect_outliers()
try :
mt.plot_with_outliers()
except NotImplementedError as exc:
print (f"Plotting unavailable in spotforecast2-safe: { exc} " )
Plotting unavailable in spotforecast2-safe: Plotting is not available in spotforecast2-safe (no plotly/matplotlib). Use the spotforecast2 package for visualisation.
prepare_data
multitask.base.BaseTask.prepare_data(demo_data= None , df_test= None )
Load, resample, validate, and configure the pipeline data.
Uses the following precedence for the training data:
demo_data argument (if provided).
self._dataframe set via the constructor.
Similarly for test data:
df_test argument (if provided).
self.data_test set via the constructor.
self.config.test_data_loader(self.config) if set.
Parameters
demo_data
Optional [pd .DataFrame ]
Pre-loaded input DataFrame. When None, the constructor dataframe is used.
None
df_test
Optional [pd .DataFrame ]
Pre-loaded test DataFrame. When None, the constructor data_test is used, then config.test_data_loader.
None
Raises
ValueError
If no data source is available (no demo_data, no constructor dataframe).
Examples
import tempfile
import pandas as pd
import numpy as np
from spotforecast2_safe.multitask import MultiTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
rng = np.random.default_rng(0 )
idx = pd.date_range("2023-01-01" , periods= 24 * 14 , freq= "h" , tz= "UTC" )
df = pd.DataFrame({"a" : rng.normal(100 , 10 , len (idx))}, index= idx)
df.index.name = "DateTime"
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
predict_size= 6 ,
use_exogenous_features= False ,
use_outlier_detection= False ,
cache_home= tmp,
)
mt = MultiTask(cfg, dataframe= df)
mt.prepare_data()
print (f"Pipeline shape: { mt. df_pipeline. shape} " )
print (f"Targets: { mt. run_state. targets} " )
Pipeline shape: (336, 1)
Targets: ['a']
run
multitask.base.BaseTask.run(
show= False ,
task= None ,
task_name= None ,
use_tuned_params= True ,
max_age_days= None ,
search_space= None ,
dry_run= False ,
cache_home= None ,
** kwargs,
)
Execute the task-specific training / prediction pipeline.
Subclasses must override this method.
Parameters
show
bool
If True, invoke the visualisation hooks (no-ops in this package; meaningful only in spotforecast2).
False
task
Optional [str ]
Task mode override (used by MultiTask).
None
task_name
Optional [str ]
Restrict model loading to a specific source task (used by PredictTask).
None
use_tuned_params
bool
Load cached tuning results when available (used by LazyTask).
True
max_age_days
Optional [float ]
Maximum age in days for cached results (used by LazyTask and PredictTask). Freshness is judged against the wall-clock timestamp embedded in the cache filename, so the check is machine-local.
None
search_space
Optional [Any ]
Hyperparameter search-space definition (accepted for API compatibility; not used in this package).
None
dry_run
bool
Report what would be deleted without removing anything (used by CleanTask).
False
cache_home
Optional [Path ]
Override the cache directory (used by CleanTask).
None
**kwargs
Any
Additional task-specific arguments.
{}
Returns
Dict [str , Any ]
Aggregated prediction package for the task.
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask.base import BaseTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
# BaseTask.run is abstract and always raises NotImplementedError.
# Concrete subclasses (LazyTask, DefaultsTask, PredictTask, CleanTask)
# provide the real implementation.
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(cache_home= Path(tmp), verbose= False )
task = BaseTask(cfg)
try :
task.run()
except NotImplementedError as exc:
print (f"Expected: { exc} " )
Expected: BaseTask must implement run(). Use LazyTask, DefaultsTask, PredictTask, or CleanTask.
save_models
multitask.base.BaseTask.save_models(task_name, forecasters= None )
Save fitted forecaster models to the cache directory.
Each model is serialised with joblib (compress=3) into <cache_home>/models/<data_frame_name>/ using a datetime-stamped filename so that multiple snapshots can coexist.
Filename format::
<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.joblib
If forecasters is None the method collects fitted models from self.results[task_name], where each prediction package is expected to contain a "forecaster" key.
Parameters
task_name
str
Task identifier ("lazy", "defaults"). The names "optuna" and "spotoptim" are also accepted so that model caches produced by the spotforecast2 sibling package can be saved and loaded; no tuning is performed in this package.
required
forecasters
Optional [Dict [str , Any ]]
Optional mapping {target: fitted_forecaster}. When None, models are taken from the prediction packages stored in self.results.
None
Returns
Dict [str , Path ]
Mapping {target: Path} of saved model file paths.
Raises
ValueError
If task_name is not one of "lazy", "defaults", "optuna", "spotoptim".
RuntimeError
If no fitted models are available for the requested task.
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(
data_frame_name= "demo" ,
cache_home= Path(tmp),
verbose= False ,
)
task = LazyTask(cfg)
# Supply a tiny in-memory object as a stand-in for a fitted forecaster.
dummy_forecaster = object ()
saved = task.save_models(
task_name= "lazy" ,
forecasters= {"load" : dummy_forecaster},
)
print (f"Saved targets: { list (saved.keys())} " )
assert saved["load" ].suffix == ".joblib"
save_tuning_results
multitask.base.BaseTask.save_tuning_results(
target,
task_name,
best_params,
best_lags,
)
Save tuning results (best parameters and lags) to a JSON file.
The file is stored under <cache_home>/tuning_results/ with a datetime-stamped filename so that loaders can determine freshness.
Filename format::
<data_frame_name>_<target>_<task_name>_<YYYYMMDD_HHMMSS>.json
Parameters
target
str
Name of the forecast target column.
required
task_name
str
Tuning algorithm identifier (e.g. "optuna", "spotoptim").
required
best_params
Dict [str , Any ]
Best hyperparameters discovered during tuning.
required
best_lags
Any
Best lag configuration (int, list, or nested list).
required
Returns
Path
Path to the saved JSON file.
Examples
import tempfile
from pathlib import Path
from spotforecast2_safe.multitask import LazyTask
from spotforecast2_safe.configurator.config_multi import ConfigMulti
with tempfile.TemporaryDirectory() as tmp:
cfg = ConfigMulti(data_frame_name= "demo10" , cache_home= Path(tmp))
task = LazyTask(cfg)
path = task.save_tuning_results(
target= "target_0" ,
task_name= "optuna" ,
best_params= {"n_estimators" : 100 , "learning_rate" : 0.05 },
best_lags= [1 , 2 , 24 ],
)
print (path.name[:10 ])