""" TrackerDispatcher: send tilt commands to physical trackers and verify execution. Sits downstream of the CommandArbiter. When an ArbiterDecision has ``dispatch=True``, the dispatcher: 1. Sends the target angle to all 4 trackers via ThingsBoard RPC. 2. Waits briefly, then reads actual ``angle`` telemetry. 3. Confirms |actual − target| < tolerance for each tracker. 4. Returns a DispatchResult with per-tracker status. If RPC is unavailable (e.g. customer-level API), falls back to shared-attribute writes (``setAngle``, ``setMode``). """ from __future__ import annotations import logging import time from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Dict, List, Optional from config.settings import ANGLE_TOLERANCE_DEG, TRACKER_ID_MAP logger = logging.getLogger(__name__) # All tracker device names from the canonical ID map TRACKER_NAMES = list(TRACKER_ID_MAP.values()) # --------------------------------------------------------------------------- # Result containers # --------------------------------------------------------------------------- @dataclass class TrackerResult: """Execution result for a single tracker.""" device_name: str target_angle: float actual_angle: Optional[float] = None error: Optional[str] = None verified: bool = False # True if |actual − target| < tolerance method: str = "unknown" # "rpc", "attribute", "dry_run" @dataclass class DispatchResult: """Aggregate result for dispatching to all trackers.""" timestamp: datetime target_angle: float source: str # from ArbiterDecision.source trackers: List[TrackerResult] = field(default_factory=list) all_verified: bool = False dry_run: bool = False @property def n_success(self) -> int: return sum(1 for t in self.trackers if t.verified) @property def n_failed(self) -> int: return sum(1 for t in self.trackers if t.error) def summary(self) -> str: if self.dry_run: return f"[DRY RUN] target={self.target_angle:.1f}°, source={self.source}" return ( f"target={self.target_angle:.1f}°, source={self.source}, " f"verified={self.n_success}/{len(self.trackers)}" ) # --------------------------------------------------------------------------- # Dispatcher # --------------------------------------------------------------------------- class TrackerDispatcher: """Send angle commands to trackers and verify execution. Parameters ---------- tb_client : ThingsBoardClient, optional Lazy-initialised if not provided. tracker_names : list[str], optional Override which trackers to control (default: all 4). verify_timeout_sec : float How long to wait before reading back actual angles. angle_tolerance_deg : float Maximum acceptable |actual − target| for verification. dry_run : bool If True, log commands but don't actually send them. """ def __init__( self, tb_client=None, tracker_names: Optional[List[str]] = None, verify_timeout_sec: float = 5.0, angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG, dry_run: bool = False, ): self._tb = tb_client self.tracker_names = tracker_names or TRACKER_NAMES self.verify_timeout = verify_timeout_sec self.tolerance = angle_tolerance_deg self.dry_run = dry_run def _client(self): if self._tb is None: from src.data.thingsboard_client import ThingsBoardClient self._tb = ThingsBoardClient() return self._tb # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def dispatch( self, decision, angle_overrides: Optional[Dict[str, float]] = None, ) -> DispatchResult: """Send an ArbiterDecision to all trackers. Parameters ---------- decision : ArbiterDecision Must have ``dispatch=True`` and ``angle`` set. angle_overrides : dict, optional Per-tracker angle overrides (device_name → angle). Trackers not in this dict get the default ``decision.angle``. Typically empty — all trackers follow the same pattern. Returns ------- DispatchResult with per-tracker verification status. """ now = datetime.now(tz=timezone.utc) result = DispatchResult( timestamp=now, target_angle=decision.angle, source=str(decision.source), dry_run=self.dry_run, ) if not decision.dispatch: logger.debug("Decision dispatch=False, skipping: %s", decision) return result for name in self.tracker_names: angle = decision.angle if angle_overrides and name in angle_overrides: angle = angle_overrides[name] tr = self._send_to_tracker(name, angle) result.trackers.append(tr) # Verify after a brief wait (let controller process the command) if not self.dry_run and any(t.error is None for t in result.trackers): time.sleep(self.verify_timeout) self._verify_all(result) result.all_verified = all(t.verified for t in result.trackers) logger.info("Dispatch: %s", result.summary()) return result def read_current_angles(self) -> Dict[str, Optional[float]]: """Read actual angle from all trackers. Useful for status display.""" angles = {} for name in self.tracker_names: try: vals = self._client().get_latest_telemetry(name, ["angle"]) angles[name] = vals.get("angle") except Exception as exc: logger.warning("Failed to read %s angle: %s", name, exc) angles[name] = None return angles # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _send_to_tracker(self, device_name: str, angle: float) -> TrackerResult: """Send angle command to a single tracker.""" tr = TrackerResult(device_name=device_name, target_angle=angle) if self.dry_run: tr.method = "dry_run" tr.verified = True logger.info("[DRY RUN] %s → %.1f°", device_name, angle) return tr client = self._client() # Shared attribute write is the reliable method — tracker controllers # poll setAngle/setMode from shared attributes on their update cycle. # RPC requires the device to be online for real-time communication, # which is not guaranteed. # Retry attribute write with exponential backoff (1s, 2s) attr_exc = None for attempt in range(3): try: client.set_device_attributes(device_name, {"setAngle": angle, "setMode": "manual"}) tr.method = "attribute" return tr except Exception as exc: attr_exc = exc if attempt < 2: time.sleep(1 << attempt) # 1s, 2s logger.warning("Attribute write retry %d for %s: %s", attempt + 1, device_name, exc) logger.warning("Attribute write failed for %s after 3 attempts: %s, trying RPC", device_name, attr_exc) # Fallback: try RPC (may timeout if device is offline) try: client.send_rpc_command(device_name, "setAngle", angle) tr.method = "rpc" return tr except Exception as rpc_exc: tr.error = f"attribute (3 retries) and RPC both failed: {attr_exc}; {rpc_exc}" logger.error("Cannot send to %s: %s", device_name, tr.error) return tr def _verify_all(self, result: DispatchResult) -> None: """Read actual angles and compare to target.""" for tr in result.trackers: if tr.error: continue try: vals = self._client().get_latest_telemetry(tr.device_name, ["angle"]) tr.actual_angle = vals.get("angle") if tr.actual_angle is not None: diff = abs(tr.actual_angle - tr.target_angle) tr.verified = diff <= self.tolerance if not tr.verified: logger.warning( "%s: actual=%.1f° target=%.1f° (diff=%.1f° > tol=%.1f°)", tr.device_name, tr.actual_angle, tr.target_angle, diff, self.tolerance, ) except Exception as exc: logger.warning("Verify failed for %s: %s", tr.device_name, exc) tr.error = f"verification read failed: {exc}"