| """ |
| TrackerDispatcher: send tilt commands to physical trackers and verify execution. |
| |
| Sits downstream of the CommandArbiter. When an ArbiterDecision has |
| ``dispatch=True``, the dispatcher: |
| 1. Sends the target angle to all 4 trackers via ThingsBoard RPC. |
| 2. Waits briefly, then reads actual ``angle`` telemetry. |
| 3. Confirms |actual − target| < tolerance for each tracker. |
| 4. Returns a DispatchResult with per-tracker status. |
| |
| If RPC is unavailable (e.g. customer-level API), falls back to |
| shared-attribute writes (``setAngle``, ``setMode``). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional |
|
|
| from config.settings import ANGLE_TOLERANCE_DEG, TRACKER_ID_MAP |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| TRACKER_NAMES = list(TRACKER_ID_MAP.values()) |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class TrackerResult: |
| """Execution result for a single tracker.""" |
|
|
| device_name: str |
| target_angle: float |
| actual_angle: Optional[float] = None |
| error: Optional[str] = None |
| verified: bool = False |
| method: str = "unknown" |
|
|
|
|
| @dataclass |
| class DispatchResult: |
| """Aggregate result for dispatching to all trackers.""" |
|
|
| timestamp: datetime |
| target_angle: float |
| source: str |
| trackers: List[TrackerResult] = field(default_factory=list) |
| all_verified: bool = False |
| dry_run: bool = False |
|
|
| @property |
| def n_success(self) -> int: |
| return sum(1 for t in self.trackers if t.verified) |
|
|
| @property |
| def n_failed(self) -> int: |
| return sum(1 for t in self.trackers if t.error) |
|
|
| def summary(self) -> str: |
| if self.dry_run: |
| return f"[DRY RUN] target={self.target_angle:.1f}°, source={self.source}" |
| return ( |
| f"target={self.target_angle:.1f}°, source={self.source}, " |
| f"verified={self.n_success}/{len(self.trackers)}" |
| ) |
|
|
|
|
| |
| |
| |
|
|
| class TrackerDispatcher: |
| """Send angle commands to trackers and verify execution. |
| |
| Parameters |
| ---------- |
| tb_client : ThingsBoardClient, optional |
| Lazy-initialised if not provided. |
| tracker_names : list[str], optional |
| Override which trackers to control (default: all 4). |
| verify_timeout_sec : float |
| How long to wait before reading back actual angles. |
| angle_tolerance_deg : float |
| Maximum acceptable |actual − target| for verification. |
| dry_run : bool |
| If True, log commands but don't actually send them. |
| """ |
|
|
| def __init__( |
| self, |
| tb_client=None, |
| tracker_names: Optional[List[str]] = None, |
| verify_timeout_sec: float = 5.0, |
| angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG, |
| dry_run: bool = False, |
| ): |
| self._tb = tb_client |
| self.tracker_names = tracker_names or TRACKER_NAMES |
| self.verify_timeout = verify_timeout_sec |
| self.tolerance = angle_tolerance_deg |
| self.dry_run = dry_run |
|
|
| def _client(self): |
| if self._tb is None: |
| from src.data.thingsboard_client import ThingsBoardClient |
| self._tb = ThingsBoardClient() |
| return self._tb |
|
|
| |
| |
| |
|
|
| def dispatch( |
| self, |
| decision, |
| angle_overrides: Optional[Dict[str, float]] = None, |
| ) -> DispatchResult: |
| """Send an ArbiterDecision to all trackers. |
| |
| Parameters |
| ---------- |
| decision : ArbiterDecision |
| Must have ``dispatch=True`` and ``angle`` set. |
| angle_overrides : dict, optional |
| Per-tracker angle overrides (device_name → angle). |
| Trackers not in this dict get the default ``decision.angle``. |
| Typically empty — all trackers follow the same pattern. |
| |
| Returns |
| ------- |
| DispatchResult with per-tracker verification status. |
| """ |
| now = datetime.now(tz=timezone.utc) |
| result = DispatchResult( |
| timestamp=now, |
| target_angle=decision.angle, |
| source=str(decision.source), |
| dry_run=self.dry_run, |
| ) |
|
|
| if not decision.dispatch: |
| logger.debug("Decision dispatch=False, skipping: %s", decision) |
| return result |
|
|
| for name in self.tracker_names: |
| angle = decision.angle |
| if angle_overrides and name in angle_overrides: |
| angle = angle_overrides[name] |
| tr = self._send_to_tracker(name, angle) |
| result.trackers.append(tr) |
|
|
| |
| if not self.dry_run and any(t.error is None for t in result.trackers): |
| time.sleep(self.verify_timeout) |
| self._verify_all(result) |
|
|
| result.all_verified = all(t.verified for t in result.trackers) |
| logger.info("Dispatch: %s", result.summary()) |
| return result |
|
|
| def read_current_angles(self) -> Dict[str, Optional[float]]: |
| """Read actual angle from all trackers. Useful for status display.""" |
| angles = {} |
| for name in self.tracker_names: |
| try: |
| vals = self._client().get_latest_telemetry(name, ["angle"]) |
| angles[name] = vals.get("angle") |
| except Exception as exc: |
| logger.warning("Failed to read %s angle: %s", name, exc) |
| angles[name] = None |
| return angles |
|
|
| |
| |
| |
|
|
| def _send_to_tracker(self, device_name: str, angle: float) -> TrackerResult: |
| """Send angle command to a single tracker.""" |
| tr = TrackerResult(device_name=device_name, target_angle=angle) |
|
|
| if self.dry_run: |
| tr.method = "dry_run" |
| tr.verified = True |
| logger.info("[DRY RUN] %s → %.1f°", device_name, angle) |
| return tr |
|
|
| client = self._client() |
|
|
| |
| |
| |
| |
| |
| attr_exc = None |
| for attempt in range(3): |
| try: |
| client.set_device_attributes(device_name, {"setAngle": angle, "setMode": "manual"}) |
| tr.method = "attribute" |
| return tr |
| except Exception as exc: |
| attr_exc = exc |
| if attempt < 2: |
| time.sleep(1 << attempt) |
| logger.warning("Attribute write retry %d for %s: %s", attempt + 1, device_name, exc) |
|
|
| logger.warning("Attribute write failed for %s after 3 attempts: %s, trying RPC", device_name, attr_exc) |
|
|
| |
| try: |
| client.send_rpc_command(device_name, "setAngle", angle) |
| tr.method = "rpc" |
| return tr |
| except Exception as rpc_exc: |
| tr.error = f"attribute (3 retries) and RPC both failed: {attr_exc}; {rpc_exc}" |
| logger.error("Cannot send to %s: %s", device_name, tr.error) |
| return tr |
|
|
| def _verify_all(self, result: DispatchResult) -> None: |
| """Read actual angles and compare to target.""" |
| for tr in result.trackers: |
| if tr.error: |
| continue |
| try: |
| vals = self._client().get_latest_telemetry(tr.device_name, ["angle"]) |
| tr.actual_angle = vals.get("angle") |
| if tr.actual_angle is not None: |
| diff = abs(tr.actual_angle - tr.target_angle) |
| tr.verified = diff <= self.tolerance |
| if not tr.verified: |
| logger.warning( |
| "%s: actual=%.1f° target=%.1f° (diff=%.1f° > tol=%.1f°)", |
| tr.device_name, tr.actual_angle, tr.target_angle, |
| diff, self.tolerance, |
| ) |
| except Exception as exc: |
| logger.warning("Verify failed for %s: %s", tr.device_name, exc) |
| tr.error = f"verification read failed: {exc}" |
|
|