"""
Data flow monitor — checks IMS and ThingsBoard data freshness.

Returns per-source status (green / yellow / red) with age and messages.
Used by the /api/health/data-sources endpoint and the email alerter.
"""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from config.settings import (
    IMS_STALE_YELLOW_MIN,
    IMS_STALE_RED_MIN,
    TB_STALE_YELLOW_MIN,
    TB_STALE_RED_MIN,
    ENERGY_STALE_YELLOW_MIN,
    ENERGY_STALE_RED_MIN,
)
from src.data.data_providers import DataHub

log = logging.getLogger("solarwine.monitor")


def _classify(age_minutes: float | None, yellow: float, red: float) -> str:
    """Return green / yellow / red based on age thresholds.

    ``None`` (age unknown) is red. Otherwise an age strictly below
    ``yellow`` is green, strictly below ``red`` is yellow, and anything
    else is red.
    """
    if age_minutes is None:
        return "red"
    if age_minutes < yellow:
        return "green"
    if age_minutes < red:
        return "yellow"
    return "red"


def _status_message(source: str, status: str, age: float | None) -> str:
    """Build the human-readable message for one source's status.

    Green sources are reported as fresh; otherwise the age (rounded to whole
    minutes) is reported when known, or the data is reported unavailable.
    """
    if status == "green":
        return f"{source} data is fresh"
    if age is not None:
        return f"{source} data is {age:.0f} min old"
    return f"{source} data is unavailable"


class DataFlowMonitor:
    """Computes per-source data flow status."""

    def check_all(self, hub: DataHub) -> dict[str, Any]:
        """Check every data source and roll the results up.

        Returns a dict with:
          * ``overall`` — the worst status across all sources
            (red > yellow > green),
          * ``checked_at`` — UTC ISO-8601 timestamp taken at the start of
            the check,
          * ``sources`` — per-source result dicts keyed by
            ``ims_weather``, ``tb_sensors`` and ``tb_energy``.
        """
        now_iso = datetime.now(timezone.utc).isoformat()

        # Dict-literal entries are evaluated left to right, so the sources
        # are still queried in IMS -> sensors -> energy order.
        sources: dict[str, dict] = {
            "ims_weather": self._check_ims(hub),
            "tb_sensors": self._check_tb_sensors(hub),
            "tb_energy": self._check_tb_energy(hub),
        }

        # Overall status: worst of all sources
        statuses = [s["status"] for s in sources.values()]
        if "red" in statuses:
            overall = "red"
        elif "yellow" in statuses:
            overall = "yellow"
        else:
            overall = "green"

        return {
            "overall": overall,
            "checked_at": now_iso,
            "sources": sources,
        }

    def _check_ims(self, hub: DataHub) -> dict:
        """Report the freshness of the current IMS weather reading.

        A missing, ``None`` or negative ``age_minutes`` is treated as an
        unknown age, which classifies as red. Exceptions raised while
        querying or parsing are logged and reported as a red status rather
        than propagated.
        """
        try:
            wx = hub.weather.get_current()
            if wx and "error" not in wx:
                raw_age = wx.get("age_minutes")
                age: float | None
                if raw_age is None:
                    log.warning(
                        "IMS weather response has no age_minutes — data provider issue"
                    )
                    age = None
                else:
                    age = float(raw_age)
                    if age < 0:
                        log.warning(
                            "IMS weather returned negative age_minutes — data provider issue"
                        )
                        age = None
                status = _classify(age, IMS_STALE_YELLOW_MIN, IMS_STALE_RED_MIN)
                return {
                    "status": status,
                    "age_minutes": round(age, 1) if age is not None else None,
                    "last_reading": wx.get("timestamp_local") or wx.get("timestamp_utc"),
                    "message": _status_message("IMS weather", status, age),
                }
            # The provider may return None/empty; guard so that case is reported
            # as "unavailable" instead of raising AttributeError on None.get().
            error = (wx or {}).get("error", "unavailable")
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"IMS weather error: {error}",
            }
        except Exception as exc:
            log.warning("IMS health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"IMS weather unreachable: {exc}",
            }

    def _check_tb_sensors(self, hub: DataHub) -> dict:
        """Report the freshness of the ThingsBoard vine-sensor snapshot.

        Age is taken from the snapshot's ``staleness_minutes`` (absent or
        ``None`` means unknown, i.e. red). Exceptions are logged and reported
        as a red status rather than propagated.
        """
        try:
            snap = hub.vine_sensors.get_snapshot(light=True)
            if snap and "error" not in snap:
                stale = snap.get("staleness_minutes")
                age = float(stale) if stale is not None else None
                status = _classify(age, TB_STALE_YELLOW_MIN, TB_STALE_RED_MIN)
                return {
                    "status": status,
                    "age_minutes": round(age, 1) if age is not None else None,
                    "last_reading": snap.get("timestamp"),
                    "message": _status_message("ThingsBoard sensors", status, age),
                }
            # Guard against a None/empty snapshot (None has no .get()).
            error = (snap or {}).get("error", "unavailable")
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"TB sensors error: {error}",
            }
        except Exception as exc:
            log.warning("TB sensors health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"TB sensors unreachable: {exc}",
            }

    def _check_tb_energy(self, hub: DataHub) -> dict:
        """Report the freshness of the current energy telemetry.

        Unlike the other sources, a successful response without an
        ``age_minutes`` key is reported green (assumed just fetched). The
        result also carries ``power_kw`` rounded to 2 decimals when present.
        Exceptions are logged and reported as a red status.
        """
        try:
            en = hub.energy.get_current()
            if en and "error" not in en:
                power = en.get("power_kw")
                # The energy provider does not always include age_minutes;
                # only the key's presence is checked here.
                age = float(en["age_minutes"]) if "age_minutes" in en else None
                if age is None:
                    # Data came back but without an age: assume it was just fetched.
                    status = "green"
                else:
                    status = _classify(age, ENERGY_STALE_YELLOW_MIN, ENERGY_STALE_RED_MIN)
                return {
                    "status": status,
                    "age_minutes": round(age, 1) if age is not None else None,
                    "power_kw": round(float(power), 2) if power is not None else None,
                    "message": _status_message("Energy telemetry", status, age),
                }
            # Guard against a None/empty response (None has no .get()).
            error = (en or {}).get("error", "unavailable")
            return {
                "status": "red",
                "age_minutes": None,
                "power_kw": None,
                "message": f"Energy error: {error}",
            }
        except Exception as exc:
            log.warning("Energy health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "power_kw": None,
                "message": f"Energy unreachable: {exc}",
            }