| """ |
| ChronosForecaster: Day-ahead photosynthesis (A) forecasting using Amazon |
| Chronos-2 foundation model with native covariate support and optional |
| LoRA fine-tuning. |
| |
| Improvement history: |
| v1: Broken — daytime-only rows with hidden gaps → MAE ~8.5 |
| v2: Regular 15-min grid + predict_df + daytime eval → MAE ~1.75 (20w) |
| v3: + On-site sensor covariates (PAR, VPD, T_leaf, CO2) |
| + 14-day context (captures ~2 weeks of diurnal pattern) |
| + LoRA fine-tuning (1000 steps, lr=1e-4) |
| + Configurable covariate modes for ablation |
| → MAE 1.37 (May), 3.0-3.4 (Jun-Sep), overall beats ML baseline (2.7) |
| v4: Revisited input features: added engineered time (hour_sin/cos, doy_sin/cos) and |
| stress_risk_ims (VPD from IMS T+RH) in load_data; tried extended IMS (tdmax/tdmin). |
| Ablation on current data: best zero-shot = sensor (MAE ~3.86) or all (MAE ~3.91, R² 0.52). |
| Time/stress as covariates slightly hurt; kept 4-col IMS + sensor for \"all\". |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
|
|
| from config.settings import ( |
| PROCESSED_DIR, IMS_CACHE_DIR, OUTPUTS_DIR, GROWING_SEASON_MONTHS, |
| ) |
| from src.time_features import add_cyclical_time_features |
|
|
| |
| |
| |
|
|
| |
| |
# 4-column IMS (weather-service) covariates. These are forecastable ahead of
# time, so they can be supplied as *future* covariates (see COVARIATE_MODES).
IMS_COVARIATE_COLS = [
    "ghi_w_m2", "air_temperature_c", "rh_percent", "wind_speed_ms",
]


# On-site sensor covariates — only observed historically, so they are used as
# *past* covariates only.
SENSOR_COVARIATE_COLS = [
    "PAR_site", "VPD_site", "T_leaf_site", "CO2_site",
]


# Engineered cyclical time features added by add_cyclical_time_features.
# Per the v4 ablation in the module docstring, these slightly hurt as covariates.
TIME_COVARIATE_COLS = ["hour_sin", "hour_cos", "doy_sin", "doy_cos"]


# Column name of the IMS-derived stress proxy computed in load_data
# (VPD from T + RH, normalised to [0, 1]).
STRESS_COVARIATE_COL = "stress_risk_ims"


# Raw sensor-CSV column names → short covariate names used everywhere else.
_SENSOR_COL_MAP = {
    "Air1_PAR_ref": "PAR_site",
    "Air1_VPD_ref": "VPD_site",
    "Air1_leafTemperature_ref": "T_leaf_site",
    "Air1_CO2_ref": "CO2_site",
}

# Regular-grid resolution: 15-minute steps → 96 steps per day.
FREQ = "15min"
STEPS_PER_DAY = 96
|
|
| |
| def _vpd_from_ims_kpa(T_c: np.ndarray, rh_percent: np.ndarray) -> np.ndarray: |
| """Saturation vapour pressure (kPa) then VPD = esat * (1 - RH/100).""" |
| esat = 0.611 * np.exp(17.27 * T_c / (T_c + 237.3)) |
| return esat * (1.0 - np.clip(rh_percent, 0, 100) / 100.0) |
|
|
|
|
| |
| |
# Covariate configurations for ablation.  'past' columns are provided over the
# context window only; 'future' columns are additionally provided over the
# forecast horizon (IMS weather is forecastable, on-site sensors are not).
COVARIATE_MODES = {
    "none": {"past": [], "future": []},
    "ims": {"past": IMS_COVARIATE_COLS, "future": IMS_COVARIATE_COLS},
    "sensor": {"past": SENSOR_COVARIATE_COLS, "future": []},
    "all": {
        "past": IMS_COVARIATE_COLS + SENSOR_COVARIATE_COLS,
        "future": IMS_COVARIATE_COLS,
    },
}
|
|
|
|
| class ChronosForecaster: |
| """Day-ahead A forecaster using Chronos-2 with configurable covariates.""" |
|
|
| def __init__( |
| self, |
| model_name: str = "amazon/chronos-2", |
| device: str = "mps", |
| context_days: int = 14, |
| ): |
| self.model_name = model_name |
| self.device = device |
| self.context_steps = context_days * STEPS_PER_DAY |
| self._pipeline = None |
|
|
| @property |
| def pipeline(self): |
| """Lazy-load Chronos-2 pipeline on first use.""" |
| if self._pipeline is None: |
| from chronos import Chronos2Pipeline |
|
|
| self._pipeline = Chronos2Pipeline.from_pretrained( |
| self.model_name, |
| device_map=self.device, |
| dtype=torch.float32, |
| ) |
| return self._pipeline |
|
|
| @pipeline.setter |
| def pipeline(self, value): |
| """Allow setting pipeline (e.g. after fine-tuning).""" |
| self._pipeline = value |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def load_data( |
| labels_path: Optional[Path] = None, |
| ims_path: Optional[Path] = None, |
| sensor_path: Optional[Path] = None, |
| growing_season_only: bool = True, |
| ) -> pd.DataFrame: |
| """Load labels + IMS + on-site sensors, merge, resample to regular grid. |
| |
| Growing-season-only mode (default) drops Oct-Apr dormancy months, |
| concatenating seasons into a continuous series with season boundaries |
| marked by a 'season' column. |
| """ |
| from config.settings import DATA_DIR, SEYMOUR_DIR |
|
|
| labels_path = labels_path or PROCESSED_DIR / "stage1_labels.csv" |
| ims_path = ims_path or IMS_CACHE_DIR / "ims_merged_15min.csv" |
| sensor_path = sensor_path or SEYMOUR_DIR / "sensors_wide.csv" |
|
|
| |
| labels = pd.read_csv(labels_path, parse_dates=["time"]) |
| labels.rename(columns={"time": "timestamp_utc"}, inplace=True) |
| labels["timestamp_utc"] = pd.to_datetime(labels["timestamp_utc"], utc=True) |
|
|
| |
| ims = pd.read_csv(ims_path, parse_dates=["timestamp_utc"]) |
| ims["timestamp_utc"] = pd.to_datetime(ims["timestamp_utc"], utc=True) |
|
|
| |
| raw_cols = ["time"] + list(_SENSOR_COL_MAP.keys()) |
| sensors = pd.read_csv(sensor_path, usecols=raw_cols, parse_dates=["time"]) |
| sensors.rename(columns={"time": "timestamp_utc", **_SENSOR_COL_MAP}, inplace=True) |
| sensors["timestamp_utc"] = pd.to_datetime(sensors["timestamp_utc"], utc=True) |
|
|
| |
| merged = labels.merge(ims, on="timestamp_utc", how="inner") |
| merged = merged.merge(sensors, on="timestamp_utc", how="left") |
| merged.sort_values("timestamp_utc", inplace=True) |
| merged.set_index("timestamp_utc", inplace=True) |
|
|
| |
| full_idx = pd.date_range( |
| merged.index.min(), merged.index.max(), freq=FREQ, tz="UTC", |
| ) |
| resampled = merged.reindex(full_idx) |
| resampled.index.name = "timestamp_utc" |
|
|
| |
| resampled["A"] = resampled["A"].fillna(0.0) |
| all_cov_cols = [ |
| c for c in IMS_COVARIATE_COLS + SENSOR_COVARIATE_COLS |
| if c in resampled.columns |
| ] |
| for col in all_cov_cols: |
| resampled[col] = ( |
| resampled[col].interpolate(method="time").ffill().bfill() |
| ) |
| if col in ("ghi_w_m2", "PAR_site"): |
| resampled[col] = resampled[col].clip(lower=0) |
|
|
| |
| resampled = add_cyclical_time_features(resampled, index_is_timestamp=True) |
|
|
| |
| if "air_temperature_c" in resampled.columns and "rh_percent" in resampled.columns: |
| vpd_ims = _vpd_from_ims_kpa( |
| resampled["air_temperature_c"].values, |
| resampled["rh_percent"].values, |
| ) |
| resampled[STRESS_COVARIATE_COL] = np.clip(vpd_ims / 6.0, 0.0, 1.0) |
|
|
| resampled.reset_index(inplace=True) |
|
|
| |
| if growing_season_only: |
| resampled["month"] = resampled["timestamp_utc"].dt.month |
| resampled = resampled[ |
| resampled["month"].isin(GROWING_SEASON_MONTHS) |
| ].copy() |
| resampled.drop(columns=["month"], inplace=True) |
| resampled.reset_index(drop=True, inplace=True) |
|
|
| |
| resampled["season"] = resampled["timestamp_utc"].dt.year |
|
|
| return resampled |
|
|
| @staticmethod |
| def load_sparse_data( |
| labels_path: Optional[Path] = None, |
| ims_path: Optional[Path] = None, |
| ) -> pd.DataFrame: |
| """Load original daytime-only merged data (no resampling). |
| Used to identify daytime timestamps for evaluation masking. |
| """ |
| labels_path = labels_path or PROCESSED_DIR / "stage1_labels.csv" |
| ims_path = ims_path or IMS_CACHE_DIR / "ims_merged_15min.csv" |
|
|
| labels = pd.read_csv(labels_path, parse_dates=["time"]) |
| labels.rename(columns={"time": "timestamp_utc"}, inplace=True) |
| labels["timestamp_utc"] = pd.to_datetime(labels["timestamp_utc"], utc=True) |
|
|
| ims = pd.read_csv(ims_path, parse_dates=["timestamp_utc"]) |
| ims["timestamp_utc"] = pd.to_datetime(ims["timestamp_utc"], utc=True) |
|
|
| merged = labels.merge(ims, on="timestamp_utc", how="inner") |
| merged.sort_values("timestamp_utc", inplace=True) |
| merged.reset_index(drop=True, inplace=True) |
| return merged |
|
|
| |
| |
| |
|
|
    def forecast_day(
        self,
        df: pd.DataFrame,
        context_end_idx: int,
        prediction_length: int = STEPS_PER_DAY,
        covariate_mode: str = "all",
    ) -> pd.DataFrame:
        """Forecast the next prediction_length steps via the predict_df API.

        Parameters
        ----------
        df : regular-grid frame from load_data (needs 'timestamp_utc', 'A',
            and the covariate columns of the chosen mode).
        context_end_idx : index of the first forecast row; context is the
            up-to self.context_steps rows before it.
        prediction_length : forecast horizon in 15-min steps (96 = 24 h).
        covariate_mode : 'none', 'ims', 'sensor', or 'all'

        Returns
        -------
        DataFrame with 'timestamp_utc', 'median', 'low_10', 'high_90'
        (the 0.5 / 0.1 / 0.9 quantile forecasts).
        """
        mode_cfg = COVARIATE_MODES[covariate_mode]
        # Keep only covariate columns that actually exist in df.
        past_cols = [c for c in mode_cfg["past"] if c in df.columns]
        future_cols = [c for c in mode_cfg["future"] if c in df.columns]

        ctx_start = max(0, context_end_idx - self.context_steps)
        ctx = df.iloc[ctx_start:context_end_idx].copy()

        # Build the history frame in the long format predict_df expects:
        # timestamp / target / item_id plus past-covariate columns.
        hist = ctx[["timestamp_utc", "A"]].copy()
        hist.rename(columns={"timestamp_utc": "timestamp", "A": "target"}, inplace=True)
        hist["item_id"] = "A"
        for col in past_cols:
            hist[col] = ctx[col].values

        # Known-future covariates over the forecast horizon; omitted when the
        # remaining data is shorter than the horizon.
        future_df = None
        if future_cols:
            fwd = df.iloc[context_end_idx : context_end_idx + prediction_length]
            if len(fwd) >= prediction_length:
                future_df = fwd[["timestamp_utc"]].copy()
                future_df.rename(columns={"timestamp_utc": "timestamp"}, inplace=True)
                future_df["item_id"] = "A"
                for col in future_cols:
                    future_df[col] = fwd[col].values

        result = self.pipeline.predict_df(
            df=hist,
            future_df=future_df,
            id_column="item_id",
            timestamp_column="timestamp",
            target="target",
            prediction_length=prediction_length,
            quantile_levels=[0.1, 0.5, 0.9],
        )

        # Forecast-row timestamps are taken from the grid itself.
        fwd_timestamps = df["timestamp_utc"].iloc[
            context_end_idx : context_end_idx + prediction_length
        ].values

        # NOTE(review): assumes predict_df returns quantile columns named
        # "0.1"/"0.5"/"0.9" — confirm against the installed chronos version.
        out = pd.DataFrame({
            "timestamp_utc": fwd_timestamps[:len(result)],
            "median": result["0.5"].values,
            "low_10": result["0.1"].values,
            "high_90": result["0.9"].values,
        })
        return out
|
|
| |
| |
| |
|
|
    def finetune(
        self,
        df: pd.DataFrame,
        train_ratio: float = 0.75,
        prediction_length: int = STEPS_PER_DAY,
        covariate_mode: str = "all",
        num_steps: int = 500,
        learning_rate: float = 1e-5,
        batch_size: Optional[int] = None,
        output_dir: Optional[str] = None,
    ) -> None:
        """LoRA fine-tune Chronos-2 on the training portion of the data.

        Uses the dict API for fit() with past and future covariates.
        Only the training portion (before train_ratio split) is used —
        no data leakage.

        Parameters
        ----------
        df : regular-grid frame from load_data.
        train_ratio : fraction of rows used as training data (matches the
            split used by benchmark()).
        prediction_length : horizon per training window, in 15-min steps.
        covariate_mode : key into COVARIATE_MODES.
        num_steps : optimizer steps for fine-tuning.
        learning_rate : fine-tuning learning rate.
        batch_size : defaults to min(32, number of training windows).
        output_dir : save location (default OUTPUTS_DIR/chronos_finetuned).

        On success the fine-tuned pipeline replaces self.pipeline.
        """
        split_idx = int(len(df) * train_ratio)
        train_df = df.iloc[:split_idx].copy()

        mode_cfg = COVARIATE_MODES[covariate_mode]
        past_cols = [c for c in mode_cfg["past"] if c in df.columns]
        future_cols = [c for c in mode_cfg["future"] if c in df.columns]

        # Each training window = full context + one forecast horizon.
        min_window = self.context_steps + prediction_length
        inputs = []

        # Non-overlapping windows: advance by one horizon per window.
        stride = prediction_length
        for end_idx in range(min_window, len(train_df), stride):
            ctx_start = end_idx - min_window
            ctx_end = end_idx - prediction_length

            # Target covers the context portion only; the horizon part of the
            # window supplies the future covariates below.
            target = train_df["A"].iloc[ctx_start:ctx_end].values.astype(np.float32)
            entry: dict = {"target": target}

            if past_cols:
                past_covs = {}
                for col in past_cols:
                    past_covs[col] = (
                        train_df[col].iloc[ctx_start:ctx_end].values.astype(np.float32)
                    )
                entry["past_covariates"] = past_covs

            if future_cols:
                future_covs = {}
                for col in future_cols:
                    # Future covariates span the horizon [ctx_end, end_idx).
                    future_covs[col] = (
                        train_df[col].iloc[ctx_end:end_idx].values.astype(np.float32)
                    )
                entry["future_covariates"] = future_covs

            inputs.append(entry)

        if not inputs:
            print("Not enough training data for fine-tuning.")
            return

        # Hold out the last 10% of windows for fine-tuning validation.
        val_split = int(len(inputs) * 0.9)
        train_inputs = inputs[:val_split]
        val_inputs = inputs[val_split:] if val_split < len(inputs) else None

        output_dir = output_dir or str(OUTPUTS_DIR / "chronos_finetuned")
        effective_batch = batch_size if batch_size is not None else min(32, len(train_inputs))

        print(f"Fine-tuning with LoRA: {len(train_inputs)} train windows, "
              f"{len(val_inputs) if val_inputs else 0} val windows, "
              f"{num_steps} steps, batch_size={effective_batch}")

        finetuned = self.pipeline.fit(
            inputs=train_inputs,
            prediction_length=prediction_length,
            validation_inputs=val_inputs,
            finetune_mode="lora",
            learning_rate=learning_rate,
            num_steps=num_steps,
            batch_size=effective_batch,
            output_dir=output_dir,
        )

        # Swap in the fine-tuned pipeline (via the property setter).
        self.pipeline = finetuned
        print(f"Fine-tuning complete. Model saved → {output_dir}")
|
|
| |
| |
| |
|
|
    def benchmark(
        self,
        df: Optional[pd.DataFrame] = None,
        train_ratio: float = 0.75,
        prediction_length: int = STEPS_PER_DAY,
        max_test_days: Optional[int] = None,
        covariate_modes: Optional[list[str]] = None,
    ) -> pd.DataFrame:
        """Walk-forward evaluation across covariate modes.

        Predicts 96 steps (24h) on the regular grid, evaluates ONLY on
        daytime steps where actual A > 0.

        Parameters
        ----------
        df : regular-grid frame; loaded via load_data() when None.
        train_ratio : test windows start after this fraction of rows.
        prediction_length : steps per forecast window.
        max_test_days : optional cap on the number of test windows.
        covariate_modes : modes to compare (default: all four).

        Returns
        -------
        Comparison DataFrame — one row per mode plus a fixed ML-baseline
        row — also written to OUTPUTS_DIR/chronos_benchmark.csv.
        """
        if df is None:
            df = self.load_data()

        if covariate_modes is None:
            covariate_modes = ["none", "ims", "sensor", "all"]

        # The sparse (daytime-only) merge defines which grid timestamps are
        # genuine daytime observations; metrics are masked to those.
        sparse = self.load_sparse_data()
        daytime_timestamps = set(sparse["timestamp_utc"])

        # Non-overlapping day-ahead windows over the held-out tail.
        split_idx = int(len(df) * train_ratio)
        test_starts = list(range(split_idx, len(df) - prediction_length, prediction_length))
        if max_test_days is not None:
            test_starts = test_starts[:max_test_days]

        results = {}
        for mode in covariate_modes:
            all_actual, all_pred = [], []

            for start_idx in test_starts:
                forecast_df = self.forecast_day(
                    df, start_idx, prediction_length, covariate_mode=mode,
                )

                actual_slice = df.iloc[start_idx : start_idx + prediction_length]
                if len(actual_slice) < prediction_length:
                    continue

                # Daytime mask, truncated to the forecast length in case the
                # model returned fewer steps than requested.
                daytime_mask = actual_slice["timestamp_utc"].isin(daytime_timestamps).values
                daytime_mask = daytime_mask[:len(forecast_df)]

                # Skip windows with almost no daytime steps (metrics unstable).
                if daytime_mask.sum() < 5:
                    continue

                actual_day = actual_slice["A"].values[:len(forecast_df)][daytime_mask]
                # Predictions are clipped at zero — A cannot be negative here.
                pred_day = np.clip(forecast_df["median"].values[daytime_mask], 0, None)

                all_actual.append(actual_day)
                all_pred.append(pred_day)

            if not all_actual:
                continue

            actual_flat = np.concatenate(all_actual)
            pred_flat = np.concatenate(all_pred)

            results[mode] = {
                "MAE": round(float(mean_absolute_error(actual_flat, pred_flat)), 4),
                "RMSE": round(
                    float(np.sqrt(mean_squared_error(actual_flat, pred_flat))), 4
                ),
                "R2": round(float(r2_score(actual_flat, pred_flat)), 4),
                "n_windows": len(all_actual),
                "n_steps": len(actual_flat),
            }
            print(f" {mode:12s}: MAE={results[mode]['MAE']:.4f} "
                  f"RMSE={results[mode]['RMSE']:.4f} R²={results[mode]['R2']:.4f} "
                  f"({results[mode]['n_windows']} windows, "
                  f"{results[mode]['n_steps']} daytime steps)")

        comparison = pd.DataFrame(results).T
        comparison.index.name = "mode"
        comparison.reset_index(inplace=True)

        # Fixed reference row: best classical-ML baseline MAE (see docstring v3).
        ml_baseline = pd.DataFrame([{
            "mode": "ML baseline (best)",
            "MAE": 2.7,
            "RMSE": np.nan,
            "R2": np.nan,
            "n_windows": np.nan,
            "n_steps": np.nan,
        }])
        comparison = pd.concat([comparison, ml_baseline], ignore_index=True)

        OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
        comparison.to_csv(OUTPUTS_DIR / "chronos_benchmark.csv", index=False)
        print(f"Saved benchmark → {OUTPUTS_DIR / 'chronos_benchmark.csv'}")

        return comparison
|
|
| |
| |
| |
|
|
    def plot_sample_forecast(
        self,
        df: Optional[pd.DataFrame] = None,
        test_day_idx: int = 0,
        train_ratio: float = 0.75,
        prediction_length: int = STEPS_PER_DAY,
    ) -> None:
        """Generate a sample forecast plot with confidence bands.

        Plots actual A against the Chronos-2 median and 10-90% quantile band
        for the test_day_idx-th window after the train/test split, saving the
        figure to OUTPUTS_DIR/chronos_forecast_sample.png.
        """
        # Imported lazily so headless benchmark runs don't need matplotlib.
        import matplotlib.pyplot as plt

        if df is None:
            df = self.load_data()

        # Same split convention as benchmark(): test windows start at split_idx.
        split_idx = int(len(df) * train_ratio)
        start_idx = split_idx + test_day_idx * prediction_length

        if start_idx + prediction_length > len(df):
            print("Not enough data for sample forecast plot.")
            return

        forecast_df = self.forecast_day(
            df, start_idx, prediction_length, covariate_mode="all",
        )
        actual = df["A"].iloc[start_idx : start_idx + prediction_length].values

        fig, ax = plt.subplots(figsize=(12, 5))
        # X axis in hours ahead: 15-min steps → 0.25 h per step.
        hours = np.arange(len(forecast_df)) * 0.25

        ax.plot(hours, actual[:len(forecast_df)], "k-", linewidth=1.5, label="Actual A")
        # Median clipped at zero, matching benchmark() evaluation.
        ax.plot(
            hours, np.clip(forecast_df["median"].values, 0, None),
            "b-", linewidth=1.5, label="Chronos-2 median",
        )
        ax.fill_between(
            hours,
            np.clip(forecast_df["low_10"].values, 0, None),
            forecast_df["high_90"].values,
            alpha=0.25, color="steelblue", label="10-90% CI",
        )
        ax.set_xlabel("Hours ahead")
        ax.set_ylabel("A (umol CO2 m-2 s-1)")
        ax.axhline(0, color="gray", linewidth=0.5, linestyle="--")

        # Title shows the wall-clock start of the forecast window.
        ts = df["timestamp_utc"].iloc[start_idx]
        ax.set_title(f"Chronos-2 Day-Ahead Forecast — {ts:%Y-%m-%d %H:%M}")
        ax.legend()
        ax.grid(True, alpha=0.3)

        OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
        fig.savefig(
            OUTPUTS_DIR / "chronos_forecast_sample.png", dpi=150, bbox_inches="tight",
        )
        plt.close(fig)
        print(f"Saved plot → {OUTPUTS_DIR / 'chronos_forecast_sample.png'}")
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import argparse

    # CLI: device/context config, optional fine-tuning, benchmark mode list.
    cli = argparse.ArgumentParser(description="Chronos-2 day-ahead A forecasting")
    cli.add_argument("--device", default="mps", help="torch device")
    cli.add_argument("--context-days", type=int, default=14, help="context window in days")
    cli.add_argument("--max-days", type=int, default=None, help="limit test windows")
    cli.add_argument("--plot", action="store_true", help="generate sample forecast plot")
    cli.add_argument(
        "--finetune", action="store_true",
        help="LoRA fine-tune before benchmarking",
    )
    cli.add_argument("--ft-steps", type=int, default=500, help="fine-tuning steps")
    cli.add_argument(
        "--modes", nargs="+", default=["none", "ims", "sensor", "all"],
        help="covariate modes to benchmark",
    )
    opts = cli.parse_args()

    fc = ChronosForecaster(device=opts.device, context_days=opts.context_days)

    print("Loading data (growing-season grid + on-site sensors)...")
    grid = fc.load_data()
    print(f" Grid: {len(grid)} rows, seasons: {sorted(grid['season'].unique())}")

    # Optional LoRA fine-tuning pass before evaluation.
    if opts.finetune:
        print(f"\nLoRA fine-tuning ({opts.ft_steps} steps)...")
        fc.finetune(grid, num_steps=opts.ft_steps, covariate_mode="all")

    print("\nRunning walk-forward benchmark (daytime-only evaluation)...")
    bench = fc.benchmark(
        grid, max_test_days=opts.max_days, covariate_modes=opts.modes,
    )
    print(f"\n{bench.to_string(index=False)}")

    if opts.plot:
        print("\nGenerating sample forecast plot...")
        fc.plot_sample_forecast(grid)
|