api / src /tracker_dispatcher.py
safraeli's picture
Update src/tracker_dispatcher.py: circuit breaker, weather fix, TB health fix
cfddb41 verified
"""
TrackerDispatcher: send tilt commands to physical trackers and verify execution.
Sits downstream of the CommandArbiter. When an ArbiterDecision has
``dispatch=True``, the dispatcher:
1. Sends the target angle to all 4 trackers via ThingsBoard RPC.
2. Waits briefly, then reads actual ``angle`` telemetry.
3. Confirms |actual − target| < tolerance for each tracker.
4. Returns a DispatchResult with per-tracker status.
If RPC is unavailable (e.g. customer-level API), falls back to
shared-attribute writes (``setAngle``, ``setMode``).
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional
from config.settings import ANGLE_TOLERANCE_DEG, TRACKER_ID_MAP
logger = logging.getLogger(__name__)
# All tracker device names from the canonical ID map
TRACKER_NAMES = list(TRACKER_ID_MAP.values())
# ---------------------------------------------------------------------------
# Result containers
# ---------------------------------------------------------------------------
@dataclass
class TrackerResult:
"""Execution result for a single tracker."""
device_name: str
target_angle: float
actual_angle: Optional[float] = None
error: Optional[str] = None
verified: bool = False # True if |actual − target| < tolerance
method: str = "unknown" # "rpc", "attribute", "dry_run"
@dataclass
class DispatchResult:
"""Aggregate result for dispatching to all trackers."""
timestamp: datetime
target_angle: float
source: str # from ArbiterDecision.source
trackers: List[TrackerResult] = field(default_factory=list)
all_verified: bool = False
dry_run: bool = False
@property
def n_success(self) -> int:
return sum(1 for t in self.trackers if t.verified)
@property
def n_failed(self) -> int:
return sum(1 for t in self.trackers if t.error)
def summary(self) -> str:
if self.dry_run:
return f"[DRY RUN] target={self.target_angle:.1f}°, source={self.source}"
return (
f"target={self.target_angle:.1f}°, source={self.source}, "
f"verified={self.n_success}/{len(self.trackers)}"
)
# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------
class TrackerDispatcher:
"""Send angle commands to trackers and verify execution.
Parameters
----------
tb_client : ThingsBoardClient, optional
Lazy-initialised if not provided.
tracker_names : list[str], optional
Override which trackers to control (default: all 4).
verify_timeout_sec : float
How long to wait before reading back actual angles.
angle_tolerance_deg : float
Maximum acceptable |actual − target| for verification.
dry_run : bool
If True, log commands but don't actually send them.
"""
def __init__(
self,
tb_client=None,
tracker_names: Optional[List[str]] = None,
verify_timeout_sec: float = 5.0,
angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG,
dry_run: bool = False,
):
self._tb = tb_client
self.tracker_names = tracker_names or TRACKER_NAMES
self.verify_timeout = verify_timeout_sec
self.tolerance = angle_tolerance_deg
self.dry_run = dry_run
def _client(self):
if self._tb is None:
from src.data.thingsboard_client import ThingsBoardClient
self._tb = ThingsBoardClient()
return self._tb
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def dispatch(
self,
decision,
angle_overrides: Optional[Dict[str, float]] = None,
) -> DispatchResult:
"""Send an ArbiterDecision to all trackers.
Parameters
----------
decision : ArbiterDecision
Must have ``dispatch=True`` and ``angle`` set.
angle_overrides : dict, optional
Per-tracker angle overrides (device_name → angle).
Trackers not in this dict get the default ``decision.angle``.
Typically empty — all trackers follow the same pattern.
Returns
-------
DispatchResult with per-tracker verification status.
"""
now = datetime.now(tz=timezone.utc)
result = DispatchResult(
timestamp=now,
target_angle=decision.angle,
source=str(decision.source),
dry_run=self.dry_run,
)
if not decision.dispatch:
logger.debug("Decision dispatch=False, skipping: %s", decision)
return result
for name in self.tracker_names:
angle = decision.angle
if angle_overrides and name in angle_overrides:
angle = angle_overrides[name]
tr = self._send_to_tracker(name, angle)
result.trackers.append(tr)
# Verify after a brief wait (let controller process the command)
if not self.dry_run and any(t.error is None for t in result.trackers):
time.sleep(self.verify_timeout)
self._verify_all(result)
result.all_verified = all(t.verified for t in result.trackers)
logger.info("Dispatch: %s", result.summary())
return result
def read_current_angles(self) -> Dict[str, Optional[float]]:
"""Read actual angle from all trackers. Useful for status display."""
angles = {}
for name in self.tracker_names:
try:
vals = self._client().get_latest_telemetry(name, ["angle"])
angles[name] = vals.get("angle")
except Exception as exc:
logger.warning("Failed to read %s angle: %s", name, exc)
angles[name] = None
return angles
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _send_to_tracker(self, device_name: str, angle: float) -> TrackerResult:
"""Send angle command to a single tracker."""
tr = TrackerResult(device_name=device_name, target_angle=angle)
if self.dry_run:
tr.method = "dry_run"
tr.verified = True
logger.info("[DRY RUN] %s → %.1f°", device_name, angle)
return tr
client = self._client()
# Shared attribute write is the reliable method — tracker controllers
# poll setAngle/setMode from shared attributes on their update cycle.
# RPC requires the device to be online for real-time communication,
# which is not guaranteed.
# Retry attribute write with exponential backoff (1s, 2s)
attr_exc = None
for attempt in range(3):
try:
client.set_device_attributes(device_name, {"setAngle": angle, "setMode": "manual"})
tr.method = "attribute"
return tr
except Exception as exc:
attr_exc = exc
if attempt < 2:
time.sleep(1 << attempt) # 1s, 2s
logger.warning("Attribute write retry %d for %s: %s", attempt + 1, device_name, exc)
logger.warning("Attribute write failed for %s after 3 attempts: %s, trying RPC", device_name, attr_exc)
# Fallback: try RPC (may timeout if device is offline)
try:
client.send_rpc_command(device_name, "setAngle", angle)
tr.method = "rpc"
return tr
except Exception as rpc_exc:
tr.error = f"attribute (3 retries) and RPC both failed: {attr_exc}; {rpc_exc}"
logger.error("Cannot send to %s: %s", device_name, tr.error)
return tr
def _verify_all(self, result: DispatchResult) -> None:
"""Read actual angles and compare to target."""
for tr in result.trackers:
if tr.error:
continue
try:
vals = self._client().get_latest_telemetry(tr.device_name, ["angle"])
tr.actual_angle = vals.get("angle")
if tr.actual_angle is not None:
diff = abs(tr.actual_angle - tr.target_angle)
tr.verified = diff <= self.tolerance
if not tr.verified:
logger.warning(
"%s: actual=%.1f° target=%.1f° (diff=%.1f° > tol=%.1f°)",
tr.device_name, tr.actual_angle, tr.target_angle,
diff, self.tolerance,
)
except Exception as exc:
logger.warning("Verify failed for %s: %s", tr.device_name, exc)
tr.error = f"verification read failed: {exc}"