Data Imputation and Gap Weighting

How to handle missing values and minimize their impact on model performance.

In real-world time series forecasting, data gaps are inevitable. While simple imputation (like forward-filling) can provide values for missing entries, it often introduces localized noise or bias. spotforecast2-safe provides a systematic way to handle these gaps using the WeightFunction and get_missing_weights utilities.

The Problem: Data Gaps and Model Lags

Most time series models (like ForecasterRecursive) use historical values (lags) as features. If a single data point is missing and imputed, every training sample whose lag window contains that point is potentially compromised.

The Solution: Weighted Imputation

Instead of just filling gaps, we calculate a “weight” for each data point. Points that were recently imputed or are too close to a gap are given a weight of 0, effectively excluding them from the training objective.

Key Components

  • get_missing_weights: Analyzes a DataFrame for missing values, performs deterministic imputation, and calculates a 0/1 weight series based on a window_size.
  • WeightFunction: A serializable (picklable) wrapper for these weights, allowing them to be passed to model training pipelines.

Implementation Example

The following example demonstrates how to prepare data with gaps for safe training and generate a forecast.

import pandas as pd
import numpy as np
from lightgbm import LGBMRegressor
from spotforecast2_safe.forecaster.recursive import ForecasterRecursive
from spotforecast2_safe.preprocessing.imputation import get_missing_weights, WeightFunction

# 1. Synthetic hourly series with two consecutive gaps
np.random.seed(42)
n = 200
dates = pd.date_range('2020-01-01', periods=n, freq='h')
values = np.sin(np.arange(n) * 2 * np.pi / 24) + np.random.randn(n) * 0.1
data = pd.DataFrame({'y': values}, index=dates)
data.loc[data.index[50], 'y'] = np.nan
data.loc[data.index[51], 'y'] = np.nan

print(f"NaNs before imputation: {data['y'].isna().sum()}")

# 2. Impute gaps and compute safety weights
# window_size=12 → the gap itself plus the next 12 observations are down-weighted to 0
filled_data, weights = get_missing_weights(data, window_size=12)

print(f"NaNs after imputation:  {filled_data['y'].isna().sum()}")
print(f"Zero-weight samples:    {(weights == 0).sum()}  "
      "(gap + 12-step lag window masked out)")

# 3. Wrap weights in a picklable WeightFunction
weight_func = WeightFunction(weights)

# 4. Build forecaster — weight_func down-weights imputed regions during training
forecaster = ForecasterRecursive(
    estimator=LGBMRegressor(n_jobs=1, verbose=-1, random_state=42),
    lags=12,
    weight_func=weight_func,
)
forecaster.fit(filled_data['y'])

# 5. Forecast the next 24 hours
forecast = forecaster.predict(steps=24)

# 6. Results table
results = pd.DataFrame({
    'timestamp': forecast.index,
    'forecast':  forecast.values.round(4),
})
results.set_index('timestamp')
NaNs before imputation: 2
NaNs after imputation:  0
Zero-weight samples:    14  (gap + 12-step lag window masked out)
forecast
timestamp
2020-01-09 08:00:00 0.8213
2020-01-09 09:00:00 0.7379
2020-01-09 10:00:00 0.5530
2020-01-09 11:00:00 0.2825
2020-01-09 12:00:00 0.0396
2020-01-09 13:00:00 -0.3173
2020-01-09 14:00:00 -0.6113
2020-01-09 15:00:00 -0.7076
2020-01-09 16:00:00 -0.8231
2020-01-09 17:00:00 -0.9425
2020-01-09 18:00:00 -0.9796
2020-01-09 19:00:00 -0.9678
2020-01-09 20:00:00 -0.8754
2020-01-09 21:00:00 -0.7605
2020-01-09 22:00:00 -0.4435
2020-01-09 23:00:00 -0.2832
2020-01-10 00:00:00 0.0053
2020-01-10 01:00:00 0.1809
2020-01-10 02:00:00 0.5853
2020-01-10 03:00:00 0.6046
2020-01-10 04:00:00 0.8878
2020-01-10 05:00:00 1.0020
2020-01-10 06:00:00 0.9964
2020-01-10 07:00:00 0.9290

Internal Logic

The get_missing_weights function uses a rolling maximum to propagate the “missing” status across the following window_size rows. This ensures that any training sample whose lag window (up to window_size lags deep) touches an imputed value is excluded from the training objective.
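The rolling-maximum idea can be emulated with plain pandas. The sketch below illustrates the mechanism only; the actual internals of get_missing_weights may differ.

```python
import numpy as np
import pandas as pd

def missing_weights_sketch(s, window_size):
    """Forward-fill gaps, then zero-weight every row whose trailing
    (window_size + 1)-row window touches an originally missing value."""
    was_nan = s.isna().astype(float)
    filled = s.ffill()
    # Rolling max over the binary mask: 1.0 if the current row or any of
    # the window_size rows before it was NaN.
    contaminated = was_nan.rolling(window_size + 1, min_periods=1).max()
    return filled, 1.0 - contaminated

# Two consecutive gaps at positions 10-11, window_size=5
idx = pd.date_range('2020-01-01', periods=30, freq='h')
s = pd.Series(np.arange(30, dtype=float), index=idx)
s.iloc[10:12] = np.nan

filled, weights = missing_weights_sketch(s, window_size=5)
print(int(filled.isna().sum()))   # 0  → gaps are filled
print(int((weights == 0).sum()))  # 7  → 2 gap rows + 5 trailing rows
```

Because the mask propagates forward from each NaN, the zero-weight zone always covers the gap itself plus the next window_size observations, matching the counts shown in the examples above.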

Safety First

The WeightFunction is designed to be deterministic and picklable. This is crucial for safety-critical systems where models must be serialized and loaded in different environments (e.g., training in a batch environment and predicting in an edge device) without loss of context.

The Weight Function Explained

To make the mechanism concrete, this section walks through the exact numerical values at each step using a small, readable example: 30 hourly observations, two consecutive gaps at positions 10–11, and window_size=5.

Step 1 — Raw data with gaps

import pandas as pd
import numpy as np
from spotforecast2_safe.preprocessing.imputation import get_missing_weights, WeightFunction
from spotforecast2_safe.forecaster.recursive import ForecasterRecursive
from lightgbm import LGBMRegressor

np.random.seed(42)
n = 30
dates = pd.date_range('2020-01-01', periods=n, freq='h')
values = np.sin(np.arange(n) * 2 * np.pi / 24) + np.random.randn(n) * 0.1
data = pd.DataFrame({'y': values.round(4)}, index=dates)

# Inject two consecutive gaps
data.loc[data.index[10], 'y'] = np.nan
data.loc[data.index[11], 'y'] = np.nan

data.iloc[7:18]   # rows around the gap
y
2020-01-01 07:00:00 1.0427
2020-01-01 08:00:00 0.8191
2020-01-01 09:00:00 0.7614
2020-01-01 10:00:00 NaN
2020-01-01 11:00:00 NaN
2020-01-01 12:00:00 0.0242
2020-01-01 13:00:00 -0.4501
2020-01-01 14:00:00 -0.6725
2020-01-01 15:00:00 -0.7633
2020-01-01 16:00:00 -0.9673
2020-01-01 17:00:00 -0.9345

The last known good value before the gap is 0.7614 (row 9, 09:00).

Step 2 — Imputation and weight propagation

get_missing_weights forward-fills the gap with the last known value, then applies a rolling maximum of width window_size + 1 over a binary “was-NaN” mask. Any row whose rolling window touches an imputed value receives weight 0.

filled_data, weights = get_missing_weights(data, window_size=5)

combined = filled_data.copy()
combined['weight'] = weights
combined.iloc[7:20]
y weight
2020-01-01 07:00:00 1.0427 1.0
2020-01-01 08:00:00 0.8191 1.0
2020-01-01 09:00:00 0.7614 1.0
2020-01-01 10:00:00 0.7614 0.0
2020-01-01 11:00:00 0.7614 0.0
2020-01-01 12:00:00 0.0242 0.0
2020-01-01 13:00:00 -0.4501 0.0
2020-01-01 14:00:00 -0.6725 0.0
2020-01-01 15:00:00 -0.7633 0.0
2020-01-01 16:00:00 -0.9673 0.0
2020-01-01 17:00:00 -0.9345 1.0
2020-01-01 18:00:00 -1.0908 1.0
2020-01-01 19:00:00 -1.1072 1.0

The contaminated zone covers exactly 2 gap rows + 5 lag-window rows = 7 zeros.

Note

Why extend the mask by window_size? A lag-3 model predicting at 12:00 uses features [lag_1=11:00, lag_2=10:00, lag_3=09:00]. Both 10:00 and 11:00 are imputed, so the entire training row for 12:00 is corrupt. The mask must reach forward by the full lag depth.
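That contamination can be verified directly by building the lag features with shift. This is a standalone pandas sketch, independent of the library; lag_k here means the value k hours earlier.

```python
import numpy as np
import pandas as pd

idx = pd.date_range('2020-01-01', periods=30, freq='h')
y = pd.Series(np.arange(30, dtype=float), index=idx)
y.iloc[10:12] = np.nan            # gaps at 10:00 and 11:00

# Build the lag-3 feature rows before any imputation
lags3 = pd.DataFrame({f'lag_{k}': y.shift(k) for k in (1, 2, 3)})

# The training row for 12:00 uses 11:00, 10:00 and 09:00 as features
print(lags3.loc['2020-01-01 12:00:00'].isna().tolist())  # [True, True, False]

# 14:00 still reaches the gap through lag_3 = 11:00; 15:00 is clean again
print(bool(lags3.loc['2020-01-01 14:00:00'].isna().any()))  # True
print(bool(lags3.loc['2020-01-01 15:00:00'].isna().any()))  # False
```

With lags=3 the last contaminated training row sits max(lags) steps after the gap, which is exactly why the weight mask must extend forward by the lag depth.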

Step 3 — From weights to X_train and sample_weight

ForecasterRecursive._create_train_X_y() converts the series into a lag matrix. With lags=3 the training matrix starts at row 3 and the row indices become timestamps:

forecaster_small = ForecasterRecursive(
    estimator=LGBMRegressor(n_jobs=1, verbose=-1, random_state=42),
    lags=3,
    weight_func=WeightFunction(weights),
)

X_train, y_train, *_ = forecaster_small._create_train_X_y(filled_data['y'])
sample_weight = forecaster_small.create_sample_weights(X_train)

# Combine everything for inspection
inspection = X_train.copy()
inspection['y_target']      = y_train
inspection['sample_weight'] = sample_weight
inspection.iloc[7:18]
lag_1 lag_2 lag_3 y_target sample_weight
2020-01-01 10:00:00 0.7614 0.8191 1.0427 0.7614 0.0
2020-01-01 11:00:00 0.7614 0.7614 0.8191 0.7614 0.0
2020-01-01 12:00:00 0.7614 0.7614 0.7614 0.0242 0.0
2020-01-01 13:00:00 0.0242 0.7614 0.7614 -0.4501 0.0
2020-01-01 14:00:00 -0.4501 0.0242 0.7614 -0.6725 0.0
2020-01-01 15:00:00 -0.6725 -0.4501 0.0242 -0.7633 0.0
2020-01-01 16:00:00 -0.7633 -0.6725 -0.4501 -0.9673 0.0
2020-01-01 17:00:00 -0.9673 -0.7633 -0.6725 -0.9345 1.0
2020-01-01 18:00:00 -0.9345 -0.9673 -0.7633 -1.0908 1.0
2020-01-01 19:00:00 -1.0908 -0.9345 -0.9673 -1.1072 1.0
2020-01-01 20:00:00 -1.1072 -1.0908 -0.9345 -0.7195 1.0

weight_func is called with X_train.index (the timestamps). It looks up each timestamp in the pre-computed weights Series and returns the corresponding 0.0 or 1.0. Rows where sample_weight == 0 are present in X_train but have zero influence on the fitted model.
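The label-based lookup can be sketched with a plain reindex. SimpleWeightLookup below is a hypothetical illustration of the idea, not part of spotforecast2-safe.

```python
import numpy as np
import pandas as pd

class SimpleWeightLookup:
    """Map each queried timestamp to its pre-computed weight by label."""
    def __init__(self, weights):
        self.weights = weights

    def __call__(self, index):
        # Label-based lookup: the positions of the queried timestamps are
        # irrelevant, only the timestamps themselves matter.
        return self.weights.reindex(index).to_numpy()

idx = pd.date_range('2020-01-01', periods=6, freq='h')
weights = pd.Series([1.0, 1.0, 0.0, 0.0, 1.0, 1.0], index=idx)

lookup = SimpleWeightLookup(weights)
# Query a subset in a different order than the original positions
print(lookup(idx[[4, 2, 0]]).tolist())   # [1.0, 0.0, 1.0]
```

Because the lookup is by label, the same callable returns correctly sized, correctly aligned weights for any subset of timestamps the forecaster happens to keep.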

Step 4 — How the estimator uses sample_weight

Inside ForecasterRecursive.fit() the call path is:

fit(y, exog)
  └─ _create_train_X_y(y)          → X_train, y_train
  └─ create_sample_weights(X_train) → weight_func(X_train.index)  → array([1,1,...,0,0,...,1])
  └─ estimator.fit(X_train, y_train, sample_weight=array)

LGBMRegressor (and any sklearn-compatible estimator) interprets sample_weight=0 as “skip this observation entirely”. The imputed rows are physically present in the training matrix but contribute zero gradient to the model.
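The “zero influence” claim can be checked without any estimator at all: in a weighted least-squares fit, rows with weight 0 vanish from the normal equations. The numpy sketch below is an analogy; LightGBM applies the same principle to its per-sample gradients.

```python
import numpy as np

# y = 2*x with one corrupted (think: imputed) observation at x = 5
x = np.arange(10, dtype=float)
y = 2.0 * x
y[5] = 100.0

X = np.column_stack([x, np.ones_like(x)])

def weighted_lstsq(X, y, w):
    # Solve (X^T W X) beta = (X^T W y); rows with w = 0 drop out of both sides
    Xw = X * w[:, None]
    return np.linalg.solve(Xw.T @ X, Xw.T @ y)

uniform = np.ones(10)
masked = uniform.copy()
masked[5] = 0.0                   # exclude the corrupted row

slope_all, _ = weighted_lstsq(X, y, uniform)
slope_masked, _ = weighted_lstsq(X, y, masked)
print(round(float(slope_masked), 6))  # 2.0 → corrupted row has zero influence
print(bool(slope_all > 2.5))          # True → unmasked fit is pulled off target
```

The masked fit recovers the true slope exactly, while the uniform fit is dragged toward the corrupted point, which is precisely the failure mode the 0/1 weights prevent.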

# Show the weight distribution summary
sw_series = pd.Series(sample_weight, index=X_train.index, name='sample_weight')
print(f"Total training rows : {len(sw_series)}")
print(f"Active   (weight=1) : {(sw_series == 1).sum()}")
print(f"Excluded (weight=0) : {(sw_series == 0).sum()}")
print()
print("Excluded timestamps:")
print(sw_series[sw_series == 0].index.tolist())
Total training rows : 27
Active   (weight=1) : 20
Excluded (weight=0) : 7

Excluded timestamps:
[Timestamp('2020-01-01 10:00:00'), Timestamp('2020-01-01 11:00:00'), Timestamp('2020-01-01 12:00:00'), Timestamp('2020-01-01 13:00:00'), Timestamp('2020-01-01 14:00:00'), Timestamp('2020-01-01 15:00:00'), Timestamp('2020-01-01 16:00:00')]

Why a callable — not an array?

You might wonder: if we already have the weights Series, why not pass it directly to the forecaster instead of wrapping it in a callable? There are three concrete reasons.

Reason 1 — Length mismatch: the lag matrix is shorter than the original series.

get_missing_weights returns one weight per row of the original series (length n). ForecasterRecursive._create_train_X_y discards the first max(lags) rows because they cannot be fully populated with lag features. The resulting X_train has length n − max(lags), which changes with every different lags setting.

print(f"len(filled_data) = {len(filled_data)}   ← original series, one weight per row")
print(f"len(weights)     = {len(weights)}   ← same")
print()
print(f"{'lags':>6}  {'X_train rows':>12}  {'would need array length':>22}  {'actual weights length':>20}  status")
print("-" * 76)
for lags in [1, 3, 7, 12]:
    f_tmp = ForecasterRecursive(
        estimator=LGBMRegressor(verbose=-1),
        lags=lags,
        weight_func=WeightFunction(weights),
    )
    X_tmp, *_ = f_tmp._create_train_X_y(filled_data['y'])
    print(f"{lags:>6}  {len(X_tmp):>12}  {len(X_tmp):>22}  {len(weights):>20}  {'✗ mismatch' if len(X_tmp) != len(weights) else '✓'}")
len(filled_data) = 30   ← original series, one weight per row
len(weights)     = 30   ← same

  lags  X_train rows  would need array length  actual weights length  status
----------------------------------------------------------------------------
     1            29                      29                    30  ✗ mismatch
     3            27                      27                    30  ✗ mismatch
     7            23                      23                    30  ✗ mismatch
    12            18                      18                    30  ✗ mismatch

A plain array passed at construction time has no way to know which rows were dropped. A callable receives X_train.index — the exact timestamps that survived — and performs a label-based lookup, returning a correctly sized array regardless of lags.

Reason 2 — Index-based lookup, not positional.

The forecaster calls weight_func(X_train.index) where X_train.index is a DatetimeIndex. The function maps each timestamp to its pre-computed weight by label:

wf = WeightFunction(weights)

# Query a window that spans the gap boundary (mix of 1s and 0s)
sample_idx = X_train.index[5:8]      # hours 08:00–10:00 — crosses the gap at 10:00
print("queried timestamps:", sample_idx.tolist())
print("returned weights:  ", wf(sample_idx).tolist())

# When *all* queried timestamps fall inside the zero-weight zone,
# WeightFunction returns None instead of an all-zero array.
# ForecasterRecursive.create_sample_weights treats None as "use uniform weights",
# preventing a ValueError when the entire training window is penalised.
all_zero_idx = X_train.index[7:10]   # hours 10:00–12:00 — entirely in the gap zone
result = wf(all_zero_idx)
print(f"\nall-zero window → {result!r}  (None = uniform weights fallback)")
queried timestamps: [Timestamp('2020-01-01 08:00:00'), Timestamp('2020-01-01 09:00:00'), Timestamp('2020-01-01 10:00:00')]
returned weights:   [1.0, 1.0, 0.0]
WeightFunction: all sample weights for the requested index are zero (the window falls entirely within gap-penalty zones). Returning None so ForecasterRecursive uses uniform weighting.

all-zero window → None  (None = uniform weights fallback)

A positional array weights[7:10] would silently return the wrong values because the lag-matrix offset shifts all positions by max(lags).
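The off-by-max(lags) failure is easy to reproduce. A minimal sketch: a length-10 series, a zero-weight zone at hours 04:00 through 06:00, and a lag depth of 3.

```python
import numpy as np
import pandas as pd

idx = pd.date_range('2020-01-01', periods=10, freq='h')
weights = pd.Series(1.0, index=idx)
weights.iloc[4:7] = 0.0           # zero-weight zone at 04:00-06:00

max_lag = 3
X_index = idx[max_lag:]           # the lag matrix drops the first max_lag rows

# Label-based lookup: correct regardless of the offset
by_label = weights.reindex(X_index[1:4])
print(by_label.to_numpy().tolist())     # [0.0, 0.0, 0.0] → hours 04:00-06:00

# Positional slice with the same numbers: silently shifted by max_lag
by_position = weights.iloc[1:4]
print(by_position.to_numpy().tolist())  # [1.0, 1.0, 1.0] → hours 01:00-03:00
```

Both calls use the slice 1:4, yet only the label-based lookup lands on the penalised rows; the positional one quietly returns weights for three unrelated hours.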

Reason 3 — Picklability for model serialization.

Trained ForecasterRecursive instances are saved with persistence.dump() (joblib). Any closure or lambda that captures a local variable fails at serialization time:

import pickle

# A lambda closure over a local variable — NOT serializable
def make_closure():
    w = weights.copy()
    return lambda index: w.reindex(index).values

try:
    pickle.dumps(make_closure())
    print("lambda closure: picklable")
except AttributeError as e:
    print(f"lambda closure: NOT picklable — {e}")

# WeightFunction stores only the Series — fully serializable
wf_roundtrip = pickle.loads(pickle.dumps(WeightFunction(weights)))
print(f"WeightFunction:  picklable — spot-check: {wf_roundtrip(weights.index[:3]).tolist()}")
lambda closure: NOT picklable — Can't pickle local object 'make_closure.<locals>.<lambda>'
WeightFunction:  picklable — spot-check: [1.0, 1.0, 1.0]

WeightFunction is a plain class with a single __init__ attribute (weights_series). Pickle serializes it without any closure or scope dependency, making it safe to store alongside the trained model and reload in a completely separate process.