| """ |
| TimeSeriesPredictor: lag-based direct multi-horizon forecasting for |
| photosynthesis rate A. Uses daytime-session indexing to handle 12h+ |
| nighttime gaps, with per-horizon models (XGBoost / GradientBoosting). |
| |
| Each growing season (May-Sep) is handled independently — sessions, lags, |
| and targets never cross the off-season gap (Oct-Apr). |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.ensemble import GradientBoostingRegressor |
| from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score |
|
|
# Optional dependency: prefer XGBoost when installed; _make_model falls
# back to sklearn's GradientBoostingRegressor otherwise.
try:
    from xgboost import XGBRegressor
    _HAS_XGB = True
except ImportError:
    _HAS_XGB = False


# Forecast horizons. Each entry uses exactly one of:
#   "steps" — number of 15-minute steps ahead *within* a daytime session
#   "days"  — calendar-day offset, matched by timestamp within the season
HORIZONS = {
    "15min": {"steps": 1, "days": None},
    "1hour": {"steps": 4, "days": None},
    "1day": {"steps": None, "days": 1},
    "1week": {"steps": None, "days": 7},
    "1month": {"steps": None, "days": 30},
}

# Columns that receive lag / rolling features (skipped if absent).
LAG_COLS = ["A", "ghi_w_m2", "air_temperature_c"]
# Lag offsets in 15-minute steps (1 = 15 min, 4 = 1 h, 12 = 3 h).
LAG_STEPS = [1, 2, 3, 4, 8, 12]
# Trailing-window lengths (in steps) for rolling mean/std features.
ROLLING_WINDOWS = [4, 12]
# A time gap larger than this (minutes) starts a new daytime session.
MAX_GAP_MINUTES = 30
|
|
|
|
class TimeSeriesPredictor:
    """Train one model per forecast horizon using lag features.

    Intended call order::

        df = TimeSeriesPredictor.assign_season(df)
        df = TimeSeriesPredictor.identify_sessions(df)
        df = TimeSeriesPredictor.create_lag_features(df)
        metrics = predictor.train_all_horizons(df)

    Attributes:
        models: fitted regressor per horizon name (keys of ``HORIZONS``),
            populated by ``train_all_horizons``.
        feature_cols: numeric feature columns used for training; set on
            the first call to ``train_all_horizons``.
        results: per-horizon pooled test predictions and metrics,
            populated by ``train_all_horizons``.
    """

    def __init__(self) -> None:
        # One fitted model per horizon name.
        self.models: dict[str, object] = {}
        # Chosen at train time from the numeric columns of the input frame.
        self.feature_cols: Optional[list[str]] = None
        # horizon name -> {"predictions", "y_test", "rmse", "mae", "r2", ...}
        self.results: dict[str, dict] = {}

    @staticmethod
    def assign_season(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Add a 'season' column (year of each row's growing season).

        Per the module docstring the growing season (May-Sep) never
        crosses a calendar-year boundary, so the timestamp's year
        uniquely identifies the season.

        Args:
            df: input frame; not modified (a copy is returned).
            ts_col: name of the timestamp column.
        """
        out = df.copy()
        ts = pd.to_datetime(out[ts_col], utc=True)
        out["season"] = ts.dt.year
        return out

    @staticmethod
    def identify_sessions(df: pd.DataFrame, ts_col: str = "timestamp_utc") -> pd.DataFrame:
        """Assign session_id to contiguous daytime blocks (gap <= MAX_GAP_MINUTES).

        Rows are sorted by timestamp first; any gap above MAX_GAP_MINUTES
        starts a new session. There is no explicit per-season grouping
        here, but nighttime and off-season gaps both far exceed the
        threshold, so session ids never span a season boundary anyway.

        Returns a sorted copy with an integer ``session_id`` column.
        """
        out = df.copy()
        out = out.sort_values(ts_col).reset_index(drop=True)
        ts = pd.to_datetime(out[ts_col], utc=True)
        # Minutes since the previous observation; NaN on the first row.
        # (NaN > threshold is False, so the first row opens session 0.)
        gap = ts.diff().dt.total_seconds() / 60
        out["session_id"] = (gap > MAX_GAP_MINUTES).cumsum()
        return out

    @staticmethod
    def create_lag_features(df: pd.DataFrame) -> pd.DataFrame:
        """Create within-session lags, rolling stats, and previous-session summary.

        Expects ``session_id`` (from ``identify_sessions``) and, for the
        cross-session features, ``season`` (from ``assign_season``).
        Adds three feature families:

        * ``<col>_lag<k>``   — value k steps earlier in the same session
        * ``<col>_rmean<w>`` — trailing mean of the previous w values
          (shifted by 1 so the current value never leaks in); for the
          target column ``A`` the matching ``A_rstd<w>`` is also added
        * ``prev_sess_mean_A`` / ``prev_sess_max_A`` — summary of the
          previous session within the same season

        Start-of-session lag NaNs are then backfilled with the previous
        session's last observed value (previous session within the same
        season only), bridging the overnight gap without ever crossing
        the off-season gap.
        """
        out = df.copy()

        # --- within-session lags and trailing rolling statistics ---
        for col in LAG_COLS:
            if col not in out.columns:
                continue
            for lag in LAG_STEPS:
                col_name = f"{col}_lag{lag}"
                out[col_name] = out.groupby("session_id")[col].shift(lag)
            for w in ROLLING_WINDOWS:
                # shift(1) keeps the window strictly in the past
                # (no leakage of the current observation).
                out[f"{col}_rmean{w}"] = out.groupby("session_id")[col].transform(
                    lambda s: s.shift(1).rolling(w, min_periods=1).mean()
                )
                if col == "A":
                    out[f"{col}_rstd{w}"] = out.groupby("session_id")[col].transform(
                        lambda s: s.shift(1).rolling(w, min_periods=1).std()
                    )

        # --- previous-session summary of A, linked within season only ---
        if "A" in out.columns and "season" in out.columns:
            sess_stats = out.groupby("session_id").agg(
                season=("season", "first"),
                mean_A=("A", "mean"),
                max_A=("A", "max"),
            )
            # shift(1) within season: the first session of each season
            # gets NaN instead of a value from the previous season.
            sess_stats["prev_sess_mean_A"] = sess_stats.groupby("season")["mean_A"].shift(1)
            sess_stats["prev_sess_max_A"] = sess_stats.groupby("season")["max_A"].shift(1)
            out = out.merge(
                sess_stats[["prev_sess_mean_A", "prev_sess_max_A"]],
                left_on="session_id", right_index=True, how="left",
            )

        # --- backfill start-of-session lag NaNs with the previous
        #     session's final value (same-season sessions only) ---
        for col in LAG_COLS:
            if col not in df.columns:
                continue
            sess_end = df.groupby("session_id").agg(
                last_val=(col, "last"),
            )
            if "season" in df.columns:
                sess_season = df.groupby("session_id")["season"].first()
                sess_end["season"] = sess_season
                sess_end["prev_end"] = sess_end.groupby("season")["last_val"].shift(1)
            else:
                sess_end["prev_end"] = sess_end["last_val"].shift(1)
            # Temporary helper column, dropped below after the fillna.
            out = out.merge(
                sess_end[["prev_end"]].rename(columns={"prev_end": f"_prev_end_{col}"}),
                left_on="session_id", right_index=True, how="left",
            )
            for lag in LAG_STEPS:
                lag_col = f"{col}_lag{lag}"
                if lag_col in out.columns:
                    out[lag_col] = out[lag_col].fillna(out[f"_prev_end_{col}"])
            out.drop(columns=[f"_prev_end_{col}"], inplace=True)

        return out

    @staticmethod
    def create_horizon_target(df: pd.DataFrame, horizon_name: str,
                              ts_col: str = "timestamp_utc") -> pd.Series:
        """Create target column for a given horizon.

        Step-based horizons shift ``A`` backwards within each session,
        so rows near a session end get NaN rather than a target from
        the next day. Calendar-day horizons look up ``A`` at
        timestamp + N days; when a ``season`` column is present the
        lookup is restricted to the same season, so targets never cross
        the off-season gap.

        NOTE(review): the calendar-day path floors only the *target*
        timestamps to the 15-min grid while the lookup index keeps raw
        timestamps — this assumes observations already lie on a
        15-minute grid; confirm upstream resampling.
        """
        h = HORIZONS[horizon_name]
        ts = pd.to_datetime(df[ts_col], utc=True)

        if h["steps"] is not None:
            # Fixed number of 15-min steps ahead, within-session only.
            target = df.groupby("session_id")["A"].shift(-h["steps"])
        else:
            # Calendar offset: match by (floored) timestamp lookup.
            days = h["days"]
            target_ts = ts + pd.Timedelta(days=days)

            if "season" in df.columns:
                # Per-season lookup: a late-season row whose target falls
                # after the season's end simply stays NaN.
                target = pd.Series(np.nan, index=df.index)
                for season, grp in df.groupby("season"):
                    grp_ts = pd.to_datetime(grp[ts_col], utc=True)
                    lookup = pd.Series(grp["A"].values, index=grp_ts)
                    # Series.map on a duplicated index raises; keep first.
                    lookup = lookup[~lookup.index.duplicated(keep="first")]
                    grp_target_ts = (grp_ts + pd.Timedelta(days=days)).dt.floor("15min")
                    matched = grp_target_ts.map(lookup)
                    target.loc[grp.index] = matched.values
            else:
                target_ts_rounded = target_ts.dt.floor("15min")
                lookup = pd.Series(df["A"].values, index=ts)
                lookup = lookup[~lookup.index.duplicated(keep="first")]
                target = target_ts_rounded.map(lookup)
                target = target.reset_index(drop=True)

        return target

    def _get_feature_cols(self, df: pd.DataFrame) -> list[str]:
        """Return numeric feature columns, excluding targets / metadata.

        Raw ``A`` is excluded (it is what we predict), but its derived
        lag / rolling / previous-session columns remain eligible.
        """
        exclude = {"A", "timestamp_utc", "time", "source", "session_id",
                   "target", "season"}
        cols = [c for c in df.select_dtypes(include=[np.number]).columns if c not in exclude]
        return cols

    def train_all_horizons(self, df: pd.DataFrame, train_ratio: float = 0.75) -> pd.DataFrame:
        """Train one model per horizon, treating each season independently.

        Within each season the first ``train_ratio`` rows are used for
        training and the remainder for testing. Training data from all
        seasons is pooled to fit a single model per horizon, and test
        data from all seasons is pooled for evaluation. Per-season
        metrics are also reported.

        Args:
            df: feature frame from ``create_lag_features`` with
                ``season`` and ``session_id`` columns present.
            train_ratio: chronological train fraction within each season.

        Returns:
            Long-format metrics DataFrame, one row per (horizon, season)
            plus one pooled row with ``season == "all"`` per horizon.
        """
        self.feature_cols = self._get_feature_cols(df)
        seasons = sorted(df["season"].unique())
        rows = []

        for horizon_name in HORIZONS:
            # Pooled train/test accumulators plus per-season test sets
            # (the latter are re-scored with the pooled model below).
            all_X_train, all_y_train = [], []
            all_X_test, all_y_test = [], []
            season_train_n: dict[int, int] = {}
            season_test: dict[int, tuple] = {}

            for season in seasons:
                sdf = df[df["season"] == season].copy()
                target = self.create_horizon_target(sdf, horizon_name)
                sdf = sdf.copy()
                # .values sidesteps index alignment between target and sdf.
                sdf["target"] = target.values
                # Drop rows with incomplete features or no target at
                # this horizon (e.g. targets past the season's end).
                sdf = sdf.dropna(subset=self.feature_cols + ["target"])

                # Skip seasons with too little usable data.
                if len(sdf) < 30:
                    continue

                # Chronological split: rows are time-ordered per season.
                n = int(len(sdf) * train_ratio)
                if n < 10 or len(sdf) - n < 5:
                    continue

                X_tr = sdf[self.feature_cols].iloc[:n]
                y_tr = sdf["target"].iloc[:n]
                X_te = sdf[self.feature_cols].iloc[n:]
                y_te = sdf["target"].iloc[n:]

                all_X_train.append(X_tr)
                all_y_train.append(y_tr)
                all_X_test.append(X_te)
                all_y_test.append(y_te)
                season_train_n[season] = len(X_tr)
                season_test[season] = (X_te, y_te)

            if not all_X_train or not all_X_test:
                print(f"  {horizon_name}: insufficient data across seasons, skipping")
                continue

            X_train = pd.concat(all_X_train)
            y_train = pd.concat(all_y_train)
            X_test = pd.concat(all_X_test)
            y_test = pd.concat(all_y_test)

            # One pooled model per horizon, trained across all seasons.
            model = self._make_model()
            model.fit(X_train, y_train)
            self.models[horizon_name] = model

            # Pooled evaluation over all seasons' test rows.
            pred = model.predict(X_test)
            rmse = float(np.sqrt(mean_squared_error(y_test, pred)))
            mae = float(mean_absolute_error(y_test, pred))
            r2 = float(r2_score(y_test, pred))
            self.results[horizon_name] = {
                "predictions": pred, "y_test": y_test.values,
                "rmse": rmse, "mae": mae, "r2": r2,
                "n_train": len(X_train), "n_test": len(X_test),
            }
            rows.append({
                "horizon": horizon_name, "season": "all",
                "approach": "time_series",
                "RMSE": round(rmse, 4), "MAE": round(mae, 4), "R2": round(r2, 4),
                "n_train": len(X_train), "n_test": len(X_test),
            })
            print(f"  {horizon_name} [all]: RMSE={rmse:.4f} MAE={mae:.4f} "
                  f"R²={r2:.4f} (train={len(X_train)}, test={len(X_test)})")

            # Per-season breakdown using the same pooled model.
            for season, (X_te_s, y_te_s) in season_test.items():
                pred_s = model.predict(X_te_s)
                rmse_s = float(np.sqrt(mean_squared_error(y_te_s, pred_s)))
                mae_s = float(mean_absolute_error(y_te_s, pred_s))
                r2_s = float(r2_score(y_te_s, pred_s))
                rows.append({
                    "horizon": horizon_name, "season": str(season),
                    "approach": "time_series",
                    "RMSE": round(rmse_s, 4), "MAE": round(mae_s, 4),
                    "R2": round(r2_s, 4),
                    "n_train": season_train_n[season],
                    "n_test": len(X_te_s),
                })
                print(f"  {horizon_name} [{season}]: RMSE={rmse_s:.4f} "
                      f"MAE={mae_s:.4f} R²={r2_s:.4f} (test={len(X_te_s)})")

        return pd.DataFrame(rows)

    def get_comparison_with_baseline(
        self, baseline_metrics: pd.DataFrame
    ) -> pd.DataFrame:
        """Combine TS horizon results with cross-sectional baseline into one table.

        The single best baseline model (lowest RMSE in
        ``baseline_metrics``) is repeated for every horizon, since a
        cross-sectional model has no notion of forecast horizon —
        its metrics are constant across the horizon rows.
        """
        ts_rows = []
        for horizon_name, res in self.results.items():
            ts_rows.append({
                "horizon": horizon_name, "season": "all",
                "approach": "time_series",
                "RMSE": round(res["rmse"], 4),
                "MAE": round(res["mae"], 4),
                "R2": round(res["r2"], 4),
            })
        ts_df = pd.DataFrame(ts_rows)

        # Attach the best baseline row per horizon, if any baseline exists.
        if not baseline_metrics.empty:
            best_idx = baseline_metrics["RMSE"].idxmin()
            best = baseline_metrics.loc[best_idx]
            bl_rows = []
            for h in HORIZONS:
                bl_rows.append({
                    "horizon": h, "season": "all",
                    "approach": f"cross_sectional ({best['model']})",
                    "RMSE": round(float(best["RMSE"]), 4),
                    "MAE": round(float(best["MAE"]), 4),
                    "R2": round(float(best["R2"]), 4),
                })
            bl_df = pd.DataFrame(bl_rows)
            return pd.concat([ts_df, bl_df], ignore_index=True)
        return ts_df

    @staticmethod
    def _make_model():
        """Build a fresh regressor: XGBoost when available, else sklearn GBM.

        Both are configured with matching capacity/regularization
        (300 trees, depth 4, lr 0.05, min-10-sample leaves) and a fixed
        random_state for reproducibility.
        """
        if _HAS_XGB:
            return XGBRegressor(
                n_estimators=300, max_depth=4, learning_rate=0.05,
                min_child_weight=10, reg_alpha=0.1, reg_lambda=1.0,
                n_jobs=-1, random_state=42,
            )
        return GradientBoostingRegressor(
            n_estimators=300, max_depth=4, learning_rate=0.05,
            min_samples_leaf=10, random_state=42,
        )
|
|