| """ |
| OperationalModes: weather protection, heat shield, and harvest mode. |
| |
| These override normal engine output at the P1/P2 priority level |
| in the CommandArbiter. Each check returns a ``ModeOverride``; its |
| ``to_weather_override()`` method yields the dict that the arbiter |
| recognises as ``weather_override`` or ``harvest_active``. |
| |
| Modes |
| ----- |
| - **WindStow**: panels go flat (0°) when wind exceeds threshold. |
| - **HailStow**: panels go flat (0°) during hail events. |
| - **HeatShield**: maximum shading offset regardless of budget |
| when air temperature AND CWSI exceed emergency thresholds. |
| - **HarvestMode**: panels park vertical (90°) for machine clearance. |
| Activated/deactivated manually or by schedule. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from dataclasses import dataclass |
| from datetime import date, datetime, time, timezone |
| from typing import Optional |
|
|
| from config.settings import ( |
| HEAT_SHIELD_CWSI, |
| HEAT_SHIELD_TEMP_C, |
| WIND_STOW_SPEED_MS, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class ModeOverride:
    """Outcome of a single operational-mode check.

    An instance is produced whether or not the mode fired; ``active``
    tells the caller which. ``to_weather_override`` turns an active
    result into the dict form used by ``CommandArbiter.arbitrate()``.
    """

    # True when the mode is currently demanding control of the panels.
    active: bool
    # Identifier of the mode that produced this result, e.g. "wind_stow".
    mode: str
    # Panel angle requested by the mode.
    target_angle: float
    # Human-readable explanation; stays empty unless the check fills it in.
    reason: str = ""
    # Flags set by the individual checks; they are not read in this class.
    bypass_budget: bool = False
    bypass_hysteresis: bool = False

    def to_weather_override(self) -> Optional[dict]:
        """Return the arbiter-facing dict, or ``None`` when the mode is inactive.

        Only ``target_angle`` and ``reason`` are exported; the bypass
        flags and the mode name are not part of the dict.
        """
        if self.active:
            return dict(target_angle=self.target_angle, reason=self.reason)
        return None
|
|
|
|
| |
| |
| |
|
|
def check_wind_stow(
    wind_speed_ms: float,
    threshold: float = WIND_STOW_SPEED_MS,
) -> ModeOverride:
    """Request a flat (0°) stow once wind speed reaches ``threshold``.

    The comparison is inclusive: a reading exactly equal to the
    threshold triggers the stow. An active result bypasses both the
    budget and the hysteresis; an inactive result carries no reason.
    """
    stow_required = wind_speed_ms >= threshold
    if not stow_required:
        return ModeOverride(active=False, mode="wind_stow", target_angle=0.0)

    message = f"wind stow: {wind_speed_ms:.1f} m/s >= {threshold:.0f} m/s"
    return ModeOverride(
        active=True,
        mode="wind_stow",
        target_angle=0.0,
        reason=message,
        bypass_budget=True,
        bypass_hysteresis=True,
    )
|
|
|
|
def check_hail_stow(hail_detected: bool) -> ModeOverride:
    """Request a flat (0°) stow while hail is being reported.

    An active result bypasses both the budget and the hysteresis.
    When ``hail_detected`` is falsy an inactive result is returned.
    """
    if not hail_detected:
        return ModeOverride(active=False, mode="hail_stow", target_angle=0.0)

    return ModeOverride(
        active=True,
        mode="hail_stow",
        target_angle=0.0,
        reason="hail detected — flat stow",
        bypass_budget=True,
        bypass_hysteresis=True,
    )
|
|
|
|
def check_heat_shield(
    air_temp_c: float,
    cwsi: Optional[float] = None,
    temp_threshold: float = HEAT_SHIELD_TEMP_C,
    cwsi_threshold: float = HEAT_SHIELD_CWSI,
    max_offset_deg: float = 20.0,
    theta_astro: float = 0.0,
) -> ModeOverride:
    """Decide whether the emergency heat shield should engage.

    With a CWSI reading, the shield engages only when air temperature
    and CWSI have BOTH reached their thresholds (inclusive). Without
    one (``cwsi is None``), temperature alone decides, against a limit
    2 °C above ``temp_threshold``.

    When engaged, the requested angle is ``theta_astro + max_offset_deg``
    and the budget is bypassed. Unlike the wind and hail stows,
    hysteresis is NOT bypassed.
    """
    hot_enough = air_temp_c >= temp_threshold

    if cwsi is None:
        # No stress index available: demand an extra 2 °C of margin
        # before acting on temperature alone.
        engage = air_temp_c >= temp_threshold + 2.0
        why = (f"heat shield (no CWSI): T={air_temp_c:.1f}°C >= "
               f"{temp_threshold + 2.0:.0f}°C")
    else:
        stressed = cwsi >= cwsi_threshold
        engage = hot_enough and stressed
        why = (f"heat shield: T={air_temp_c:.1f}°C >= {temp_threshold:.0f}°C, "
               f"CWSI={cwsi:.2f} >= {cwsi_threshold:.2f}")

    if not engage:
        return ModeOverride(active=False, mode="heat_shield", target_angle=0.0)

    shaded_angle = theta_astro + max_offset_deg
    return ModeOverride(
        active=True,
        mode="heat_shield",
        target_angle=shaded_angle,
        reason=why,
        bypass_budget=True,
        bypass_hysteresis=False,
    )
|
|
|
|
| |
| |
| |
|
|
class HarvestMode:
    """Manage the harvest parking state.

    Harvest mode is a manual toggle (the operator activates it before
    sending machines into the vineyard). It can also be scheduled for
    specific calendar dates. While either applies, ``check`` requests a
    vertical (90°) park.
    """

    def __init__(self):
        # Manual toggle, flipped by activate() / deactivate().
        self._active = False
        # Calendar dates on which harvest mode applies, kept sorted.
        self._scheduled_dates: list[date] = []

    @staticmethod
    def _as_date(value: date) -> date:
        """Reduce a ``datetime`` to its calendar date; pass plain dates through."""
        # datetime subclasses date, but a datetime never compares equal to a
        # plain date and cannot be ordered against one. Without this step a
        # timestamp would silently miss the schedule or break sorting.
        if isinstance(value, datetime):
            return value.date()
        return value

    def activate(self) -> None:
        """Switch the manual harvest toggle on."""
        self._active = True
        logger.info("Harvest mode ACTIVATED — panels will park vertical")

    def deactivate(self) -> None:
        """Switch the manual harvest toggle off (scheduled dates still apply)."""
        self._active = False
        logger.info("Harvest mode DEACTIVATED — normal control resumed")

    def set_schedule(self, dates: list[date]) -> None:
        """Replace the harvest schedule with ``dates``.

        Entries may be ``date`` or ``datetime`` objects; datetimes are
        reduced to their calendar date before the list is sorted and stored.
        """
        self._scheduled_dates = sorted(self._as_date(d) for d in dates)
        # Log what was actually stored, so the message matches later lookups.
        logger.info(
            "Harvest schedule set: %s",
            [str(d) for d in self._scheduled_dates],
        )

    def check(self, current_date: Optional[date] = None) -> ModeOverride:
        """Return the harvest override for ``current_date``.

        Args:
            current_date: Day to evaluate; a ``datetime`` is accepted and
                reduced to its date. Defaults to ``date.today()``.

        Returns:
            An active ``ModeOverride`` (90°, bypassing budget and
            hysteresis) when the manual toggle is on or the day is
            scheduled; otherwise an inactive one.
        """
        if current_date is None:
            today = date.today()
        else:
            today = self._as_date(current_date)

        if self._active or today in self._scheduled_dates:
            return ModeOverride(
                active=True,
                mode="harvest",
                target_angle=90.0,
                reason="harvest mode — panels parked vertical for machine clearance",
                bypass_budget=True,
                bypass_hysteresis=True,
            )
        return ModeOverride(active=False, mode="harvest", target_angle=0.0)
|
|
|
|
| |
| |
| |
|
|
class OperationalModeChecker:
    """Evaluate every operational mode and report the winner.

    Modes are tried in a fixed priority order and the first active one
    is returned; ``None`` means no mode is asking for control.

    Priority: wind_stow > hail_stow > harvest > heat_shield
    """

    def __init__(self):
        # Exposed so callers can toggle or schedule harvest mode directly.
        self.harvest = HarvestMode()

    def check_all(
        self,
        wind_speed_ms: Optional[float] = None,
        hail_detected: bool = False,
        air_temp_c: Optional[float] = None,
        cwsi: Optional[float] = None,
        theta_astro: float = 0.0,
        current_date: Optional[date] = None,
    ) -> Optional[ModeOverride]:
        """Return the highest-priority active override, or ``None``.

        Checks whose input is missing are skipped: wind when
        ``wind_speed_ms`` is ``None``, hail when ``hail_detected`` is
        falsy, heat shield when ``air_temp_c`` is ``None``. Harvest is
        always evaluated once wind and hail have not fired. Lower-priority
        checks are not run after a higher-priority one fires.
        """
        # 1. Wind stow.
        wind = check_wind_stow(wind_speed_ms) if wind_speed_ms is not None else None
        if wind is not None and wind.active:
            logger.warning("Mode override: %s", wind.reason)
            return wind

        # 2. Hail stow.
        hail = check_hail_stow(True) if hail_detected else None
        if hail is not None and hail.active:
            logger.warning("Mode override: %s", hail.reason)
            return hail

        # 3. Harvest parking, logged at INFO rather than WARNING.
        parked = self.harvest.check(current_date)
        if parked.active:
            logger.info("Mode override: %s", parked.reason)
            return parked

        # 4. Heat shield, lowest priority.
        if air_temp_c is None:
            return None
        shield = check_heat_shield(
            air_temp_c=air_temp_c,
            cwsi=cwsi,
            theta_astro=theta_astro,
        )
        if shield.active:
            logger.warning("Mode override: %s", shield.reason)
            return shield

        return None
|
|