"""
TimeSeriesPredictor: lag-based direct multi-horizon forecasting for photosynthesis rate A.

Uses daytime-session indexing to handle 12h+ nighttime gaps, with per-horizon
models (XGBoost / GradientBoosting).

Each growing season (May-Sep) is handled independently — sessions, lags, and
targets never cross the off-season gap (Oct-Apr).
"""

from __future__ import annotations

from typing import Optional

import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

try:
    from xgboost import XGBRegressor
    _HAS_XGB = True
except ImportError:
    _HAS_XGB = False

# Horizons: name -> (steps within session | None, calendar days | None).
# Exactly one of the two entries is set per horizon.
HORIZONS = {
    "15min": {"steps": 1, "days": None},
    "1hour": {"steps": 4, "days": None},
    "1day": {"steps": None, "days": 1},
    "1week": {"steps": None, "days": 7},
    "1month": {"steps": None, "days": 30},
}

LAG_COLS = ["A", "ghi_w_m2", "air_temperature_c"]
LAG_STEPS = [1, 2, 3, 4, 8, 12]
ROLLING_WINDOWS = [4, 12]
MAX_GAP_MINUTES = 30

# Grid used to match "same time, N days later" rows for calendar-day horizons.
_TARGET_GRID = "15min"

# Minimum usable rows a season must contribute to a horizon (after dropping NaNs).
_MIN_SEASON_ROWS = 30
_MIN_TRAIN_ROWS = 10
_MIN_TEST_ROWS = 5

# Columns that are never model inputs (raw target, metadata, bookkeeping).
_NON_FEATURE_COLS = frozenset(
    {"A", "timestamp_utc", "time", "source", "session_id", "target", "season"}
)


