# File: api/src/forecasting/ts_predictor.py
# Eli Safra: "Deploy SolarWine API (FastAPI + Docker, port 7860)" (commit 938949f)
"""
TimeSeriesPredictor: lag-based direct multi-horizon forecasting for
photosynthesis rate A. Uses daytime-session indexing to handle 12h+
nighttime gaps, with per-horizon models (XGBoost / GradientBoosting).
Each growing season (May-Sep) is handled independently — sessions, lags,
and targets never cross the off-season gap (Oct-Apr).
"""
from __future__ import annotations
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
try:
    from xgboost import XGBRegressor
    _HAS_XGB = True
except ImportError:
    # XGBoost is optional; _make_model falls back to scikit-learn.
    _HAS_XGB = False

# Horizons: name -> {"steps": rows ahead within a session | None,
#                    "days": calendar days ahead within a season | None}.
# Exactly one of the two keys is set for every horizon.
HORIZONS = {
    "15min": {"steps": 1, "days": None},
    "1hour": {"steps": 4, "days": None},
    "1day": {"steps": None, "days": 1},
    "1week": {"steps": None, "days": 7},
    "1month": {"steps": None, "days": 30},
}
# Columns that receive lag / rolling features (skipped when absent).
LAG_COLS = ["A", "ghi_w_m2", "air_temperature_c"]
# Lag offsets, in rows within a session.
LAG_STEPS = [1, 2, 3, 4, 8, 12]
# Rolling-window lengths, in rows within a session.
ROLLING_WINDOWS = [4, 12]
# A gap between consecutive rows larger than this starts a new session.
MAX_GAP_MINUTES = 30


