Introduction to Model Training

The train_new_model() function in spotforecast2.manager.trainer_full is the main entry point for training a new forecasting model. It keeps four concerns separate: data ingestion, definition of the training window (cutoffs), hyperparameter tuning, and cross-platform model persistence.

This guide covers the standard way to train models with train_new_model(): what each argument does, followed by basic and advanced examples.

Core Arguments Overview

When calling train_new_model(), you pass arguments that determine which model is built and which data it learns from.

Table 1: Available arguments for train_new_model().
| Argument | Type | Description |
|---|---|---|
| model_class | type | Reference to the Python class representing the forecaster. Its constructor must accept iteration, end_dev, and train_size, and the class must expose a tune() method. |
| n_iteration | int | Incremental version number distinguishing this training cycle from its predecessors. Strongly recommended for lineage tagging. |
| model_name | str \| None | Base tracking tag. The saved filename follows the format <model_name>_forecaster_<n_iteration>.joblib. |
| train_size | pd.Timedelta \| None | Total duration of the training window, measured backwards from the end_dev cutoff. |
| save_to_file | bool | Compress and serialize the tuned model to a .joblib file on disk. |
| model_dir | str \| Path \| None | Output directory for the .joblib file. Defaults to the framework’s canonical cache home if left unspecified. |
| end_dev | str \| pd.Timestamp \| None | Hard cutoff timestamp. Data after this timestamp is excluded from training. If None, it is inferred as one day before the most recent data point. |
| data_filename | str \| None | Target CSV path inside the dataset directory. Falls back to the fetch_data() logic if omitted. |
| **kwargs | Any | Additional keyword arguments forwarded to the model_class constructor. |
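
The interface requirements listed for model_class can be checked up front, before a long training run starts. The following is a minimal sketch and not part of spotforecast2: check_model_class, GoodModel, and BadModel are hypothetical names introduced for illustration, and the check only inspects the constructor signature and the presence of tune().

```python
import inspect

# Constructor arguments the table above lists as required (assumption: names only, no type checks).
REQUIRED_INIT_ARGS = ("iteration", "end_dev", "train_size")


def check_model_class(model_class) -> list[str]:
    """Return a list of contract violations; an empty list means the class looks usable."""
    problems = []
    params = inspect.signature(model_class.__init__).parameters
    accepts_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    for name in REQUIRED_INIT_ARGS:
        # A **kwargs catch-all would also swallow the argument, so treat it as accepted.
        if name not in params and not accepts_kwargs:
            problems.append(f"__init__ does not accept '{name}'")
    if not callable(getattr(model_class, "tune", None)):
        problems.append("missing callable tune() method")
    return problems


class GoodModel:
    def __init__(self, iteration, end_dev, train_size, **kwargs):
        self.iteration = iteration

    def tune(self):
        pass


class BadModel:
    def __init__(self, iteration):
        self.iteration = iteration


print(check_model_class(GoodModel))
print(check_model_class(BadModel))
```

A check like this catches a missing tune() method or a mistyped constructor argument immediately instead of after data loading.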

Simple Training Example

Let’s start with the most basic way to create and tune a new model. For these examples, we define a MockForecaster class that stands in for a real forecaster and satisfies the interface train_new_model() expects.

import pandas as pd
from spotforecast2.manager.trainer_full import train_new_model

# 1. Define a Mock Model Class meeting the API requirements
class MockForecaster:
    def __init__(self, iteration, end_dev, train_size, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        self.config = kwargs
    
    def tune(self):
        # In actual usage, this acts as the gateway to spotoptim_search
        print(f"Executing tune() for iteration {self.iteration}")
        print(f"Focus window cuts off at: {self.end_dev}")

    def get_params(self):
        return {"stub": "mock"}

# 2. Start a basic training run explicitly overriding the cutoff
# Note: we disable saving to prevent dumping a joblib locally during the example
from spotforecast2_safe.data.fetch_data import get_package_data_home
demo_file = get_package_data_home() / "demo01.csv"

model_basic = train_new_model(
    model_class=MockForecaster,
    n_iteration=1,
    model_name="baseline_mock",
    end_dev="2023-01-01 00:00+00:00",
    train_size=None, # Use the entire history
    save_to_file=False,
    data_filename=str(demo_file)
)

print(f"Constructed class type: {type(model_basic).__name__}")
print(f"Model internal cutoff limit: {model_basic.end_dev}")
Executing tune() for iteration 1
Focus window cuts off at: 2023-01-01 00:00:00+00:00
Constructed class type: MockForecaster
Model internal cutoff limit: 2023-01-01 00:00:00+00:00

Advanced Training Scenarios

In production systems, train_new_model() supports rolling training windows through its train_size and end_dev arguments. You will rarely want the default train_size=None (complete history): memory use grows with the length of the history, and old data may no longer reflect current behavior (concept drift). A fixed-length window anchored at an explicit end_dev cutoff limits both risks.
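
To make the window arithmetic concrete, here is a minimal sketch of how a training window follows from end_dev and train_size, including the documented default of one day before the most recent data point. It uses only the standard library; resolve_window is a hypothetical helper written for this guide, not the library's implementation.

```python
from datetime import datetime, timedelta, timezone


def resolve_window(latest, end_dev=None, train_size=None):
    """Return (start, end) of the training window; start is None for full history."""
    # Documented default: cutoff is one day before the most recent data point.
    end = end_dev if end_dev is not None else latest - timedelta(days=1)
    start = end - train_size if train_size is not None else None
    return start, end


latest = datetime(2024, 3, 16, tzinfo=timezone.utc)

# Default cutoff with a one-year window.
start, end = resolve_window(latest, train_size=timedelta(days=365))
print(start, end)

# Rolling progression: move the cutoff forward one week per retraining cycle.
first_cutoff = datetime(2024, 3, 15, tzinfo=timezone.utc)
windows = [
    resolve_window(
        latest,
        end_dev=first_cutoff + timedelta(days=7 * k),
        train_size=timedelta(days=365),
    )
    for k in range(3)
]
for window_start, window_end in windows:
    print(window_start, "->", window_end)
```

Because the window length is fixed, each cycle trains on the same amount of data while the cutoff advances.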

We can combine a bounded train_size with extra constructor parameters passed through **kwargs:

# 1. Start an advanced tuning workflow
from spotforecast2_safe.data.fetch_data import get_package_data_home
demo_file = get_package_data_home() / "demo01.csv"

model_advanced = train_new_model(
    model_class=MockForecaster,
    n_iteration=3,
    model_name="production_mock",
    train_size=pd.Timedelta(days=365), # Force exactly 1 year backward logic
    end_dev="2024-03-15 00:00+00:00",
    save_to_file=False,
    data_filename=str(demo_file),
    # Inject specific kwargs dynamically
    lags=48,
    advanced_regularization=True,
    surrogate_seed=1214
)

print(f"Validation bounded train_size setting: {model_advanced.train_size.days} days")
print(f"Injected **kwargs parameters -> lags: {model_advanced.config.get('lags')}")
print(f"Injected **kwargs parameters -> regularization: {model_advanced.config.get('advanced_regularization')}")
Executing tune() for iteration 3
Focus window cuts off at: 2024-03-15 00:00:00+00:00
Validation bounded train_size setting: 365 days
Injected **kwargs parameters -> lags: 48
Injected **kwargs parameters -> regularization: True

Because train_new_model() relies only on this class interface, any ForecasterRecursive wrapper, complex pipeline, or hybrid system that matches the constructor signature and provides a tune() method can be trained through this entry point.

Fully Functional End-to-End Example

To connect this to a real application, the following fully functional example loads the packaged demo01.csv historical dataset. We build a minimal model_class that uses fetch_data to load the history inside the tune() method and performs a real ForecasterRecursive fit.

import pandas as pd
from sklearn.linear_model import Ridge
from spotforecast2_safe.forecaster.recursive import ForecasterRecursive
from spotforecast2_safe.data.fetch_data import fetch_data, get_package_data_home
from spotforecast2.manager.trainer_full import train_new_model

class FunctionalForecaster:
    # Notice we capture `dataset_path` from dynamic **kwargs
    def __init__(self, iteration, end_dev, train_size, dataset_path=None, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        self.dataset_path = dataset_path
        
        # A simple internal forecaster to be trained
        self.forecaster = ForecasterRecursive(estimator=Ridge(), lags=3)
        self.name = "demo01_model"

    def tune(self):
        # 1. Fetch the data inside the model
        df = fetch_data(filename=self.dataset_path)
        y = df["Actual Load"]
        
        # 2. Slice the historical data strictly up to end_dev according to train_size
        if self.train_size is not None:
            start_date = self.end_dev - self.train_size
            y_train = y.loc[start_date:self.end_dev]
        else:
            y_train = y.loc[:self.end_dev]

        # 3. Fit the model genuinely
        print(f"Fitting model strictly on data until {self.end_dev}")
        print(f"Training window length: {len(y_train)} records")
        self.forecaster.fit(y=y_train)

    def get_params(self):
        return {}

# 1. Define path to the demo dataset packaged with spotforecast2_safe
demo_file = get_package_data_home() / "demo01.csv"

# 2. Execute the training pipeline
# By setting end_dev=None, train_new_model checks the CSV implicitly
# to calculate the cutoff boundary to be exactly 1 day before the final record.
model_functional = train_new_model(
    model_class=FunctionalForecaster,
    n_iteration=1,
    train_size=pd.Timedelta(days=7), # Only use the last 7 days of data for training
    end_dev=None,
    data_filename=str(demo_file), # Passed to train_new_model to compute cutoff
    save_to_file=False, # Disable file writes for the example
    dataset_path=str(demo_file) # Stored in kwargs and passed to __init__
)

assert model_functional.forecaster.is_fitted is True
print("Model pipeline successfully fitted!")
Fitting model strictly on data until 2026-02-13 22:45:00+00:00
Training window length: 673 records
Model pipeline successfully fitted!
/opt/hostedtoolcache/Python/3.13.12/x64/lib/python3.13/site-packages/spotforecast2_safe/forecaster/utils.py:792: UserWarning: `y` has a DatetimeIndex but no frequency. The frequency has been inferred from the index.
  warnings.warn(
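
The reported window length of 673 counts records, not hours. A 7-day window sliced with label-based .loc includes both endpoints, so the count depends on the sampling interval. The sketch below works through the arithmetic, assuming the 15-minute sampling that the 22:45 cutoff timestamp and the count itself suggest; expected_records is a hypothetical helper, and the sampling interval is an inference from the output rather than a documented property of demo01.csv.

```python
from datetime import timedelta


def expected_records(train_size, step):
    """Number of samples in a window whose start and end labels are both included."""
    return train_size // step + 1


# 7 days at 15-minute resolution: 672 intervals plus the inclusive endpoint.
print(expected_records(timedelta(days=7), timedelta(minutes=15)))  # 673

# The same window at hourly resolution would be much shorter.
print(expected_records(timedelta(days=7), timedelta(hours=1)))  # 169
```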

Visualizing Prediction Quality

In safety-critical workflows, evaluating multi-step out-of-sample performance is essential. By fixing end_dev explicitly, we withhold all data after the cutoff from training. Once train_new_model() returns, we use the fitted model to predict across the held-out window and plot the predictions against the actual values from the demo02.csv dataset.

import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error
from spotforecast2_safe.forecaster.recursive import ForecasterRecursive
from spotforecast2_safe.data.fetch_data import fetch_data, get_package_data_home
from spotforecast2.manager.trainer_full import train_new_model
import plotly.graph_objects as go

class VisualizingForecaster:
    def __init__(self, iteration, end_dev, train_size, dataset_path=None, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        self.dataset_path = dataset_path
        
        # Using a deeper lag window for more predictive capability
        self.forecaster = ForecasterRecursive(estimator=Ridge(), lags=24)
        self.name = "demo02_model"

    def tune(self):
        df = fetch_data(filename=self.dataset_path)
        y = df["A"].groupby(level=0).mean().asfreq("h").ffill() # Safely handle duplicates and NA gaps
        
        # Enforce hard upper cutoff
        y_train = y.loc[:self.end_dev]
        
        # Enforce lower boundary
        if self.train_size is not None:
            start_date = pd.to_datetime(self.end_dev, utc=True) - self.train_size
            y_train = y_train.loc[start_date:]
            
        print(f"Fitting model locally on {len(y_train)} points until {self.end_dev}")
        self.forecaster.fit(y=y_train)

    def get_params(self): return {}

# 1. Fetch the multi-variate continuous integration dataset "demo02.csv"
demo_file = get_package_data_home() / "demo02.csv"
df_full = fetch_data(filename=str(demo_file))
y_full = df_full["A"].groupby(level=0).mean().asfreq("h").ffill()

# 2. Establish chronological boundaries (e.g., test on the final 7 days)
test_duration = pd.Timedelta(days=7)
cutoff_date = y_full.index.max() - test_duration

# 3. Train isolated pipeline matching precise boundaries
model_vis = train_new_model(
    model_class=VisualizingForecaster,
    n_iteration=1,
    train_size=pd.Timedelta(days=60), # 60-day historical perspective
    end_dev=cutoff_date, 
    data_filename=str(demo_file), 
    save_to_file=False, 
    dataset_path=str(demo_file)
)

# 4. Extract ground truth testing window (exclusive of the cutoff)
y_test = y_full.loc[cutoff_date + pd.Timedelta(hours=1):]

# 5. Execute N-step recursive predictions
preds = model_vis.forecaster.predict(steps=len(y_test))
preds.index = y_test.index  # Align axes

# 6. Measure mathematical accuracy
mae = mean_absolute_error(y_test, preds)
print(f"Validation MAE: {mae:.3f}")

# 7. Generate interactive verification layer (plotly native view)
fig = go.Figure()
fig.add_trace(go.Scatter(x=y_test.index, y=y_test, mode="lines", name="Actual Truth"))
fig.add_trace(go.Scatter(
    x=preds.index, y=preds, mode="lines", 
    name="Forecaster Projection", 
    line=dict(dash="dash", color="orange")
))

fig.update_layout(
    title=f"demo02.csv Prediction Quality Appraisal (MAE: {mae:.3f})",
    xaxis_title="Time (UTC)",
    yaxis_title="Target Sensor: A",
    template="plotly_white",
    hovermode="x unified"
)
# fig.show() # Automatically evaluates inside Quarto output blocks
Fitting model locally on 1441 points until 1975-06-11 18:00:00+00:00
Validation MAE: 0.093
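
The hold-out logic used above can be sketched without the library to show why the test window starts one step after the cutoff: the training slice includes the cutoff itself, so the test slice must exclude it to avoid overlap. This is a simplified illustration on synthetic hourly data; chronological_split and the last-value placeholder prediction are assumptions made for the sketch, not part of spotforecast2.

```python
from datetime import datetime, timedelta, timezone


def chronological_split(series, cutoff):
    """Split (timestamp, value) pairs so that the cutoff belongs to training only."""
    train = [(t, v) for t, v in series if t <= cutoff]
    test = [(t, v) for t, v in series if t > cutoff]
    return train, test


def mean_absolute_error(actual, predicted):
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Synthetic hourly series covering 10 days with a daily sawtooth pattern.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
series = [(start + timedelta(hours=h), float(h % 24)) for h in range(24 * 10)]

# Hold out the final 7 days, mirroring the example above.
cutoff = series[-1][0] - timedelta(days=7)
train, test = chronological_split(series, cutoff)

# Placeholder prediction: repeat the last training value for every test step.
last_value = train[-1][1]
mae = mean_absolute_error([v for _, v in test], [last_value] * len(test))

print(f"train points: {len(train)}, test points: {len(test)}")
print(f"first test timestamp: {test[0][0]}")
print(f"placeholder MAE: {mae:.3f}")
```

Any model evaluated on the same split can be compared against such a placeholder to judge whether its MAE is meaningful.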

Advanced Modeling with LightGBM

While simple linear models provide a solid baseline, production pipelines often use gradient-boosted trees to capture non-linear relationships and complex interactions. The framework supports interchangeable scikit-learn-compatible estimators.

Here is the same pipeline as above, switched to LGBMRegressor from lightgbm. Apart from the class name and the estimator passed to ForecasterRecursive, the model class is unchanged, which demonstrates the flexibility of the ForecasterRecursive wrapper.

import pandas as pd
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_error
from spotforecast2_safe.forecaster.recursive import ForecasterRecursive
from spotforecast2_safe.data.fetch_data import fetch_data, get_package_data_home
from spotforecast2.manager.trainer_full import train_new_model
import plotly.graph_objects as go

class LGBMVisualizingForecaster:
    def __init__(self, iteration, end_dev, train_size, dataset_path=None, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        self.dataset_path = dataset_path
        
        # Inject LightGBM instead of Ridge
        self.forecaster = ForecasterRecursive(
            estimator=LGBMRegressor(n_estimators=100, learning_rate=0.05, random_state=42, verbose=-1), 
            lags=24
        )
        self.name = "demo02_lgbm_model"

    def tune(self):
        df = fetch_data(filename=self.dataset_path)
        y = df["A"].groupby(level=0).mean().asfreq("h").ffill()
        
        y_train = y.loc[:self.end_dev]
        if self.train_size is not None:
            start_date = pd.to_datetime(self.end_dev, utc=True) - self.train_size
            y_train = y_train.loc[start_date:]
            
        print(f"Fitting LGBM locally on {len(y_train)} points until {self.end_dev}")
        self.forecaster.fit(y=y_train)

    def get_params(self): return {}

demo_file = get_package_data_home() / "demo02.csv"
df_full = fetch_data(filename=str(demo_file))
y_full = df_full["A"].groupby(level=0).mean().asfreq("h").ffill()

test_duration = pd.Timedelta(days=7)
cutoff_date = y_full.index.max() - test_duration

model_lgbm = train_new_model(
    model_class=LGBMVisualizingForecaster,
    n_iteration=1,
    train_size=pd.Timedelta(days=60), 
    end_dev=cutoff_date, 
    data_filename=str(demo_file), 
    save_to_file=False, 
    dataset_path=str(demo_file)
)

y_test = y_full.loc[cutoff_date + pd.Timedelta(hours=1):]
preds = model_lgbm.forecaster.predict(steps=len(y_test))
preds.index = y_test.index  

mae = mean_absolute_error(y_test, preds)
print(f"LGBM Validation MAE: {mae:.3f}")

# Generate interactive verification layer
fig = go.Figure()
fig.add_trace(go.Scatter(x=y_test.index, y=y_test, mode="lines", name="Actual Truth"))
fig.add_trace(go.Scatter(
    x=preds.index, y=preds, mode="lines", 
    name="LGBM Projection", 
    line=dict(dash="dash", color="purple")
))

fig.update_layout(
    title=f"demo02.csv LightGBM Quality Appraisal (MAE: {mae:.3f})",
    xaxis_title="Time (UTC)",
    yaxis_title="Target Sensor: A",
    template="plotly_white",
    hovermode="x unified"
)
# fig.show() 
Fitting LGBM locally on 1441 points until 1975-06-11 18:00:00+00:00
LGBM Validation MAE: 0.098
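
Since the Ridge and LightGBM classes above differ only in the estimator they construct, one option is to pass the estimator choice through **kwargs instead of duplicating the class. The sketch below illustrates the pattern with stand-in estimators so that it runs without scikit-learn or lightgbm. ConfigurableForecaster, estimator_factory, MeanEstimator, LastValueEstimator, and build_and_tune are all hypothetical names; build_and_tune only mimics the documented forwarding of extra keyword arguments to the constructor followed by a call to tune().

```python
class MeanEstimator:
    """Stand-in regressor (hypothetical): predicts the mean of the training data."""

    def fit(self, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, steps):
        return [self.mean_] * steps


class LastValueEstimator:
    """Stand-in regressor (hypothetical): repeats the last training value."""

    def fit(self, y):
        self.last_ = y[-1]
        return self

    def predict(self, steps):
        return [self.last_] * steps


class ConfigurableForecaster:
    def __init__(self, iteration, end_dev, train_size,
                 estimator_factory=MeanEstimator, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        # Build a fresh estimator per model instance.
        self.estimator = estimator_factory()

    def tune(self):
        # Toy data; a real class would load and slice the series here.
        self.estimator.fit([1.0, 2.0, 3.0, 6.0])

    def get_params(self):
        return {"estimator": type(self.estimator).__name__}


def build_and_tune(model_class, **kwargs):
    """Mimic the forwarding of **kwargs to the constructor, then tune."""
    model = model_class(iteration=0, end_dev=None, train_size=None, **kwargs)
    model.tune()
    return model


for factory in (MeanEstimator, LastValueEstimator):
    model = build_and_tune(ConfigurableForecaster, estimator_factory=factory)
    print(model.get_params(), model.estimator.predict(2))
```

With a real pipeline, the factory could be any zero-argument callable that returns a configured scikit-learn-compatible estimator.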

Automated Run Orchestration with handle_training

While train_new_model() handles the mechanics of model tuning and serialization, calling it manually leaves version tracking and retraining cadence to you. For real MLOps deployments, such as weekly cron jobs, you need an orchestrator that decides whether a model actually needs retraining before spending compute.

The handle_training() function is that orchestrator. It checks the default library cache (or a specified directory) for the most recent version of your model. If the existing model is older than 7 days, or if no model exists, it starts a new train_new_model() run and increments the iteration counter automatically (e.g., <model>_forecaster_0.joblib to <model>_forecaster_1.joblib).
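
The decision rule described above can be sketched as follows. This is an illustration only, not the implementation of handle_training(): latest_iteration and next_iteration are hypothetical helpers, and using the file modification time as the model's age is an assumption, since the source does not say how the age is measured.

```python
import re
import tempfile
import time
from pathlib import Path

MAX_AGE_SECONDS = 7 * 24 * 3600  # the 7-day limit described above


def latest_iteration(model_dir, model_name):
    """Return (iteration, path) of the newest saved model, or None if none exists."""
    pattern = re.compile(rf"^{re.escape(model_name)}_forecaster_(\d+)\.joblib$")
    found = []
    for path in Path(model_dir).iterdir():
        match = pattern.match(path.name)
        if match:
            found.append((int(match.group(1)), path))
    return max(found, key=lambda item: item[0]) if found else None


def next_iteration(model_dir, model_name, force=False, now=None):
    """Return the iteration number to train next, or None if no retraining is needed."""
    latest = latest_iteration(model_dir, model_name)
    if latest is None:
        return 0  # nothing on disk yet: start at iteration 0
    iteration, path = latest
    now = time.time() if now is None else now
    age = now - path.stat().st_mtime  # assumption: age is taken from the file mtime
    if force or age > MAX_AGE_SECONDS:
        return iteration + 1
    return None


with tempfile.TemporaryDirectory() as tmpdir:
    print(next_iteration(tmpdir, "demo"))  # empty directory
    (Path(tmpdir) / "demo_forecaster_0.joblib").touch()
    print(next_iteration(tmpdir, "demo"))  # fresh model on disk
    print(next_iteration(tmpdir, "demo", force=True))  # forced retraining
    print(next_iteration(tmpdir, "demo", now=time.time() + 8 * 24 * 3600))  # 8 days later
```

Passing now explicitly makes the age check testable without waiting or modifying file timestamps.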

handle_training Arguments

| Argument | Type | Description |
|---|---|---|
| model_class | type | The class of the forecaster model to train (e.g., ForecasterLGBM or a custom wrapper). |
| model_name | str \| None | Base tracking tag used to standardize the file name on disk. If None, it is inferred from model_class.__name__. |
| model_dir | str \| Path \| None | Output directory for the .joblib assets. Defaults to the framework’s .cache. |
| force | bool | If True, skips the age check and forces a retraining iteration regardless of how recent the previous model is. Defaults to False. |
| train_size | pd.Timedelta \| None | Total duration of the time series training window. |
| end_dev | str \| pd.Timestamp \| None | Hard chronological cutoff separating training data from future inference data. |
| data_filename | str \| None | Target CSV path inside the dataset directory, loaded via fetch_data. |
| **kwargs | Any | Additional keyword arguments forwarded to the model_class constructor. |

Simple Example: Initializing the Pipeline

In this first example, assume we have an empty production environment. We invoke handle_training(). Because no model exists yet in our temporary directory, it runs an initial training cycle as iteration 0.

import tempfile
from pathlib import Path
import pandas as pd
from spotforecast2_safe.data.fetch_data import get_package_data_home
from spotforecast2.manager.trainer_full import handle_training

# We define a lightweight dummy model for demonstration.
# In reality, this would be a full ForecasterRecursive implementation.
class SimpleModel:
    def __init__(self, iteration, end_dev, train_size, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
    def tune(self):
        print(f"  [Output] Tuning triggered for iteration {self.iteration}")
    def get_params(self): return {}

demo_file = get_package_data_home() / "demo01.csv"

# Use a temporary directory as our "server disk" to avoid cluttering local files
with tempfile.TemporaryDirectory() as tmpdir:
    print("Executing standard handle_training request...")
    
    # 1. Trigger handler
    handle_training(
        model_class=SimpleModel,
        model_name="simple_demo",
        model_dir=tmpdir,
        data_filename=str(demo_file)
    )
    
    # 2. Verify disk state
    files_on_disk = list(Path(tmpdir).glob("*.joblib"))
    print(f"Files saved to disk: {[f.name for f in files_on_disk]}")
Executing standard handle_training request...
  [Output] Tuning triggered for iteration 0
Files saved to disk: ['simple_demo_forecaster_0.joblib']

Advanced Example: Forcing Retraining Updates

Now consider a scenario where time has passed and we want to force the pipeline to retrain by passing force=True. Even if the existing model has not yet exceeded the default 7-day age limit, handle_training() detects the file, increments the iteration number, and saves the new model on disk alongside its predecessor.

import tempfile
from pathlib import Path
from spotforecast2_safe.data.fetch_data import get_package_data_home
from spotforecast2.manager.trainer_full import handle_training

class AdvancedModel:
    def __init__(self, iteration, end_dev, train_size, **kwargs):
        self.iteration = iteration
        self.end_dev = end_dev
        self.train_size = train_size
        self.kwargs = kwargs
    def tune(self): 
        print(f"  [Output] Tuned AdvancedModel {self.iteration} | Mode: {self.kwargs.get('mode')}")
    def get_params(self): return {}

demo_file = get_package_data_home() / "demo01.csv"

with tempfile.TemporaryDirectory() as tmpdir:
    # 1. Simulate the initial Monday morning cron job
    print("--- Stage 1: Initial Cron Execution ---")
    handle_training(
        model_class=AdvancedModel,
        model_name="advanced_model",
        model_dir=tmpdir,
        data_filename=str(demo_file),
        mode="baseline"
    )
    
    # 2. Simulate an emergency Friday redeployment constraint via `force`
    print("\n--- Stage 2: Forced Redeployment ---")
    handle_training(
        model_class=AdvancedModel,
        model_name="advanced_model",
        model_dir=tmpdir,
        force=True, # Bypass expiration
        data_filename=str(demo_file),
        mode="emergency_update"
    )
    
    # 3. Observe the structured historical retention
    files = sorted([f.name for f in Path(tmpdir).glob("*.joblib")])
    print(f"\nFinal Directory State:\n{files}")
--- Stage 1: Initial Cron Execution ---
  [Output] Tuned AdvancedModel 0 | Mode: baseline

--- Stage 2: Forced Redeployment ---
  [Output] Tuned AdvancedModel 1 | Mode: emergency_update

Final Directory State:
['advanced_model_forecaster_0.joblib', 'advanced_model_forecaster_1.joblib']
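
One detail to keep in mind when listing retained artifacts: sorting file names as plain strings stops matching iteration order once the counter reaches 10. The sketch below shows the difference; iteration_of is a hypothetical helper that assumes the <model_name>_forecaster_<n_iteration>.joblib naming scheme described earlier.

```python
import re


def iteration_of(filename):
    """Extract the iteration number from a saved model file name (-1 if absent)."""
    match = re.search(r"_forecaster_(\d+)\.joblib$", filename)
    return int(match.group(1)) if match else -1


names = [f"advanced_model_forecaster_{i}.joblib" for i in (0, 1, 2, 10)]

# Plain string sorting places iteration 10 before iteration 2.
print(sorted(names)[-1])

# Sorting by the parsed number gives the true latest artifact.
print(max(names, key=iteration_of))
```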