class TimeSeriesPredictor:
    """Train one model per forecast horizon using lag features.

    State filled by :meth:`train_all_horizons`:

    * ``models``       - horizon name -> fitted regressor
    * ``feature_cols`` - feature columns used by the most recent training run
    * ``results``      - horizon name -> pooled test predictions and metrics
    """

    def __init__(self):
        self.models: dict[str, object] = {}
        self.feature_cols: Optional[list[str]] = None
        self.results: dict[str, dict] = {}

    # ------------------------------------------------------------------
    # Season splitting
    # ------------------------------------------------------------------
    @staticmethod
    def assign_season(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Return a copy of ``df`` with a ``season`` column.

        The season is the calendar year (in UTC) of each row's timestamp.
        """
        out = df.copy()
        out["season"] = pd.to_datetime(out[ts_col], utc=True).dt.year
        return out

    # ------------------------------------------------------------------
    # Session identification
    # ------------------------------------------------------------------
    @staticmethod
    def identify_sessions(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Return ``df`` sorted by time with a ``session_id`` column added.

        A new session starts whenever the gap to the previous row exceeds
        ``MAX_GAP_MINUTES``.  The returned frame has a fresh 0..n-1 index.

        Sessions are split on time gaps only; the ``season`` column is not
        consulted.  NOTE(review): sessions therefore stay inside one season
        only because the off-season gap is longer than ``MAX_GAP_MINUTES`` -
        confirm this holds for the input data.
        """
        out = df.sort_values(ts_col).reset_index(drop=True)
        ts = pd.to_datetime(out[ts_col], utc=True)
        gap_minutes = ts.diff().dt.total_seconds() / 60
        # The first row has a NaN gap, which compares False, so ids start at 0.
        out["session_id"] = (gap_minutes > MAX_GAP_MINUTES).cumsum()
        return out

    # ------------------------------------------------------------------
    # Lag / rolling features
    # ------------------------------------------------------------------
    @staticmethod
    def create_lag_features(df: pd.DataFrame) -> pd.DataFrame:
        """Create within-session lags, rolling stats, and previous-session summary.

        Requires a ``session_id`` column.  For every column of ``LAG_COLS``
        present in ``df`` this adds:

        * ``<col>_lag<k>`` for ``k`` in ``LAG_STEPS`` (shifted within the session),
        * ``<col>_rmean<w>`` for ``w`` in ``ROLLING_WINDOWS`` (mean of the
          preceding ``w`` rows of the session, current row excluded),
        * ``A_rstd<w>`` (same window, standard deviation, only for ``A``).

        If both ``A`` and ``season`` exist, ``prev_sess_mean_A`` and
        ``prev_sess_max_A`` hold the previous session's mean / max of ``A``.
        Previous-session features only link sessions within the same season.

        Lags that are NaN (not enough history in the session) are filled with
        the previous session's last value of the column, again restricted to
        the same season when a ``season`` column is available.
        """
        out = df.copy()

        # Per-session lags and rolling stats
        for col in LAG_COLS:
            if col not in out.columns:
                continue

            for lag in LAG_STEPS:
                out[f"{col}_lag{lag}"] = out.groupby("session_id")[col].shift(lag)

            for w in ROLLING_WINDOWS:
                # shift(1) keeps the current observation out of its own window.
                out[f"{col}_rmean{w}"] = out.groupby("session_id")[col].transform(
                    lambda s: s.shift(1).rolling(w, min_periods=1).mean()
                )
                if col == "A":
                    out[f"{col}_rstd{w}"] = out.groupby("session_id")[col].transform(
                        lambda s: s.shift(1).rolling(w, min_periods=1).std()
                    )

        # Previous-session summary for A (within same season)
        if "A" in out.columns and "season" in out.columns:
            sess_stats = out.groupby("session_id").agg(
                season=("season", "first"),
                mean_A=("A", "mean"),
                max_A=("A", "max"),
            )
            # Shift within season so first session of each season gets NaN
            by_season = sess_stats.groupby("season")
            sess_stats["prev_sess_mean_A"] = by_season["mean_A"].shift(1)
            sess_stats["prev_sess_max_A"] = by_season["max_A"].shift(1)
            out = out.merge(
                sess_stats[["prev_sess_mean_A", "prev_sess_max_A"]],
                left_on="session_id",
                right_index=True,
                how="left",
            )

        # Fill NaN lags at session start with prev-session end values (within season)
        has_season = "season" in df.columns
        for col in LAG_COLS:
            if col not in df.columns:
                continue

            sess_end = df.groupby("session_id").agg(last_val=(col, "last"))
            if has_season:
                sess_end["season"] = df.groupby("session_id")["season"].first()
                sess_end["prev_end"] = sess_end.groupby("season")["last_val"].shift(1)
            else:
                sess_end["prev_end"] = sess_end["last_val"].shift(1)

            # Temporary helper column; removed again once the lags are filled.
            helper = f"_prev_end_{col}"
            out = out.merge(
                sess_end[["prev_end"]].rename(columns={"prev_end": helper}),
                left_on="session_id",
                right_index=True,
                how="left",
            )
            for lag in LAG_STEPS:
                lag_col = f"{col}_lag{lag}"
                if lag_col in out.columns:
                    out[lag_col] = out[lag_col].fillna(out[helper])
            out.drop(columns=[helper], inplace=True)

        return out

    # ------------------------------------------------------------------
    # Horizon targets
    # ------------------------------------------------------------------
    @staticmethod
    def _match_calendar_targets(ts: pd.Series, values: pd.Series, days: int) -> np.ndarray:
        """Return, per row, the value observed ``days`` later in the same grid slot.

        Both the lookup keys and the requested timestamps are floored to
        ``_TARGET_GRID``, so rows logged slightly off the grid still match.
        Rows with no observation in the requested slot get NaN.
        """
        lookup = pd.Series(values.to_numpy(), index=pd.DatetimeIndex(ts))
        # A row without a valid timestamp can never serve as a target.
        lookup = lookup[lookup.index.notna()]
        # Stable sort: the earliest sample of each slot wins below (an exact
        # on-grid sample if there is one), and exact-duplicate timestamps keep
        # their original row order.
        lookup = lookup.sort_index(kind="mergesort")
        lookup.index = lookup.index.floor(_TARGET_GRID)
        lookup = lookup[~lookup.index.duplicated(keep="first")]

        wanted = (ts + pd.Timedelta(days=days)).dt.floor(_TARGET_GRID)
        return wanted.map(lookup).to_numpy(dtype=float)

    @staticmethod
    def create_horizon_target(df: pd.DataFrame, horizon_name: str,
                              ts_col: str = "timestamp_utc") -> pd.Series:
        """Create target column for a given horizon.

        Step horizons (``HORIZONS[...]["steps"]``) take the value of ``A`` that
        many rows ahead inside the same ``session_id``.  Calendar-day horizons
        take the value of ``A`` observed the given number of days later in the
        same 15-minute slot.  Calendar-day targets only match within the same
        season when a ``season`` column is present.

        Args:
            df: frame with ``A`` plus ``session_id`` (step horizons) or
                ``ts_col`` (calendar-day horizons).
            horizon_name: key of ``HORIZONS``; an unknown name raises ``KeyError``.
            ts_col: timestamp column, parsed as UTC.

        Returns:
            Series in the row order of ``df`` with a fresh 0..n-1 index; NaN
            where no target exists.
        """
        spec = HORIZONS[horizon_name]

        if spec["steps"] is not None:
            # Within-session shift
            shifted = df.groupby("session_id")["A"].shift(-spec["steps"])
            return shifted.reset_index(drop=True)

        # Calendar-day match within same season
        days = spec["days"]
        ts = pd.to_datetime(df[ts_col], utc=True)

        if "season" in df.columns:
            # Row positions per season, so targets don't cross seasons.  Rows
            # whose season is NaN belong to no group and keep a NaN target.
            position_groups = list(df.groupby("season").indices.values())
        else:
            position_groups = [np.arange(len(df))]

        # Work with positions rather than labels so a non-unique index is safe.
        target = np.full(len(df), np.nan, dtype=float)
        for pos in position_groups:
            target[pos] = TimeSeriesPredictor._match_calendar_targets(
                ts.iloc[pos], df["A"].iloc[pos], days
            )
        return pd.Series(target)

    # ------------------------------------------------------------------
    # Feature columns
    # ------------------------------------------------------------------
    def _get_feature_cols(self, df: pd.DataFrame) -> list[str]:
        """Return numeric feature columns, excluding targets / metadata."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        return [c for c in numeric_cols if c not in _NON_FEATURE_COLS]

    # ------------------------------------------------------------------
    # Train / evaluate
    # ------------------------------------------------------------------
    def _split_season(self, season_df: pd.DataFrame, horizon_name: str,
                      train_ratio: float) -> Optional[tuple]:
        """Build ``(X_train, y_train, X_test, y_test)`` for one season.

        Returns ``None`` when the season has too few complete rows for this
        horizon.  Uses ``self.feature_cols``, which must already be set.
        """
        sdf = season_df.copy()
        sdf["target"] = self.create_horizon_target(sdf, horizon_name).values
        sdf = sdf.dropna(subset=self.feature_cols + ["target"])
        if len(sdf) < _MIN_SEASON_ROWS:
            return None

        n_train = int(len(sdf) * train_ratio)
        if n_train < _MIN_TRAIN_ROWS or len(sdf) - n_train < _MIN_TEST_ROWS:
            return None

        # Positional split: the first rows train, the remainder tests.
        X = sdf[self.feature_cols]
        y = sdf["target"]
        return X.iloc[:n_train], y.iloc[:n_train], X.iloc[n_train:], y.iloc[n_train:]

    @staticmethod
    def _score(y_true, y_pred) -> tuple[float, float, float]:
        """Return ``(rmse, mae, r2)`` as plain floats."""
        rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
        mae = float(mean_absolute_error(y_true, y_pred))
        r2 = float(r2_score(y_true, y_pred))
        return rmse, mae, r2

    @staticmethod
    def _metrics_row(horizon: str, season: str, rmse: float, mae: float, r2: float,
                     approach: str = "time_series") -> dict:
        """Build one row of the metrics table, rounded to 4 decimals."""
        return {
            "horizon": horizon,
            "season": season,
            "approach": approach,
            "RMSE": round(rmse, 4),
            "MAE": round(mae, 4),
            "R2": round(r2, 4),
        }

    def train_all_horizons(self, df: pd.DataFrame, train_ratio: float = 0.75) -> pd.DataFrame:
        """Train one model per horizon, treating each season independently.

        Within each season the first ``train_ratio`` rows are used for training
        and the remainder for testing.  Training data from all seasons is pooled
        to fit a single model per horizon, and test data from all seasons is
        pooled for evaluation.  Per-season metrics are also reported.

        Rows are split by position, so each season's rows are expected to be in
        chronological order (as produced by :meth:`identify_sessions`).

        Side effects: sets ``feature_cols``; stores the fitted model and pooled
        results per trained horizon; removes any previously stored model /
        results of a horizon that is skipped in this run, because they would no
        longer match ``feature_cols``; prints one line per metric row.

        Args:
            df: feature frame.  If it has no ``season`` column, one is derived
                with :meth:`assign_season` (requires ``timestamp_utc``).
            train_ratio: fraction of each season's usable rows used for training.

        Returns:
            One row per trained horizon for the pooled test set
            (``season == "all"``) plus one row per horizon and season, with
            columns ``horizon, season, approach, RMSE, MAE, R2, n_train, n_test``.
            Empty if no horizon could be trained.
        """
        if "season" not in df.columns:
            df = self.assign_season(df)

        self.feature_cols = self._get_feature_cols(df)
        seasons = sorted(df["season"].dropna().unique())
        rows = []

        for horizon_name in HORIZONS:
            # season -> (X_train, y_train, X_test, y_test); insertion order is
            # the sorted season order.
            splits: dict = {}
            for season in seasons:
                split = self._split_season(
                    df[df["season"] == season], horizon_name, train_ratio
                )
                if split is not None:
                    splits[season] = split

            if not splits:
                # Do not keep a model from an earlier run around.
                self.models.pop(horizon_name, None)
                self.results.pop(horizon_name, None)
                print(f" {horizon_name}: insufficient data across seasons, skipping")
                continue

            X_train = pd.concat([s[0] for s in splits.values()])
            y_train = pd.concat([s[1] for s in splits.values()])
            X_test = pd.concat([s[2] for s in splits.values()])
            y_test = pd.concat([s[3] for s in splits.values()])

            model = self._make_model()
            model.fit(X_train, y_train)
            self.models[horizon_name] = model

            # Overall metrics
            pred = model.predict(X_test)
            rmse, mae, r2 = self._score(y_test, pred)

            self.results[horizon_name] = {
                "predictions": pred,
                "y_test": y_test.values,
                "rmse": rmse,
                "mae": mae,
                "r2": r2,
                "n_train": len(X_train),
                "n_test": len(X_test),
            }
            rows.append({
                **self._metrics_row(horizon_name, "all", rmse, mae, r2),
                "n_train": len(X_train),
                "n_test": len(X_test),
            })
            print(f" {horizon_name} [all]: RMSE={rmse:.4f} MAE={mae:.4f} "
                  f"R²={r2:.4f} (train={len(X_train)}, test={len(X_test)})")

            # Per-season metrics
            for season, (X_tr_s, _, X_te_s, y_te_s) in splits.items():
                pred_s = model.predict(X_te_s)
                rmse_s, mae_s, r2_s = self._score(y_te_s, pred_s)
                rows.append({
                    **self._metrics_row(horizon_name, str(season), rmse_s, mae_s, r2_s),
                    "n_train": len(X_tr_s),
                    "n_test": len(X_te_s),
                })
                print(f" {horizon_name} [{season}]: RMSE={rmse_s:.4f} "
                      f"MAE={mae_s:.4f} R²={r2_s:.4f} (test={len(X_te_s)})")

        return pd.DataFrame(rows)

    def get_comparison_with_baseline(
        self, baseline_metrics: pd.DataFrame
    ) -> pd.DataFrame:
        """Combine TS horizon results with cross-sectional baseline into one table.

        Args:
            baseline_metrics: table with columns ``model``, ``RMSE``, ``MAE``
                and ``R2``.  The row with the lowest RMSE is repeated once per
                horizon in ``HORIZONS``.  ``None`` or an empty frame means no
                baseline rows are added.

        Returns:
            Frame with columns ``horizon, season, approach, RMSE, MAE, R2``: the
            pooled time-series rows followed by the baseline rows.
        """
        ts_df = pd.DataFrame([
            self._metrics_row(name, "all", res["rmse"], res["mae"], res["r2"])
            for name, res in self.results.items()
        ])

        if baseline_metrics is None or baseline_metrics.empty:
            return ts_df

        # Best cross-sectional model
        best = baseline_metrics.loc[baseline_metrics["RMSE"].idxmin()]
        approach = f"cross_sectional ({best['model']})"
        bl_df = pd.DataFrame([
            self._metrics_row(
                h, "all",
                float(best["RMSE"]), float(best["MAE"]), float(best["R2"]),
                approach=approach,
            )
            for h in HORIZONS
        ])
        return pd.concat([ts_df, bl_df], ignore_index=True)

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    @staticmethod
    def _make_model():
        """Return an unfitted regressor: XGBoost if importable, else sklearn GBR."""
        if _HAS_XGB:
            return XGBRegressor(
                n_estimators=300,
                max_depth=4,
                learning_rate=0.05,
                min_child_weight=10,
                reg_alpha=0.1,
                reg_lambda=1.0,
                n_jobs=-1,
                random_state=42,
            )
        return GradientBoostingRegressor(
            n_estimators=300,
            max_depth=4,
            learning_rate=0.05,
            min_samples_leaf=10,
            random_state=42,
        )