| """ |
| BaselinePredictor: hybrid FvCB + ML photosynthesis baseline for day-ahead planning. |
| |
| Provides a single ``predict_day()`` method that: |
| 1. Runs FvCB (Farquhar–Greer–Weedon) for each slot using forecast weather |
| 2. Optionally runs a trained ML model for the same slots |
| 3. Uses the RoutingAgent's rule-based logic to pick the better prediction per slot |
| 4. Returns a 96-slot profile of predicted photosynthesis rate A (µmol CO₂ m⁻² s⁻¹) |
| |
| This feeds into the DayAheadPlanner to estimate crop value for each slot, |
| replacing the current temperature-only heuristic with an actual photosynthesis |
| prediction that captures the Rubisco transition more accurately. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import math |
| from datetime import date |
| from typing import List, Optional |
|
|
| import numpy as np |
|
|
| from config.settings import SEMILLON_TRANSITION_TEMP_C |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class BaselinePredictor:
    """Hybrid FvCB + ML photosynthesis prediction for day-ahead planning.

    Parameters
    ----------
    fvcb_model : FarquharModel, optional
        Lazy-initialised if not provided.
    ml_predictor : PhotosynthesisPredictor, optional
        Trained ML model. If None, FvCB-only mode is used. The object is
        read through its ``results`` (name -> metrics mapping with an
        ``"mae"`` entry) and ``models`` (name -> fitted model) attributes.
    routing_agent : RoutingAgent, optional
        Model router for per-slot FvCB/ML selection. Only its
        ``_rule_based_route`` callable is used (no other router method is
        invoked from this class). If None, ``RoutingAgent._rule_based_route``
        is imported lazily and used instead.
    """

    # Number of 15-minute slots in one day.
    _SLOTS_PER_DAY = 96
    # Slots whose forecast GHI (W/m²) is below this are reported as A = 0.0.
    _MIN_DAYLIGHT_GHI = 50.0
    # Factor applied to forecast GHI to obtain the PAR value handed to FvCB.
    # NOTE(review): presumably a W/m² -> µmol m⁻² s⁻¹ conversion; confirm.
    _GHI_TO_PAR = 2.0
    # Leaf temperature is taken as air temperature plus this offset (°C).
    _LEAF_TEMP_OFFSET_C = 2.0
    # FvCB share of the blend used when the router returns no decision;
    # the ML prediction receives the remainder.
    _FVCB_BLEND_WEIGHT = 0.6

    def __init__(
        self,
        fvcb_model=None,
        ml_predictor=None,
        routing_agent=None,
    ):
        self._fvcb = fvcb_model
        self._ml = ml_predictor
        self._router = routing_agent

    @property
    def fvcb(self):
        """Return the FvCB model, importing and creating it on first use."""
        if self._fvcb is None:
            # Imported lazily so constructing a predictor does not require
            # the model module until a daylight slot is actually evaluated.
            from src.models.farquhar_model import FarquharModel
            self._fvcb = FarquharModel()
        return self._fvcb

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def predict_day(
        self,
        forecast_temps: List[float],
        forecast_ghi: List[float],
        co2_ppm: float = 400.0,
        rh_pct: float = 40.0,
    ) -> List[float]:
        """Predict photosynthesis rate A for each 15-min slot.

        Parameters
        ----------
        forecast_temps : list of 96 floats
            Forecast air temperature (°C) per slot.
        forecast_ghi : list of 96 floats
            Forecast GHI (W/m²) per slot.
        co2_ppm : float
            Atmospheric CO₂ concentration (default 400 ppm).
        rh_pct : float
            Relative humidity (%) for VPD estimation (default 40%).

        Returns
        -------
        list of 96 floats
            Predicted net photosynthesis A (µmol CO₂ m⁻² s⁻¹) per slot.
            0.0 for nighttime slots.

        Raises
        ------
        ValueError
            If either forecast does not contain exactly 96 values.
        """
        n_slots = self._SLOTS_PER_DAY
        # Explicit check instead of ``assert``: assertions are removed when
        # Python runs with -O, which would let malformed input through.
        if len(forecast_temps) != n_slots or len(forecast_ghi) != n_slots:
            raise ValueError(
                f"forecast_temps and forecast_ghi must each contain "
                f"{n_slots} values (got {len(forecast_temps)} and "
                f"{len(forecast_ghi)})"
            )

        fvcb_predictions = self._predict_fvcb(
            forecast_temps, forecast_ghi, co2_ppm, rh_pct,
        )

        # FvCB-only mode.
        if self._ml is None:
            return fvcb_predictions

        ml_predictions = self._try_predict_ml(forecast_temps, forecast_ghi)
        if ml_predictions is None:
            # The ML path failed or produced nothing usable. Routing or
            # blending with placeholder zeros would silently depress the
            # profile, so fall back to the mechanistic prediction.
            return fvcb_predictions

        return self._route_predictions(
            forecast_temps, forecast_ghi,
            fvcb_predictions, ml_predictions,
        )

    # ------------------------------------------------------------------
    # FvCB
    # ------------------------------------------------------------------

    def _predict_fvcb(
        self,
        temps: List[float],
        ghis: List[float],
        co2_ppm: float,
        rh_pct: float,
    ) -> List[float]:
        """Run FvCB for each slot and return one non-negative A per slot.

        Slots below the daylight GHI cutoff yield 0.0 without calling the
        model. A slot whose model call raises also yields 0.0; such
        failures are logged per slot at debug level and summarised once at
        warning level.
        """
        predictions: List[float] = []
        failed_slots: List[int] = []

        for slot, (temp, ghi) in enumerate(zip(temps, ghis)):
            if ghi < self._MIN_DAYLIGHT_GHI:
                predictions.append(0.0)
                continue

            par = ghi * self._GHI_TO_PAR
            tleaf = temp + self._LEAF_TEMP_OFFSET_C
            vpd = self._estimate_vpd(temp, rh_pct)

            try:
                result = self.fvcb.calc_photosynthesis_semillon(
                    PAR=par,
                    Tleaf=tleaf,
                    CO2=co2_ppm,
                    VPD=vpd,
                    Tair=temp,
                )
                # The model may return A alone or a tuple whose first
                # element is A.
                assimilation = result[0] if isinstance(result, tuple) else result
                predictions.append(max(0.0, float(assimilation)))
            except Exception as exc:  # best effort: one bad slot must not abort the day
                logger.debug("FvCB failed at slot %d: %s", slot, exc)
                failed_slots.append(slot)
                predictions.append(0.0)

        if failed_slots:
            # Debug-level messages alone make a systematic failure (for
            # example the model failing to import) look like a dark day.
            logger.warning(
                "FvCB failed for %d daylight slot(s) (first: slot %d); "
                "using 0.0 for them",
                len(failed_slots), failed_slots[0],
            )

        return predictions

    @staticmethod
    def _estimate_vpd(tair_c: float, rh_pct: float) -> float:
        """Estimate VPD (kPa) from air temperature and relative humidity.

        Saturation vapour pressure uses the Tetens-type expression
        ``0.6108 * exp(17.27 * T / (T + 237.3))``; actual vapour pressure is
        that value scaled by ``rh_pct / 100``. The result is clamped at 0 so
        humidity above 100 % cannot produce a negative deficit.
        """
        es = 0.6108 * math.exp(17.27 * tair_c / (tair_c + 237.3))
        ea = es * rh_pct / 100.0
        return max(0.0, es - ea)

    # ------------------------------------------------------------------
    # ML
    # ------------------------------------------------------------------

    def _predict_ml(
        self,
        temps: List[float],
        ghis: List[float],
    ) -> List[float]:
        """Run ML model for each slot. Returns 96 A values.

        Returns all zeros when no ML predictor is configured or the ML
        prediction cannot be produced.
        """
        predictions = self._try_predict_ml(temps, ghis)
        if predictions is None:
            return [0.0] * self._SLOTS_PER_DAY
        return predictions

    def _try_predict_ml(
        self,
        temps: List[float],
        ghis: List[float],
    ) -> Optional[List[float]]:
        """Return per-slot ML predictions, or None if they are unavailable.

        The model with the lowest ``"mae"`` in ``self._ml.results`` is used.
        None is returned (after a warning) when no such model exists, no
        feature column matches, the output length is wrong, or any step
        raises.
        """
        if self._ml is None:
            return None

        n_slots = self._SLOTS_PER_DAY
        try:
            import pandas as pd

            # Slot index -> clock time: four 15-minute slots per hour.
            frame = pd.DataFrame({
                "air_temperature_c": list(temps),
                "ghi_w_m2": list(ghis),
                "hour": [slot // 4 for slot in range(n_slots)],
                "minute": [(slot % 4) * 15 for slot in range(n_slots)],
            })

            # Pick the model with the strictly lowest MAE; entries without
            # a usable "mae" are never selected.
            best_name = None
            best_mae = float("inf")
            for name, metrics in self._ml.results.items():
                mae = metrics.get("mae", float("inf"))
                if mae < best_mae:
                    best_mae = mae
                    best_name = name

            if best_name is None or best_name not in self._ml.models:
                logger.warning("ML prediction unavailable: no scored model found")
                return None

            model = self._ml.models[best_name]
            available = set(frame.columns)
            expected = getattr(model, "feature_names_in_", None)
            if expected is None:
                feature_cols = list(frame.columns)
            else:
                # Keep the model's own feature order: estimators that record
                # feature names reject frames whose columns are reordered.
                feature_cols = [str(col) for col in expected if str(col) in available]

            if not feature_cols:
                logger.warning(
                    "ML prediction unavailable: model %r shares no features "
                    "with the forecast frame", best_name,
                )
                return None

            raw = model.predict(frame[feature_cols])
            predictions = [max(0.0, float(value)) for value in raw]
            if len(predictions) != n_slots:
                logger.warning(
                    "ML prediction unavailable: expected %d values, got %d",
                    n_slots, len(predictions),
                )
                return None
            return predictions

        except Exception as exc:  # best effort: ML is optional, FvCB is the fallback
            logger.warning("ML prediction failed: %s", exc)
            return None

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _route_predictions(
        self,
        temps: List[float],
        ghis: List[float],
        fvcb_preds: List[float],
        ml_preds: List[float],
    ) -> List[float]:
        """Pick FvCB or ML per slot using routing logic.

        For each daylight slot the rule-based router receives a telemetry
        dict (``temp_c``, ``ghi_w_m2``, ``hour``). ``"ml"`` selects the ML
        value, ``None`` selects a 0.6/0.4 FvCB/ML blend, and any other
        answer selects FvCB. Slots below the daylight GHI cutoff are 0.0
        regardless of the router, matching the FvCB path and the
        ``predict_day`` contract.
        """
        # Prefer the injected agent's rule-based router; otherwise fall back
        # to the class-level one (imported lazily).
        route = getattr(self._router, "_rule_based_route", None)
        if route is None:
            from src.chatbot.routing_agent import RoutingAgent
            route = RoutingAgent._rule_based_route

        fvcb_weight = self._FVCB_BLEND_WEIGHT
        ml_weight = 1.0 - fvcb_weight

        predictions: List[float] = []
        slots = zip(temps, ghis, fvcb_preds, ml_preds)
        for slot, (temp, ghi, fvcb_a, ml_a) in enumerate(slots):
            if ghi < self._MIN_DAYLIGHT_GHI:
                predictions.append(0.0)
                continue

            choice = route({
                "temp_c": temp,
                "ghi_w_m2": ghi,
                "hour": slot // 4,
            })
            if choice is None:
                # No rule fired: blend the two models.
                predictions.append(fvcb_weight * fvcb_a + ml_weight * ml_a)
            elif choice == "ml":
                predictions.append(ml_a)
            else:
                predictions.append(fvcb_a)

        return predictions
|
|