| """ |
| Data flow monitor — checks IMS and ThingsBoard data freshness. |
| |
| Returns per-source status (green / yellow / red) with age and messages. |
| Used by the /api/health/data-sources endpoint and the email alerter. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from datetime import datetime, timezone |
| from typing import Any |
|
|
| from config.settings import ( |
| IMS_STALE_YELLOW_MIN, |
| IMS_STALE_RED_MIN, |
| TB_STALE_YELLOW_MIN, |
| TB_STALE_RED_MIN, |
| ENERGY_STALE_YELLOW_MIN, |
| ENERGY_STALE_RED_MIN, |
| ) |
| from src.data.data_providers import DataHub |
|
|
| log = logging.getLogger("solarwine.monitor") |
|
|
|
|
def _classify(age_minutes: float | None, yellow: float, red: float) -> str:
    """Map a data age onto a traffic-light status.

    Args:
        age_minutes: Age of the latest reading, or None when it is unknown.
        yellow: Ages strictly below this limit are "green".
        red: Ages strictly below this limit (and not green) are "yellow".

    Returns:
        "green", "yellow" or "red". An unknown age (None), or an age that is
        below neither limit, is "red".
    """
    if age_minutes is not None:
        # Walk the limits from freshest to stalest; the first one the age
        # falls under decides the colour.
        for limit, label in ((yellow, "green"), (red, "yellow")):
            if age_minutes < limit:
                return label
    return "red"
|
|
|
|
def _status_message(source: str, status: str, age: float | None) -> str:
    """Build the human-readable message for one data source.

    Args:
        source: Display name of the source, used as the message prefix.
        status: Status string; only "green" is treated specially.
        age: Age of the latest reading, or None when it is unknown.

    Returns:
        A "fresh" message for green status; otherwise the age rounded to
        whole minutes, or an "unavailable" message when the age is unknown.
    """
    if status != "green":
        if age is None:
            return f"{source} data is unavailable"
        return f"{source} data is {age:.0f} min old"
    return f"{source} data is fresh"
|
|
|
|
class DataFlowMonitor:
    """Computes per-source data flow status.

    Each ``_check_*`` method queries one provider on the hub and returns a
    dict with ``status`` ("green" / "yellow" / "red"), ``age_minutes``, a
    source-specific detail field and a human-readable ``message``. The checks
    never raise: any exception from a provider is logged and reported as a
    red "unreachable" result.
    """

    @staticmethod
    def _red_result(message: str, detail_key: str) -> dict:
        """Return a red result with unknown age and an empty detail field.

        Args:
            message: Text for the ``message`` field.
            detail_key: Name of the source-specific field ("last_reading" or
                "power_kw"), which is set to None.
        """
        return {
            "status": "red",
            "age_minutes": None,
            detail_key: None,
            "message": message,
        }

    def check_all(self, hub: DataHub) -> dict[str, Any]:
        """Check every data source and fold the results into one report.

        Args:
            hub: Data hub exposing the ``weather``, ``vine_sensors`` and
                ``energy`` providers.

        Returns:
            A dict with ``overall`` (worst status across the sources),
            ``checked_at`` (UTC ISO-8601 timestamp taken before the checks
            run) and ``sources`` (per-source result dicts).
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        sources: dict[str, dict] = {}

        # IMS weather feed
        sources["ims_weather"] = self._check_ims(hub)

        # ThingsBoard vine sensors
        sources["tb_sensors"] = self._check_tb_sensors(hub)

        # ThingsBoard energy telemetry
        sources["tb_energy"] = self._check_tb_energy(hub)

        # Overall status is the worst individual status.
        statuses = [s["status"] for s in sources.values()]
        if "red" in statuses:
            overall = "red"
        elif "yellow" in statuses:
            overall = "yellow"
        else:
            overall = "green"

        return {
            "overall": overall,
            "checked_at": now_iso,
            "sources": sources,
        }

    def _check_ims(self, hub: DataHub) -> dict:
        """Return the freshness status of the IMS weather feed.

        A missing, None or negative ``age_minutes`` is treated as an unknown
        age, which classifies as red.
        """
        try:
            wx = hub.weather.get_current()
            if not wx or "error" in wx:
                # ``wx`` may be None or empty here, so never call .get on it
                # directly; otherwise the AttributeError would be reported
                # below as a misleading "unreachable" failure.
                reason = (wx or {}).get("error", "unavailable")
                return self._red_result(f"IMS weather error: {reason}", "last_reading")

            raw_age = wx.get("age_minutes")
            # An absent or None age is handled like a negative one.
            age = float(raw_age) if raw_age is not None else None
            if age is None or age < 0:
                log.warning("IMS weather returned negative age_minutes — data provider issue")
                age = None
            status = _classify(age, IMS_STALE_YELLOW_MIN, IMS_STALE_RED_MIN)
            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "last_reading": wx.get("timestamp_local") or wx.get("timestamp_utc"),
                "message": _status_message("IMS weather", status, age),
            }
        except Exception as exc:
            log.warning("IMS health check failed: %s", exc)
            return self._red_result(f"IMS weather unreachable: {exc}", "last_reading")

    def _check_tb_sensors(self, hub: DataHub) -> dict:
        """Return the freshness status of the ThingsBoard vine sensors.

        The age is read from the snapshot's ``staleness_minutes``; when it is
        absent or None the age is unknown, which classifies as red.
        """
        try:
            snap = hub.vine_sensors.get_snapshot(light=True)
            if not snap or "error" in snap:
                # Guard against a None/empty snapshot (see _check_ims).
                reason = (snap or {}).get("error", "unavailable")
                return self._red_result(f"TB sensors error: {reason}", "last_reading")

            stale = snap.get("staleness_minutes")
            age = float(stale) if stale is not None else None
            status = _classify(age, TB_STALE_YELLOW_MIN, TB_STALE_RED_MIN)
            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "last_reading": snap.get("timestamp"),
                "message": _status_message("ThingsBoard sensors", status, age),
            }
        except Exception as exc:
            log.warning("TB sensors health check failed: %s", exc)
            return self._red_result(f"TB sensors unreachable: {exc}", "last_reading")

    def _check_tb_energy(self, hub: DataHub) -> dict:
        """Return the freshness status of the energy telemetry.

        Unlike the other sources, a successful response that carries no age
        (``age_minutes`` absent or None) is reported as green rather than red.
        """
        try:
            en = hub.energy.get_current()
            if not en or "error" in en:
                # Guard against a None/empty response (see _check_ims).
                reason = (en or {}).get("error", "unavailable")
                return self._red_result(f"Energy error: {reason}", "power_kw")

            power = en.get("power_kw")
            raw_age = en.get("age_minutes")
            age = float(raw_age) if raw_age is not None else None
            # No age information on an otherwise good response: report green.
            status = (
                "green"
                if age is None
                else _classify(age, ENERGY_STALE_YELLOW_MIN, ENERGY_STALE_RED_MIN)
            )
            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "power_kw": round(float(power), 2) if power is not None else None,
                "message": _status_message("Energy telemetry", status, age),
            }
        except Exception as exc:
            log.warning("Energy health check failed: %s", exc)
            return self._red_result(f"Energy unreachable: {exc}", "power_kw")
|
|