""" OperationalModes: weather protection, heat shield, and harvest mode. These override normal engine output at the P1/P2 priority level in the CommandArbiter. Each mode returns a dict that the arbiter recognises as ``weather_override`` or ``harvest_active``. Modes ----- - **WindStow**: panels go flat (0°) when wind exceeds threshold. - **HailStow**: panels go flat (0°) during hail events. - **HeatShield**: maximum shading offset regardless of budget when air temperature AND CWSI exceed emergency thresholds. - **HarvestMode**: panels park vertical (90°) for machine clearance. Activated/deactivated manually or by schedule. """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import date, datetime, time, timezone from typing import Optional from config.settings import ( HEAT_SHIELD_CWSI, HEAT_SHIELD_TEMP_C, WIND_STOW_SPEED_MS, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Mode results # --------------------------------------------------------------------------- @dataclass class ModeOverride: """Result from an operational mode check.""" active: bool mode: str # "wind_stow" | "hail_stow" | "heat_shield" | "harvest" target_angle: float # angle to command reason: str = "" bypass_budget: bool = False # True → ignore energy budget bypass_hysteresis: bool = False # True → skip hysteresis filter def to_weather_override(self) -> Optional[dict]: """Convert to the dict format CommandArbiter.arbitrate() expects.""" if not self.active: return None return { "target_angle": self.target_angle, "reason": self.reason, } # --------------------------------------------------------------------------- # Individual mode checkers # --------------------------------------------------------------------------- def check_wind_stow( wind_speed_ms: float, threshold: float = WIND_STOW_SPEED_MS, ) -> ModeOverride: """Flat stow if wind exceeds threshold.""" if wind_speed_ms >= threshold: return ModeOverride( active=True, mode="wind_stow", target_angle=0.0, reason=f"wind stow: {wind_speed_ms:.1f} m/s >= {threshold:.0f} m/s", bypass_budget=True, bypass_hysteresis=True, ) return ModeOverride(active=False, mode="wind_stow", target_angle=0.0) def check_hail_stow(hail_detected: bool) -> ModeOverride: """Flat stow during hail.""" if hail_detected: return ModeOverride( active=True, mode="hail_stow", target_angle=0.0, reason="hail detected — flat stow", bypass_budget=True, bypass_hysteresis=True, ) return ModeOverride(active=False, mode="hail_stow", target_angle=0.0) def check_heat_shield( air_temp_c: float, cwsi: Optional[float] = None, temp_threshold: float = HEAT_SHIELD_TEMP_C, cwsi_threshold: float = HEAT_SHIELD_CWSI, max_offset_deg: float = 20.0, theta_astro: float = 0.0, ) -> ModeOverride: """Emergency heat shield: maximum shade offset regardless of budget. Activates when BOTH air temperature AND CWSI exceed their thresholds. If CWSI is unavailable, activates on temperature alone at +2°C above threshold. """ temp_exceeded = air_temp_c >= temp_threshold if cwsi is not None: cwsi_exceeded = cwsi >= cwsi_threshold activate = temp_exceeded and cwsi_exceeded reason = (f"heat shield: T={air_temp_c:.1f}°C >= {temp_threshold:.0f}°C, " f"CWSI={cwsi:.2f} >= {cwsi_threshold:.2f}") else: # Without CWSI, require a higher temperature activate = air_temp_c >= temp_threshold + 2.0 reason = (f"heat shield (no CWSI): T={air_temp_c:.1f}°C >= " f"{temp_threshold + 2.0:.0f}°C") if activate: return ModeOverride( active=True, mode="heat_shield", target_angle=theta_astro + max_offset_deg, reason=reason, bypass_budget=True, bypass_hysteresis=False, ) return ModeOverride(active=False, mode="heat_shield", target_angle=0.0) # --------------------------------------------------------------------------- # Harvest mode # --------------------------------------------------------------------------- class HarvestMode: """Manages harvest parking state. Harvest mode is a manual toggle (operator activates it before sending machines into the vineyard). Can also be scheduled. """ def __init__(self): self._active = False self._scheduled_dates: list[date] = [] def activate(self) -> None: self._active = True logger.info("Harvest mode ACTIVATED — panels will park vertical") def deactivate(self) -> None: self._active = False logger.info("Harvest mode DEACTIVATED — normal control resumed") def set_schedule(self, dates: list[date]) -> None: self._scheduled_dates = sorted(dates) logger.info("Harvest schedule set: %s", [str(d) for d in dates]) def check(self, current_date: Optional[date] = None) -> ModeOverride: today = current_date or date.today() active = self._active or today in self._scheduled_dates if active: return ModeOverride( active=True, mode="harvest", target_angle=90.0, reason="harvest mode — panels parked vertical for machine clearance", bypass_budget=True, bypass_hysteresis=True, ) return ModeOverride(active=False, mode="harvest", target_angle=0.0) # --------------------------------------------------------------------------- # Composite checker # --------------------------------------------------------------------------- class OperationalModeChecker: """Run all mode checks in priority order. Returns the highest-priority active mode, or None if all clear. Priority: wind_stow > hail_stow > harvest > heat_shield """ def __init__(self): self.harvest = HarvestMode() def check_all( self, wind_speed_ms: Optional[float] = None, hail_detected: bool = False, air_temp_c: Optional[float] = None, cwsi: Optional[float] = None, theta_astro: float = 0.0, current_date: Optional[date] = None, ) -> Optional[ModeOverride]: """Check all operational modes in priority order. Returns the first active override, or None. """ # P1a: Wind stow if wind_speed_ms is not None: result = check_wind_stow(wind_speed_ms) if result.active: logger.warning("Mode override: %s", result.reason) return result # P1b: Hail stow if hail_detected: result = check_hail_stow(True) if result.active: logger.warning("Mode override: %s", result.reason) return result # P2: Harvest harvest_result = self.harvest.check(current_date) if harvest_result.active: logger.info("Mode override: %s", harvest_result.reason) return harvest_result # Heat shield (lower priority than harvest — don't shade # while machines are in the vineyard) if air_temp_c is not None: result = check_heat_shield( air_temp_c=air_temp_c, cwsi=cwsi, theta_astro=theta_astro, ) if result.active: logger.warning("Mode override: %s", result.reason) return result return None