api / src /operational_modes.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
OperationalModes: weather protection, heat shield, and harvest mode.
These override normal engine output at the P1/P2 priority level
in the CommandArbiter. Each mode check returns a ``ModeOverride``;
``ModeOverride.to_weather_override()`` converts it to the dict that the
arbiter recognises as ``weather_override`` or ``harvest_active``.
Modes
-----
- **WindStow**: panels go flat (0°) when wind exceeds threshold.
- **HailStow**: panels go flat (0°) during hail events.
- **HeatShield**: maximum shading offset regardless of budget
when air temperature AND CWSI exceed emergency thresholds (or, when
CWSI is unavailable, when air temperature alone exceeds a stricter
threshold).
- **HarvestMode**: panels park vertical (90°) for machine clearance.
Activated/deactivated manually or by schedule.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import date, datetime, time, timezone
from typing import Optional
from config.settings import (
HEAT_SHIELD_CWSI,
HEAT_SHIELD_TEMP_C,
WIND_STOW_SPEED_MS,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Mode results
# ---------------------------------------------------------------------------
@dataclass
class ModeOverride:
    """Outcome of evaluating one operational mode.

    Records whether the mode fired, which mode it was, the angle it asks
    for, and which downstream filters it requests to bypass.
    """

    # Whether this mode is currently demanding an override.
    active: bool
    # Mode tag: "wind_stow" | "hail_stow" | "heat_shield" | "harvest".
    mode: str
    # Angle to command.
    target_angle: float
    # Human-readable explanation; empty unless the checker supplies one.
    reason: str = ""
    # True → ignore energy budget.
    bypass_budget: bool = False
    # True → skip hysteresis filter.
    bypass_hysteresis: bool = False

    def to_weather_override(self) -> Optional[dict]:
        """Build the dict form that CommandArbiter.arbitrate() expects.

        Returns:
            ``None`` when the override is inactive; otherwise a dict that
            holds only ``target_angle`` and ``reason``.
        """
        if self.active:
            return dict(target_angle=self.target_angle, reason=self.reason)
        return None
# ---------------------------------------------------------------------------
# Individual mode checkers
# ---------------------------------------------------------------------------
def check_wind_stow(
    wind_speed_ms: float,
    threshold: float = WIND_STOW_SPEED_MS,
) -> ModeOverride:
    """Request a flat (0°) stow once the wind speed reaches ``threshold``.

    Args:
        wind_speed_ms: Measured wind speed in m/s.
        threshold: Stow trigger speed in m/s; defaults to the configured
            ``WIND_STOW_SPEED_MS``.

    Returns:
        An active ``wind_stow`` override (bypassing both budget and
        hysteresis) when ``wind_speed_ms >= threshold``; otherwise an
        inactive one.
    """
    stow_needed = wind_speed_ms >= threshold
    if not stow_needed:
        return ModeOverride(active=False, mode="wind_stow", target_angle=0.0)

    explanation = f"wind stow: {wind_speed_ms:.1f} m/s >= {threshold:.0f} m/s"
    return ModeOverride(
        active=True,
        mode="wind_stow",
        target_angle=0.0,
        reason=explanation,
        bypass_budget=True,
        bypass_hysteresis=True,
    )
def check_hail_stow(hail_detected: bool) -> ModeOverride:
    """Request a flat (0°) stow while hail is being reported.

    Args:
        hail_detected: Truthy when a hail event is in progress.

    Returns:
        An active ``hail_stow`` override (bypassing both budget and
        hysteresis) when hail is detected; otherwise an inactive one.
    """
    if not hail_detected:
        return ModeOverride(active=False, mode="hail_stow", target_angle=0.0)

    return ModeOverride(
        active=True,
        mode="hail_stow",
        target_angle=0.0,
        reason="hail detected — flat stow",
        bypass_budget=True,
        bypass_hysteresis=True,
    )
def check_heat_shield(
    air_temp_c: float,
    cwsi: Optional[float] = None,
    temp_threshold: float = HEAT_SHIELD_TEMP_C,
    cwsi_threshold: float = HEAT_SHIELD_CWSI,
    max_offset_deg: float = 20.0,
    theta_astro: float = 0.0,
    no_cwsi_margin_c: float = 2.0,
) -> ModeOverride:
    """Emergency heat shield: maximum shade offset regardless of budget.

    Activates when BOTH air temperature AND CWSI reach their thresholds.
    If CWSI is unavailable (``None`` or NaN), activates on temperature
    alone at ``no_cwsi_margin_c`` °C above the temperature threshold.

    Args:
        air_temp_c: Air temperature in °C.
        cwsi: CWSI reading, or ``None``/NaN when no reading is available.
        temp_threshold: Temperature trigger in °C; defaults to the
            configured ``HEAT_SHIELD_TEMP_C``.
        cwsi_threshold: CWSI trigger; defaults to the configured
            ``HEAT_SHIELD_CWSI``.
        max_offset_deg: Offset added to ``theta_astro`` when the shield
            is active.
        theta_astro: Base angle the offset is applied to.
        no_cwsi_margin_c: Extra °C required above ``temp_threshold`` when
            no usable CWSI reading exists.

    Returns:
        An active ``heat_shield`` override targeting
        ``theta_astro + max_offset_deg`` (budget bypassed, hysteresis
        kept) when the trigger condition holds; otherwise an inactive one.
    """
    import math  # function-scope import keeps this block self-contained

    # A NaN reading carries no information: NaN >= threshold is always
    # False, which would silently disable the shield instead of falling
    # back to the temperature-only rule. Treat it like a missing value.
    cwsi_available = cwsi is not None and not math.isnan(cwsi)

    if cwsi_available:
        activate = air_temp_c >= temp_threshold and cwsi >= cwsi_threshold
        reason = (f"heat shield: T={air_temp_c:.1f}°C >= {temp_threshold:.0f}°C, "
                  f"CWSI={cwsi:.2f} >= {cwsi_threshold:.2f}")
    else:
        # Without CWSI, require a higher temperature.
        fallback_threshold = temp_threshold + no_cwsi_margin_c
        activate = air_temp_c >= fallback_threshold
        reason = (f"heat shield (no CWSI): T={air_temp_c:.1f}°C >= "
                  f"{fallback_threshold:.0f}°C")

    if activate:
        return ModeOverride(
            active=True,
            mode="heat_shield",
            target_angle=theta_astro + max_offset_deg,
            reason=reason,
            bypass_budget=True,
            bypass_hysteresis=False,
        )
    return ModeOverride(active=False, mode="heat_shield", target_angle=0.0)
# ---------------------------------------------------------------------------
# Harvest mode
# ---------------------------------------------------------------------------
class HarvestMode:
    """Manages harvest parking state.

    Harvest mode is a manual toggle (operator activates it before
    sending machines into the vineyard). Can also be scheduled: the mode
    is treated as active on every date registered via ``set_schedule``.
    """

    def __init__(self):
        self._active = False
        self._scheduled_dates: list[date] = []

    @staticmethod
    def _as_date(value: date) -> date:
        """Collapse a ``datetime`` to its calendar date; pass dates through."""
        # datetime subclasses date but never compares equal to a plain
        # date, and ordering the two raises TypeError, so normalize first.
        return value.date() if isinstance(value, datetime) else value

    def activate(self) -> None:
        """Switch the manual harvest toggle on."""
        self._active = True
        logger.info("Harvest mode ACTIVATED — panels will park vertical")

    def deactivate(self) -> None:
        """Switch the manual harvest toggle off (the schedule still applies)."""
        self._active = False
        logger.info("Harvest mode DEACTIVATED — normal control resumed")

    def set_schedule(self, dates: list[date]) -> None:
        """Replace the harvest schedule with ``dates``.

        Entries may be ``date`` or ``datetime`` objects; they are stored
        as plain dates in ascending order.
        """
        self._scheduled_dates = sorted(self._as_date(d) for d in dates)
        # Log what was actually stored, not the raw input.
        logger.info(
            "Harvest schedule set: %s",
            [str(d) for d in self._scheduled_dates],
        )

    def check(self, current_date: Optional[date] = None) -> ModeOverride:
        """Report whether panels should be parked for harvest.

        Args:
            current_date: Day to evaluate (``date`` or ``datetime``);
                defaults to ``date.today()``.

        Returns:
            An active ``harvest`` override at 90° (bypassing budget and
            hysteresis) when the manual toggle is on or the day is in the
            schedule; otherwise an inactive one.
        """
        today = self._as_date(current_date or date.today())
        active = self._active or today in self._scheduled_dates
        if active:
            return ModeOverride(
                active=True,
                mode="harvest",
                target_angle=90.0,
                reason="harvest mode — panels parked vertical for machine clearance",
                bypass_budget=True,
                bypass_hysteresis=True,
            )
        return ModeOverride(active=False, mode="harvest", target_angle=0.0)
# ---------------------------------------------------------------------------
# Composite checker
# ---------------------------------------------------------------------------
class OperationalModeChecker:
    """Evaluate every operational mode and report the winner.

    Modes are tried in the fixed order
    wind_stow > hail_stow > harvest > heat_shield; the first one that is
    active is returned, or ``None`` when nothing fires.
    """

    def __init__(self):
        self.harvest = HarvestMode()

    def check_all(
        self,
        wind_speed_ms: Optional[float] = None,
        hail_detected: bool = False,
        air_temp_c: Optional[float] = None,
        cwsi: Optional[float] = None,
        theta_astro: float = 0.0,
        current_date: Optional[date] = None,
    ) -> Optional[ModeOverride]:
        """Run the mode checks in priority order.

        Checks whose input is missing (``wind_speed_ms`` / ``air_temp_c``
        is ``None``, or ``hail_detected`` is falsy) are skipped. Each
        remaining check runs only if no higher-priority mode has already
        fired.

        Returns:
            The first active override, or ``None``.
        """
        # Ordered (lazy check, log function) pairs; nothing is evaluated
        # until the loop below reaches it.
        pending = []

        # P1a: Wind stow
        if wind_speed_ms is not None:
            pending.append(
                (lambda: check_wind_stow(wind_speed_ms), logger.warning)
            )

        # P1b: Hail stow
        if hail_detected:
            pending.append((lambda: check_hail_stow(True), logger.warning))

        # P2: Harvest
        pending.append((lambda: self.harvest.check(current_date), logger.info))

        # Heat shield ranks below harvest — don't shade while machines
        # are in the vineyard.
        if air_temp_c is not None:
            pending.append(
                (
                    lambda: check_heat_shield(
                        air_temp_c=air_temp_c,
                        cwsi=cwsi,
                        theta_astro=theta_astro,
                    ),
                    logger.warning,
                )
            )

        for run_check, emit in pending:
            outcome = run_check()
            if outcome.active:
                emit("Mode override: %s", outcome.reason)
                return outcome
        return None