| """ |
| RoutingAgent: Gemini-based intelligent model routing for the agrivoltaic |
| control system. Given real-time telemetry, routes to either the FvCB |
| mechanistic model or the ML ensemble for photosynthesis prediction. |
| |
| Uses gemini-2.5-flash for low-latency (~100ms) routing decisions. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Optional |
|
|
| from src.genai_utils import get_genai_client, get_google_api_key |
|
|
# System instruction sent verbatim with every Gemini routing call (both the
# single-reading and batch paths). It constrains the model to answer with the
# literal tokens 'MODEL_A' (FvCB mechanistic) or 'MODEL_B' (ML ensemble);
# _parse_response relies on that contract when extracting the choice.
SYSTEM_PROMPT = (
    "You are a model routing supervisor for an agrivoltaic vineyard control system. "
    "Given real-time telemetry, decide which photosynthesis model to use:\n"
    "- MODEL_A (FvCB mechanistic): accurate under standard conditions (T<30C, low stress)\n"
    "- MODEL_B (ML ensemble): handles non-linear stress, high VPD, extreme heat\n"
    "Reply with ONLY 'MODEL_A' or 'MODEL_B'."
)
|
|
|
|
class RoutingAgent:
    """Model router for FvCB vs ML ensemble selection.

    Uses deterministic rules first (covers >90% of cases without any API call).
    Falls back to Gemini only for ambiguous transition-zone conditions.
    """

    # Decision thresholds. Below _TEMP_CLEAR_FVCB (with low VPD/CWSI) the
    # mechanistic model is clearly right; at/above any *_CLEAR_ML threshold the
    # ML ensemble is clearly right. The gap (e.g. 28-32 C) is the ambiguous
    # transition zone that gets escalated to Gemini.
    _TEMP_CLEAR_FVCB = 28.0
    _TEMP_CLEAR_ML = 32.0
    _VPD_CLEAR_ML = 2.5
    _CWSI_CLEAR_ML = 0.4

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
    ):
        """Create a router.

        Parameters
        ----------
        model_name : Gemini model id used for ambiguous-case routing calls.
        api_key : optional explicit API key; when None, resolution is
            deferred to get_google_api_key (presumably environment-based —
            defined outside this file).
        """
        self.model_name = model_name
        self._api_key = api_key
        self._client = None  # created lazily on first `.client` access

    @property
    def api_key(self) -> str:
        """Resolve the effective API key (explicit value or fallback)."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """Lazy-init the Gemini client."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    @classmethod
    def _rule_based_route(cls, telemetry: dict) -> Optional[str]:
        """Return 'fvcb' or 'ml' if rules are decisive, else None.

        Missing keys are treated as "unknown" and never force a decision on
        their own; None is returned for transition-zone or underdetermined
        readings so the caller can escalate to Gemini.
        """
        temp = telemetry.get("temp_c")
        vpd = telemetry.get("vpd")
        cwsi = telemetry.get("cwsi")

        # Any single clearly-stressed signal is decisive for the ML ensemble.
        if temp is not None and temp >= cls._TEMP_CLEAR_ML:
            return "ml"
        if vpd is not None and vpd >= cls._VPD_CLEAR_ML:
            return "ml"
        if cwsi is not None and cwsi >= cls._CWSI_CLEAR_ML:
            return "ml"

        # FvCB only when temperature is known-cool AND neither stress metric
        # (where present) is elevated.
        if temp is not None and temp < cls._TEMP_CLEAR_FVCB:
            if vpd is None or vpd < cls._VPD_CLEAR_ML:
                if cwsi is None or cwsi < cls._CWSI_CLEAR_ML:
                    return "fvcb"

        # Transition zone or missing temperature: not decisive.
        return None

    @staticmethod
    def _format_telemetry(telemetry: dict) -> str:
        """Format telemetry dict into a readable prompt string.

        Only known fields are emitted, in a fixed order; unknown keys in
        `telemetry` are silently ignored.
        """
        lines = ["Current telemetry:"]
        field_labels = {
            "temp_c": "Air temperature",
            "ghi_w_m2": "GHI (irradiance)",
            "cwsi": "CWSI (crop water stress)",
            "vpd": "VPD (vapor pressure deficit)",
            "wind_speed_ms": "Wind speed",
            "hour": "Hour of day",
        }
        for key, label in field_labels.items():
            if key in telemetry:
                val = telemetry[key]
                lines.append(f" {label}: {val}")
        return "\n".join(lines)

    @staticmethod
    def _parse_response(text: str) -> str:
        """Extract model choice from Gemini response.

        Returns 'fvcb' or 'ml'. Falls back to 'fvcb' on ambiguous response.
        """
        text_upper = text.strip().upper()
        if "MODEL_B" in text_upper:
            return "ml"
        return "fvcb"

    @staticmethod
    def _parse_batch_response(text: str) -> dict[int, str]:
        """Parse a batch reply of '<index>: MODEL_A|MODEL_B' lines.

        Line-anchored parsing (rather than whole-text substring search) so
        that index 1 cannot falsely match inside a line like '11: MODEL_B'.
        Tolerates prefixes such as 'Reading 3:' by taking the last token
        before the colon. Lines without a leading integer are skipped.
        """
        choices: dict[int, str] = {}
        for line in text.upper().splitlines():
            head, sep, tail = line.partition(":")
            if not sep:
                continue
            tokens = head.split()
            if not tokens or not tokens[-1].isdigit():
                continue
            choices[int(tokens[-1])] = "ml" if "MODEL_B" in tail else "fvcb"
        return choices

    def route(self, telemetry: dict) -> str:
        """Route a single telemetry reading to fvcb or ml.

        Uses deterministic rules first; only calls Gemini for ambiguous cases.

        Parameters
        ----------
        telemetry : dict with keys like temp_c, ghi_w_m2, cwsi, vpd,
            wind_speed_ms, hour

        Returns
        -------
        'fvcb' or 'ml'
        """
        rule_result = self._rule_based_route(telemetry)
        if rule_result is not None:
            return rule_result

        prompt = self._format_telemetry(telemetry)
        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config={"system_instruction": SYSTEM_PROMPT},
            )
            return self._parse_response(response.text)
        except Exception as e:
            # Best-effort degradation: an API failure must not stall the
            # control loop, so default to the mechanistic model.
            print(f"RoutingAgent: API error ({e}), falling back to fvcb")
            return "fvcb"

    def route_batch(self, telemetry_rows: list[dict]) -> list[str]:
        """Route a batch of telemetry readings.

        Uses rule-based routing where possible; batches remaining ambiguous
        rows into a single Gemini call. Returns a list of 'fvcb'/'ml' choices
        parallel to `telemetry_rows`. Indices the model fails to answer for,
        and any API error, fall back to 'fvcb'.
        """
        results: list[Optional[str]] = [None] * len(telemetry_rows)
        ambiguous_indices = []

        for i, row in enumerate(telemetry_rows):
            rule_result = self._rule_based_route(row)
            if rule_result is not None:
                results[i] = rule_result
            else:
                ambiguous_indices.append(i)

        if ambiguous_indices:
            lines = [
                "Route each of the following telemetry readings to MODEL_A or MODEL_B.",
                "Reply with one line per reading: '<index>: MODEL_A' or '<index>: MODEL_B'.",
                "",
            ]
            for idx in ambiguous_indices:
                lines.append(f"Reading {idx}: {self._format_telemetry(telemetry_rows[idx])}")
                lines.append("")

            try:
                response = self.client.models.generate_content(
                    model=self.model_name,
                    contents="\n".join(lines),
                    config={"system_instruction": SYSTEM_PROMPT},
                )
                # BUGFIX: the old code did a substring search for
                # f"{idx}: MODEL_B" over the whole reply, so index 1 matched
                # inside "11: MODEL_B". Parse per line instead.
                choices = self._parse_batch_response(response.text)
                for idx in ambiguous_indices:
                    results[idx] = choices.get(idx, "fvcb")
            except Exception as e:
                print(f"RoutingAgent: batch API error ({e}), falling back to fvcb")
                for idx in ambiguous_indices:
                    results[idx] = "fvcb"

        return results
