""" RoutingAgent: Gemini-based intelligent model routing for the agrivoltaic control system. Given real-time telemetry, routes to either the FvCB mechanistic model or the ML ensemble for photosynthesis prediction. Uses gemini-2.5-flash for low-latency (~100ms) routing decisions. """ from __future__ import annotations from typing import Optional from src.genai_utils import get_genai_client, get_google_api_key SYSTEM_PROMPT = ( "You are a model routing supervisor for an agrivoltaic vineyard control system. " "Given real-time telemetry, decide which photosynthesis model to use:\n" "- MODEL_A (FvCB mechanistic): accurate under standard conditions (T<30C, low stress)\n" "- MODEL_B (ML ensemble): handles non-linear stress, high VPD, extreme heat\n" "Reply with ONLY 'MODEL_A' or 'MODEL_B'." ) class RoutingAgent: """Model router for FvCB vs ML ensemble selection. Uses deterministic rules first (covers >90% of cases without any API call). Falls back to Gemini only for ambiguous transition-zone conditions. """ # Thresholds for rule-based routing (avoids API calls) _TEMP_CLEAR_FVCB = 28.0 # clearly FvCB territory _TEMP_CLEAR_ML = 32.0 # clearly ML territory _VPD_CLEAR_ML = 2.5 # high VPD → ML _CWSI_CLEAR_ML = 0.4 # water stress → ML def __init__( self, model_name: str = "gemini-2.5-flash", api_key: Optional[str] = None, ): self.model_name = model_name self._api_key = api_key self._client = None @property def api_key(self) -> str: return get_google_api_key(self._api_key) @property def client(self): """Lazy-init the Gemini client.""" if self._client is None: self._client = get_genai_client(self._api_key) return self._client # ------------------------------------------------------------------ # Rule-based fast path (no API call) # ------------------------------------------------------------------ @classmethod def _rule_based_route(cls, telemetry: dict) -> Optional[str]: """Return 'fvcb' or 'ml' if rules are decisive, else None.""" temp = telemetry.get("temp_c") vpd = telemetry.get("vpd") cwsi = telemetry.get("cwsi") # High stress signals → ML (no ambiguity) if temp is not None and temp >= cls._TEMP_CLEAR_ML: return "ml" if vpd is not None and vpd >= cls._VPD_CLEAR_ML: return "ml" if cwsi is not None and cwsi >= cls._CWSI_CLEAR_ML: return "ml" # Clearly cool/calm → FvCB if temp is not None and temp < cls._TEMP_CLEAR_FVCB: if vpd is None or vpd < cls._VPD_CLEAR_ML: if cwsi is None or cwsi < cls._CWSI_CLEAR_ML: return "fvcb" return None # transition zone — need LLM # ------------------------------------------------------------------ # Gemini routing (only for ambiguous cases) # ------------------------------------------------------------------ @staticmethod def _format_telemetry(telemetry: dict) -> str: """Format telemetry dict into a readable prompt string.""" lines = ["Current telemetry:"] field_labels = { "temp_c": "Air temperature", "ghi_w_m2": "GHI (irradiance)", "cwsi": "CWSI (crop water stress)", "vpd": "VPD (vapor pressure deficit)", "wind_speed_ms": "Wind speed", "hour": "Hour of day", } for key, label in field_labels.items(): if key in telemetry: val = telemetry[key] lines.append(f" {label}: {val}") return "\n".join(lines) @staticmethod def _parse_response(text: str) -> str: """Extract model choice from Gemini response. Returns 'fvcb' or 'ml'. Falls back to 'fvcb' on ambiguous response. """ text_upper = text.strip().upper() if "MODEL_B" in text_upper: return "ml" return "fvcb" def route(self, telemetry: dict) -> str: """Route a single telemetry reading to fvcb or ml. Uses deterministic rules first; only calls Gemini for ambiguous cases. Parameters ---------- telemetry : dict with keys like temp_c, ghi_w_m2, cwsi, vpd, wind_speed_ms, hour Returns ------- 'fvcb' or 'ml' """ # Fast path: rule-based (no API call) rule_result = self._rule_based_route(telemetry) if rule_result is not None: return rule_result # Slow path: Gemini for transition-zone ambiguity prompt = self._format_telemetry(telemetry) try: response = self.client.models.generate_content( model=self.model_name, contents=prompt, config={"system_instruction": SYSTEM_PROMPT}, ) return self._parse_response(response.text) except Exception as e: print(f"RoutingAgent: API error ({e}), falling back to fvcb") return "fvcb" def route_batch(self, telemetry_rows: list[dict]) -> list[str]: """Route a batch of telemetry readings. Uses rule-based routing where possible; batches remaining ambiguous rows into a single Gemini call. """ results = [None] * len(telemetry_rows) ambiguous_indices = [] # First pass: rule-based for i, row in enumerate(telemetry_rows): rule_result = self._rule_based_route(row) if rule_result is not None: results[i] = rule_result else: ambiguous_indices.append(i) # Second pass: single batched Gemini call for ambiguous rows if ambiguous_indices: lines = [ "Route each of the following telemetry readings to MODEL_A or MODEL_B.", "Reply with one line per reading: ': MODEL_A' or ': MODEL_B'.", "", ] for idx in ambiguous_indices: lines.append(f"Reading {idx}: {self._format_telemetry(telemetry_rows[idx])}") lines.append("") try: response = self.client.models.generate_content( model=self.model_name, contents="\n".join(lines), config={"system_instruction": SYSTEM_PROMPT}, ) resp_text = response.text.upper() for idx in ambiguous_indices: # Look for this index's answer in the response if f"{idx}: MODEL_B" in resp_text or f"{idx}:MODEL_B" in resp_text: results[idx] = "ml" else: results[idx] = "fvcb" except Exception as e: print(f"RoutingAgent: batch API error ({e}), falling back to fvcb") for idx in ambiguous_indices: results[idx] = "fvcb" return results # ---------------------------------------------------------------------- # CLI entry point # ---------------------------------------------------------------------- if __name__ == "__main__": sample_scenarios = [ { "name": "Cool morning", "telemetry": { "temp_c": 22.0, "ghi_w_m2": 350.0, "cwsi": 0.15, "vpd": 0.8, "wind_speed_ms": 2.0, "hour": 8, }, }, { "name": "Hot afternoon, high stress", "telemetry": { "temp_c": 38.0, "ghi_w_m2": 950.0, "cwsi": 0.72, "vpd": 3.5, "wind_speed_ms": 1.0, "hour": 14, }, }, { "name": "Moderate conditions", "telemetry": { "temp_c": 29.5, "ghi_w_m2": 680.0, "cwsi": 0.35, "vpd": 1.8, "wind_speed_ms": 3.0, "hour": 11, }, }, ] agent = RoutingAgent() print("Gemini Routing Agent — Sample Scenarios\n") for scenario in sample_scenarios: choice = agent.route(scenario["telemetry"]) model_label = "FvCB (mechanistic)" if choice == "fvcb" else "ML ensemble" print(f" {scenario['name']:30s} → {choice:4s} ({model_label})")