# api/src/chatbot/routing_agent.py
# Eli Safra
# Deploy SolarWine API (FastAPI + Docker, port 7860)
# 938949f
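"""
RoutingAgent: intelligent model routing for the agrivoltaic control system.
Given real-time telemetry, routes each reading to either the FvCB
mechanistic model or the ML ensemble for photosynthesis prediction.

Clear-cut readings are decided by deterministic threshold rules without any
API call; only ambiguous transition-zone readings are sent to Gemini.
Uses gemini-2.5-flash by default for low-latency (~100 ms) routing decisions.
"""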
from __future__ import annotations

import re
from typing import Optional

from src.genai_utils import get_genai_client, get_google_api_key
# System instruction passed with every Gemini routing request in this module.
# It names the two candidate models and restricts the reply to a bare label.
SYSTEM_PROMPT = "\n".join(
    [
        "You are a model routing supervisor for an agrivoltaic vineyard control system. "
        "Given real-time telemetry, decide which photosynthesis model to use:",
        "- MODEL_A (FvCB mechanistic): accurate under standard conditions (T<30C, low stress)",
        "- MODEL_B (ML ensemble): handles non-linear stress, high VPD, extreme heat",
        "Reply with ONLY 'MODEL_A' or 'MODEL_B'.",
    ]
)
class RoutingAgent:
    """Model router for FvCB vs ML ensemble selection.

    Deterministic threshold rules are tried first and need no API call.
    Gemini is consulted only for readings the rules leave undecided (the
    temperature transition zone, or readings with no usable temperature).
    Any failure on the Gemini path falls back to ``'fvcb'``.
    """

    # Thresholds for rule-based routing (avoids API calls)
    _TEMP_CLEAR_FVCB = 28.0  # below this: clearly FvCB territory
    _TEMP_CLEAR_ML = 32.0  # at or above this: clearly ML territory
    _VPD_CLEAR_ML = 2.5  # at or above this: high VPD -> ML
    _CWSI_CLEAR_ML = 0.4  # at or above this: water stress -> ML

    # Telemetry keys included in the prompt, in display order.
    _FIELD_LABELS = {
        "temp_c": "Air temperature",
        "ghi_w_m2": "GHI (irradiance)",
        "cwsi": "CWSI (crop water stress)",
        "vpd": "VPD (vapor pressure deficit)",
        "wind_speed_ms": "Wind speed",
        "hour": "Hour of day",
    }

    # Matches "<index>: MODEL_B" in a batched reply. The digit lookbehind
    # anchors the index so that "1" cannot match inside "11: MODEL_B".
    _BATCH_ML_RE = re.compile(r"(?<!\d)(\d+)\s*:\s*MODEL_B", re.IGNORECASE)

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
    ):
        """Store configuration; the Gemini client is created on first use.

        Parameters
        ----------
        model_name : model identifier passed to ``generate_content``.
        api_key : optional explicit key, forwarded to the genai helpers.
        """
        self.model_name = model_name
        self._api_key = api_key
        self._client = None

    @property
    def api_key(self) -> str:
        """Return the key resolved by ``get_google_api_key``."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """Lazy-init the Gemini client."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    # ------------------------------------------------------------------
    # Rule-based fast path (no API call)
    # ------------------------------------------------------------------
    @classmethod
    def _rule_based_route(cls, telemetry: dict) -> Optional[str]:
        """Return 'fvcb' or 'ml' if rules are decisive, else None.

        Reads ``temp_c``, ``vpd`` and ``cwsi``; missing keys (or ``None``
        values) are treated as "no signal".
        """
        temp = telemetry.get("temp_c")
        vpd = telemetry.get("vpd")
        cwsi = telemetry.get("cwsi")

        # Any single high-stress signal -> ML (no ambiguity)
        if temp is not None and temp >= cls._TEMP_CLEAR_ML:
            return "ml"
        if vpd is not None and vpd >= cls._VPD_CLEAR_ML:
            return "ml"
        if cwsi is not None and cwsi >= cls._CWSI_CLEAR_ML:
            return "ml"

        # Clearly cool/calm -> FvCB. The explicit "<" checks are kept on
        # purpose: a NaN reading fails both ">=" and "<", so it stays
        # ambiguous instead of being silently treated as calm.
        calm_vpd = vpd is None or vpd < cls._VPD_CLEAR_ML
        calm_cwsi = cwsi is None or cwsi < cls._CWSI_CLEAR_ML
        if temp is not None and temp < cls._TEMP_CLEAR_FVCB and calm_vpd and calm_cwsi:
            return "fvcb"

        return None  # transition zone — need LLM

    # ------------------------------------------------------------------
    # Gemini routing (only for ambiguous cases)
    # ------------------------------------------------------------------
    @classmethod
    def _format_telemetry(cls, telemetry: dict) -> str:
        """Format telemetry dict into a readable prompt string.

        Only the known keys are emitted, in a fixed order; unknown keys
        are ignored.
        """
        lines = ["Current telemetry:"]
        lines.extend(
            f" {label}: {telemetry[key]}"
            for key, label in cls._FIELD_LABELS.items()
            if key in telemetry
        )
        return "\n".join(lines)

    @staticmethod
    def _parse_response(text: Optional[str]) -> str:
        """Extract model choice from a single-reading Gemini response.

        Returns 'ml' when the reply mentions MODEL_B (case-insensitive),
        otherwise 'fvcb'. An empty or missing reply also yields 'fvcb'.
        """
        if text and "MODEL_B" in text.upper():
            return "ml"
        return "fvcb"

    @classmethod
    def _parse_batch_response(cls, text: Optional[str], indices: list[int]) -> dict[int, str]:
        """Map each requested index to 'ml' or 'fvcb' from a batched reply.

        An index is routed to 'ml' only if the reply contains
        ``<index>: MODEL_B`` with the index matched as a whole number;
        every other index (including ones the reply omits) gets 'fvcb'.
        """
        ml_indices = {int(m.group(1)) for m in cls._BATCH_ML_RE.finditer(text or "")}
        return {idx: ("ml" if idx in ml_indices else "fvcb") for idx in indices}

    def route(self, telemetry: dict) -> str:
        """Route a single telemetry reading to fvcb or ml.

        Uses deterministic rules first; only calls Gemini for ambiguous cases.

        Parameters
        ----------
        telemetry : dict with keys like temp_c, ghi_w_m2, cwsi, vpd,
            wind_speed_ms, hour

        Returns
        -------
        'fvcb' or 'ml'
        """
        # Fast path: rule-based (no API call)
        rule_result = self._rule_based_route(telemetry)
        if rule_result is not None:
            return rule_result

        # Slow path: Gemini for transition-zone ambiguity
        prompt = self._format_telemetry(telemetry)
        try:
            # self.client is accessed inside the try so that client
            # construction failures also fall back to fvcb.
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config={"system_instruction": SYSTEM_PROMPT},
            )
            return self._parse_response(response.text)
        except Exception as e:
            print(f"RoutingAgent: API error ({e}), falling back to fvcb")
            return "fvcb"

    def route_batch(self, telemetry_rows: list[dict]) -> list[str]:
        """Route a batch of telemetry readings.

        Uses rule-based routing where possible; batches remaining ambiguous
        rows into a single Gemini call.

        Returns
        -------
        One 'fvcb' or 'ml' per input row, in input order.
        """
        # First pass: rule-based
        results = [self._rule_based_route(row) for row in telemetry_rows]
        ambiguous_indices = [i for i, choice in enumerate(results) if choice is None]
        if not ambiguous_indices:
            return results

        # Second pass: single batched Gemini call for ambiguous rows
        lines = [
            "Route each of the following telemetry readings to MODEL_A or MODEL_B.",
            "Reply with one line per reading: '<index>: MODEL_A' or '<index>: MODEL_B'.",
            "",
        ]
        for idx in ambiguous_indices:
            lines.append(f"Reading {idx}: {self._format_telemetry(telemetry_rows[idx])}")
            lines.append("")

        try:
            response = self.client.models.generate_content(
                model=self.model_name,
                contents="\n".join(lines),
                config={"system_instruction": SYSTEM_PROMPT},
            )
            routed = self._parse_batch_response(response.text, ambiguous_indices)
        except Exception as e:
            print(f"RoutingAgent: batch API error ({e}), falling back to fvcb")
            routed = dict.fromkeys(ambiguous_indices, "fvcb")

        for idx, choice in routed.items():
            results[idx] = choice
        return results
# ----------------------------------------------------------------------
# CLI entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Demo run: route three hand-picked readings and print the chosen model.
    # With the class thresholds, the first two are settled by the rules
    # alone; the third (29.5 C, no stress signal) lands in the transition
    # zone and triggers a Gemini call.
    demo_readings = (
        (
            "Cool morning",
            {
                "temp_c": 22.0, "ghi_w_m2": 350.0, "cwsi": 0.15,
                "vpd": 0.8, "wind_speed_ms": 2.0, "hour": 8,
            },
        ),
        (
            "Hot afternoon, high stress",
            {
                "temp_c": 38.0, "ghi_w_m2": 950.0, "cwsi": 0.72,
                "vpd": 3.5, "wind_speed_ms": 1.0, "hour": 14,
            },
        ),
        (
            "Moderate conditions",
            {
                "temp_c": 29.5, "ghi_w_m2": 680.0, "cwsi": 0.35,
                "vpd": 1.8, "wind_speed_ms": 3.0, "hour": 11,
            },
        ),
    )

    router = RoutingAgent()
    print("Gemini Routing Agent — Sample Scenarios\n")
    for title, reading in demo_readings:
        choice = router.route(reading)
        if choice == "fvcb":
            model_label = "FvCB (mechanistic)"
        else:
            model_label = "ML ensemble"
        print(f" {title:30s} → {choice:4s} ({model_label})")