|
|
|
|
| |
| |
| |
|
|
def _demo() -> None:
    """Run the router against a few representative scenarios and print choices.

    Ambiguous scenarios (e.g. "Moderate conditions") will trigger a live
    Gemini call; decisive ones are resolved by rules alone.
    """
    sample_scenarios = [
        {
            "name": "Cool morning",
            "telemetry": {
                "temp_c": 22.0, "ghi_w_m2": 350.0, "cwsi": 0.15,
                "vpd": 0.8, "wind_speed_ms": 2.0, "hour": 8,
            },
        },
        {
            "name": "Hot afternoon, high stress",
            "telemetry": {
                "temp_c": 38.0, "ghi_w_m2": 950.0, "cwsi": 0.72,
                "vpd": 3.5, "wind_speed_ms": 1.0, "hour": 14,
            },
        },
        {
            "name": "Moderate conditions",
            "telemetry": {
                "temp_c": 29.5, "ghi_w_m2": 680.0, "cwsi": 0.35,
                "vpd": 1.8, "wind_speed_ms": 3.0, "hour": 11,
            },
        },
    ]

    agent = RoutingAgent()
    print("Gemini Routing Agent — Sample Scenarios\n")

    for scenario in sample_scenarios:
        choice = agent.route(scenario["telemetry"])
        model_label = "FvCB (mechanistic)" if choice == "fvcb" else "ML ensemble"
        print(f" {scenario['name']:30s} → {choice:4s} ({model_label})")


if __name__ == "__main__":
    # Idiom: keep module import side-effect free; demo only runs as a script.
    _demo()
|