class TimeSeriesPredictor:
    """Train one model per forecast horizon using lag features.

    Expected call order on a raw frame: ``assign_season`` ->
    ``identify_sessions`` -> ``create_lag_features`` -> ``train_all_horizons``.
    """

    def __init__(self):
        # horizon name -> fitted regressor (XGBRegressor or GradientBoostingRegressor)
        self.models: dict[str, object] = {}
        # Feature columns used for fitting; set by train_all_horizons.
        self.feature_cols: Optional[list[str]] = None
        # horizon name -> pooled test predictions, targets and metrics
        self.results: dict[str, dict] = {}

    # ------------------------------------------------------------------
    # Season splitting
    # ------------------------------------------------------------------
    @staticmethod
    def assign_season(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Return a copy of ``df`` with a ``season`` column.

        The season is the UTC calendar year of each row's timestamp, which
        identifies the growing season for May-Sep data. Off-season rows
        simply get their own calendar year.
        """
        out = df.copy()
        out["season"] = pd.to_datetime(out[ts_col], utc=True).dt.year
        return out

    # ------------------------------------------------------------------
    # Session identification
    # ------------------------------------------------------------------
    @staticmethod
    def identify_sessions(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Return a time-sorted copy of ``df`` with a ``session_id`` column.

        A new session starts when the gap to the previous row exceeds
        ``MAX_GAP_MINUTES`` or, if a ``season`` column is present, when the
        (non-null) season changes, so a session never spans two seasons.
        Session ids are consecutive integers starting at 0; the returned
        frame has a fresh RangeIndex.
        """
        out = df.sort_values(ts_col).reset_index(drop=True)
        ts = pd.to_datetime(out[ts_col], utc=True)
        gap_minutes = ts.diff().dt.total_seconds() / 60
        # First row has a NaN gap -> False, so ids start at 0.
        new_session = gap_minutes > MAX_GAP_MINUTES
        if "season" in out.columns:
            season = out["season"]
            prev_season = season.shift()
            new_session |= season.ne(prev_season) & season.notna() & prev_season.notna()
        out["session_id"] = new_session.cumsum()
        return out

    # ------------------------------------------------------------------
    # Lag / rolling features
    # ------------------------------------------------------------------
    @staticmethod
    def create_lag_features(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with within-session lag and rolling features.

        Requires a ``session_id`` column (see ``identify_sessions``) and rows
        in chronological order. For each column of ``LAG_COLS`` present:

        * ``{col}_lag{k}`` for every k in ``LAG_STEPS`` (shift within session);
        * ``{col}_rmean{w}`` (plus ``A_rstd{w}`` for ``A``) over the ``w``
          previous rows of the same session, excluding the current row.

        When a ``season`` column exists, ``prev_sess_mean_A`` /
        ``prev_sess_max_A`` summarise ``A`` over the previous session of the
        same season (NaN for a season's first session).

        Lags that reach before the session start are filled with the last
        non-null value of the previous session (same season when ``season``
        exists). Every lag depth gets that same value, which is an
        approximation. Rolling statistics are not back-filled, so they are
        NaN on each session's first row.
        """
        out = df.copy()
        cols = [c for c in LAG_COLS if c in out.columns]
        has_season = "season" in out.columns

        # Per-session lags and rolling stats
        for col in cols:
            grouped = out.groupby("session_id")[col]
            for lag in LAG_STEPS:
                out[f"{col}_lag{lag}"] = grouped.shift(lag)
            for w in ROLLING_WINDOWS:
                # shift(1): the window only sees rows strictly before the current one.
                out[f"{col}_rmean{w}"] = grouped.transform(
                    lambda s, w=w: s.shift(1).rolling(w, min_periods=1).mean()
                )
                if col == "A":
                    out[f"{col}_rstd{w}"] = grouped.transform(
                        lambda s, w=w: s.shift(1).rolling(w, min_periods=1).std()
                    )

        session_season = (
            out.groupby("session_id")["season"].first() if has_season else None
        )

        # Previous-session summary for A (within the same season)
        if "A" in out.columns and has_season:
            stats = out.groupby("session_id")["A"].agg(["mean", "max"])
            # Shift within season so the first session of each season gets NaN.
            prev_stats = stats.groupby(session_season).shift(1)
            out["prev_sess_mean_A"] = out["session_id"].map(prev_stats["mean"])
            out["prev_sess_max_A"] = out["session_id"].map(prev_stats["max"])

        # Fill NaN lags at session start with the previous session's end value
        for col in cols:
            last_val = out.groupby("session_id")[col].last()
            if has_season:
                prev_end = last_val.groupby(session_season).shift(1)
            else:
                prev_end = last_val.shift(1)
            fill = out["session_id"].map(prev_end)
            for lag in LAG_STEPS:
                lag_col = f"{col}_lag{lag}"
                out[lag_col] = out[lag_col].fillna(fill)
        return out

    # ------------------------------------------------------------------
    # Horizon targets
    # ------------------------------------------------------------------
    @staticmethod
    def create_horizon_target(df: pd.DataFrame, horizon_name: str,
                              ts_col: str = "timestamp_utc") -> pd.Series:
        """Build the target series for ``horizon_name``.

        Step horizons shift ``A`` backwards within each ``session_id``.
        Calendar-day horizons look up ``A`` at ``timestamp + days``. Both the
        query times and the observed timestamps are floored to 15 minutes.
        The lookup is restricted to the same ``season`` when that column
        exists. Rows without a match get NaN.

        Returns a Series with a fresh RangeIndex, positionally aligned with
        ``df``. Raises KeyError for an unknown ``horizon_name``.
        """
        h = HORIZONS[horizon_name]
        if h["steps"] is not None:
            # Within-session shift
            target = df.groupby("session_id")["A"].shift(-h["steps"])
            return target.reset_index(drop=True)

        offset = pd.Timedelta(days=h["days"])
        values = np.full(len(df), np.nan)
        if "season" in df.columns:
            # Each season is matched separately so targets never cross seasons.
            position_groups = df.groupby("season").indices.values()
        else:
            position_groups = [np.arange(len(df))]
        for positions in position_groups:
            values[positions] = TimeSeriesPredictor._future_values(
                df.iloc[positions], ts_col, offset
            )
        return pd.Series(values)

    @staticmethod
    def _future_values(grp: pd.DataFrame, ts_col: str, offset: pd.Timedelta,
                       freq: str = "15min") -> np.ndarray:
        """Return ``A`` observed at ``ts + offset`` for every row of ``grp``.

        Lookup keys and query times are both floored to ``freq`` so rows that
        are not exactly on the grid still match. The first row in a bin wins,
        and unmatched rows give NaN.
        """
        ts = pd.to_datetime(grp[ts_col], utc=True)
        keys = pd.DatetimeIndex(ts.dt.floor(freq))
        lookup = pd.Series(grp["A"].to_numpy(), index=keys)
        lookup = lookup[keys.notna() & ~keys.duplicated(keep="first")]
        query = (ts + offset).dt.floor(freq)
        return query.map(lookup).to_numpy(dtype=float)

    # ------------------------------------------------------------------
    # Feature columns
    # ------------------------------------------------------------------
    def _get_feature_cols(self, df: pd.DataFrame) -> list[str]:
        """Return numeric feature columns, excluding targets / metadata."""
        exclude = {"A", "timestamp_utc", "time", "source", "session_id",
                   "target", "season"}
        return [c for c in df.select_dtypes(include=[np.number]).columns
                if c not in exclude]

    # ------------------------------------------------------------------
    # Train / evaluate
    # ------------------------------------------------------------------
    def train_all_horizons(self, df: pd.DataFrame, train_ratio: float = 0.75) -> pd.DataFrame:
        """Train one model per horizon, treating each season independently.

        Within each season the first ``train_ratio`` rows, in the frame's
        existing row order (assumed chronological), are used for training
        and the remainder for testing. Training data from all seasons is
        pooled to fit a single model per horizon, and test data from all
        seasons is pooled for evaluation. Per-season metrics are also
        reported. If ``df`` has no ``season`` column, calendar-year seasons
        are derived via ``assign_season``.

        Side effects: sets ``self.feature_cols``; for every horizon, stores
        (or, when skipped, removes) the entries in ``self.models`` and
        ``self.results``.

        Returns a metrics table with one ``season == "all"`` row per trained
        horizon plus one row per season.
        """
        if "season" not in df.columns:
            # Derive calendar-year seasons instead of failing with KeyError.
            df = self.assign_season(df)
        self.feature_cols = self._get_feature_cols(df)
        seasons = sorted(df["season"].dropna().unique())
        rows = []
        for horizon_name in HORIZONS:
            # Collect train/test splits per season
            train_X, train_y, test_X, test_y = [], [], [], []
            season_train_n: dict[int, int] = {}
            season_test: dict[int, tuple] = {}
            for season in seasons:
                split = self._split_season(
                    df[df["season"] == season], horizon_name, train_ratio
                )
                if split is None:
                    continue
                X_tr, y_tr, X_te, y_te = split
                train_X.append(X_tr)
                train_y.append(y_tr)
                test_X.append(X_te)
                test_y.append(y_te)
                season_train_n[season] = len(X_tr)
                season_test[season] = (X_te, y_te)

            if not train_X:
                print(f" {horizon_name}: insufficient data across seasons, skipping")
                # Drop leftovers from an earlier call so models/results always
                # describe the most recent training run.
                self.models.pop(horizon_name, None)
                self.results.pop(horizon_name, None)
                continue

            X_train, y_train = pd.concat(train_X), pd.concat(train_y)
            X_test, y_test = pd.concat(test_X), pd.concat(test_y)

            model = self._make_model()
            model.fit(X_train, y_train)
            self.models[horizon_name] = model

            # Overall metrics
            pred = model.predict(X_test)
            rmse, mae, r2 = self._regression_metrics(y_test, pred)
            self.results[horizon_name] = {
                "predictions": pred, "y_test": y_test.values,
                "rmse": rmse, "mae": mae, "r2": r2,
                "n_train": len(X_train), "n_test": len(X_test),
            }
            rows.append({
                "horizon": horizon_name, "season": "all",
                "approach": "time_series",
                "RMSE": round(rmse, 4), "MAE": round(mae, 4), "R2": round(r2, 4),
                "n_train": len(X_train), "n_test": len(X_test),
            })
            print(f" {horizon_name} [all]: RMSE={rmse:.4f} MAE={mae:.4f} "
                  f"R²={r2:.4f} (train={len(X_train)}, test={len(X_test)})")

            # Per-season metrics
            for season, (X_te_s, y_te_s) in season_test.items():
                rmse_s, mae_s, r2_s = self._regression_metrics(
                    y_te_s, model.predict(X_te_s)
                )
                rows.append({
                    "horizon": horizon_name, "season": str(season),
                    "approach": "time_series",
                    "RMSE": round(rmse_s, 4), "MAE": round(mae_s, 4),
                    "R2": round(r2_s, 4),
                    "n_train": season_train_n[season],
                    "n_test": len(X_te_s),
                })
                print(f" {horizon_name} [{season}]: RMSE={rmse_s:.4f} "
                      f"MAE={mae_s:.4f} R²={r2_s:.4f} (test={len(X_te_s)})")
        return pd.DataFrame(rows)

    def _split_season(self, sdf: pd.DataFrame, horizon_name: str,
                      train_ratio: float) -> Optional[tuple]:
        """Return ``(X_train, y_train, X_test, y_test)`` for one season, or None.

        Rows with any missing feature or target are dropped first. The split
        is positional (first ``train_ratio`` rows train). None is returned
        when fewer than 30 rows remain, fewer than 10 would train, or fewer
        than 5 would test.
        """
        target = self.create_horizon_target(sdf, horizon_name)
        sdf = sdf.assign(target=target.to_numpy())
        sdf = sdf.dropna(subset=self.feature_cols + ["target"])
        if len(sdf) < 30:
            return None
        n = int(len(sdf) * train_ratio)
        if n < 10 or len(sdf) - n < 5:
            return None
        X = sdf[self.feature_cols]
        y = sdf["target"]
        return X.iloc[:n], y.iloc[:n], X.iloc[n:], y.iloc[n:]

    @staticmethod
    def _regression_metrics(y_true, y_pred) -> tuple[float, float, float]:
        """Return ``(rmse, mae, r2)`` as plain floats."""
        rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
        mae = float(mean_absolute_error(y_true, y_pred))
        r2 = float(r2_score(y_true, y_pred))
        return rmse, mae, r2

    def get_comparison_with_baseline(
        self, baseline_metrics: pd.DataFrame
    ) -> pd.DataFrame:
        """Combine TS horizon results with cross-sectional baseline into one table.

        ``baseline_metrics`` must provide ``model``, ``RMSE``, ``MAE`` and
        ``R2`` columns. The row with the lowest RMSE is repeated for every
        horizon in ``HORIZONS`` so it can be compared side by side with the
        per-horizon time-series results in ``self.results``.
        """
        ts_df = pd.DataFrame([
            {
                "horizon": horizon_name, "season": "all",
                "approach": "time_series",
                "RMSE": round(res["rmse"], 4),
                "MAE": round(res["mae"], 4),
                "R2": round(res["r2"], 4),
            }
            for horizon_name, res in self.results.items()
        ])
        if baseline_metrics.empty:
            return ts_df

        # Best cross-sectional model
        best = baseline_metrics.loc[baseline_metrics["RMSE"].idxmin()]
        bl_df = pd.DataFrame([
            {
                "horizon": h, "season": "all",
                "approach": f"cross_sectional ({best['model']})",
                "RMSE": round(float(best["RMSE"]), 4),
                "MAE": round(float(best["MAE"]), 4),
                "R2": round(float(best["R2"]), 4),
            }
            for h in HORIZONS
        ])
        return pd.concat([ts_df, bl_df], ignore_index=True)

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    @staticmethod
    def _make_model():
        """Return an unfitted regressor: XGBoost if importable, else sklearn GBR."""
        if _HAS_XGB:
            return XGBRegressor(
                n_estimators=300, max_depth=4, learning_rate=0.05,
                min_child_weight=10, reg_alpha=0.1, reg_lambda=1.0,
                n_jobs=-1, random_state=42,
            )
        return GradientBoostingRegressor(
            n_estimators=300, max_depth=4, learning_rate=0.05,
            min_samples_leaf=10, random_state=42,
        )