"""
ControlLoop single-tick worker.

Entry point for GitHub Actions cron (every 15 min).

Usage:
    python -m backend.workers.control_tick
    python -m backend.workers.control_tick --dry-run
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Ensure project root is on sys.path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Load .env if present (local dev)
try:
    from dotenv import load_dotenv

    load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
    pass

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("control_tick")

# IANA zone used to evaluate the "before 14:00" alert window.
_ISRAEL_TZ_NAME = "Asia/Jerusalem"
# Fixed offset used only when zone data is unavailable (standard time, no DST).
_ISRAEL_FALLBACK_OFFSET = timedelta(hours=2)
# Alerts are only raised before this local hour.
_ALERT_CUTOFF_HOUR = 14
# Alert when more than this fraction of the budget has been spent.
_ALERT_SPENT_FRACTION = 0.8


def _result_to_dict(result) -> dict:
    """Return a shallow *copy* of a tick result as a plain dict.

    Mappings are copied as-is, objects with a ``__dict__`` contribute their
    instance attributes, and anything else is wrapped as ``{"raw": str(result)}``.
    A copy is returned so that callers can add bookkeeping keys without
    mutating the result object itself.
    """
    if isinstance(result, Mapping):
        return dict(result)
    try:
        return dict(vars(result))
    except TypeError:
        # vars() raises TypeError for objects without a __dict__.
        return {"raw": str(result)}


def _israel_tz():
    """Return the tzinfo for Israel local time.

    Uses the IANA zone (DST-aware) when available; falls back to a fixed
    UTC+2 offset when ``zoneinfo`` (Python < 3.9) or the system tz database
    is missing.
    """
    try:
        from zoneinfo import ZoneInfo

        return ZoneInfo(_ISRAEL_TZ_NAME)
    except (ImportError, KeyError):
        # ZoneInfoNotFoundError is a KeyError subclass.
        log.warning(
            "Time zone data for %s unavailable; falling back to fixed UTC+2",
            _ISRAEL_TZ_NAME,
        )
        return timezone(_ISRAEL_FALLBACK_OFFSET)


def _to_israel_time(moment: datetime) -> datetime:
    """Convert a timezone-aware datetime to Israel local time."""
    return moment.astimezone(_israel_tz())


def _fetch_budget_data():
    """Return the ``control:budget`` payload from Redis, or None if Redis is unavailable."""
    from src.data.redis_cache import get_redis

    redis = get_redis()
    if not redis:
        return None
    return redis.get_json("control:budget")


def _budget_totals(tick: Mapping, budget_data) -> tuple[float, float] | None:
    """Return ``(spent_kwh, unspent_kwh)`` for the alert, or None if unknown.

    Prefers the ``"plan"`` entry of *budget_data* when present (sum of
    ``slot_budgets`` plus ``daily_margin_remaining_kWh`` as unspent,
    ``cumulative_spent`` as spent).  Otherwise falls back to the tick's own
    ``budget_spent_kwh`` / ``budget_remaining_kwh``.
    """
    plan = budget_data.get("plan") if isinstance(budget_data, Mapping) else None
    if isinstance(plan, Mapping):
        unspent = sum(plan.get("slot_budgets", {}).values()) + plan.get(
            "daily_margin_remaining_kWh", 0
        )
        return plan.get("cumulative_spent", 0), unspent

    # NOTE(review): the payload main() stores under "control:budget" has no
    # "plan" key, so without this fallback the alert could never fire.  This
    # assumes the tick's spent/remaining figures are cumulative for the day —
    # confirm against ControlLoop.tick().
    spent = tick.get("budget_spent_kwh")
    remaining = tick.get("budget_remaining_kwh")
    if spent is None or remaining is None:
        return None
    return spent, remaining


def main():
    """Run one ControlLoop tick, persist the result to Redis and check the budget alert."""
    parser = argparse.ArgumentParser(description="Run one ControlLoop tick")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Compute decisions without dispatching",
    )
    args = parser.parse_args()

    # Project imports are deferred until sys.path / .env have been set up.
    from src.control_loop import ControlLoop
    from src.data.redis_cache import get_redis

    log.info("Starting control tick (dry_run=%s)", args.dry_run)

    loop = ControlLoop(dry_run=args.dry_run)
    result = loop.tick()

    # Serialise result (copied, so the bookkeeping keys below do not leak
    # onto the result object).
    result_dict = _result_to_dict(result)
    result_dict["_timestamp"] = datetime.now(timezone.utc).isoformat()
    result_dict["_dry_run"] = args.dry_run

    # Store in Redis for the API to read
    redis = get_redis()
    if redis:
        # Convert to JSON-safe dict (non-serialisable values become strings)
        safe = json.loads(json.dumps(result_dict, default=str))
        redis.set_json("control:last_tick", safe, ttl=1200)  # 20 min TTL
        log.info("Tick result saved to Redis")

        # Also persist budget state for the /control/budget endpoint
        from config.settings import MAX_ENERGY_REDUCTION_PCT

        budget_info = {
            "energy_cost_kwh": result_dict.get("energy_cost_kwh", 0),
            "budget_spent_kwh": result_dict.get("budget_spent_kwh", 0),
            "budget_remaining_kwh": result_dict.get("budget_remaining_kwh", 0),
            # NOTE(review): 25.0 is presumably a baseline daily energy figure
            # in kWh — confirm and move to config.settings.
            "daily_budget_kwh": 25.0 * MAX_ENERGY_REDUCTION_PCT / 100.0,
            "stage": result_dict.get("stage_id", "unknown"),
            "last_updated": datetime.now(timezone.utc).isoformat(),
        }
        redis.set_json("control:budget", budget_info, ttl=1800)  # 30 min TTL
    else:
        log.warning("Redis not available — tick result not persisted")

    log.info("Tick complete: %s", json.dumps(result_dict, default=str, indent=2)[:500])

    # Budget alert: warn if >80% spent before 14:00 IST
    _check_budget_alert(result_dict)


def _check_budget_alert(tick: dict) -> None:
    """Log a warning (visible in GitHub Actions) if budget is nearly exhausted.

    The check is best-effort: any failure is logged as a warning and never
    propagates to the caller.  When the ``BUDGET_ALERT_WEBHOOK`` environment
    variable is set, the alert is also POSTed to that URL.

    Args:
        tick: Serialised tick result; ``budget_remaining_kwh`` (and, as a
            fallback, ``budget_spent_kwh``) are read from it.
    """
    try:
        remaining = tick.get("budget_remaining_kwh", None)
        if remaining is None or remaining == 0:
            return  # no budget data or dormant season

        # Only alert before 14:00 Israel local time (still daylight hours left)
        now_israel = _to_israel_time(datetime.now(timezone.utc))
        if now_israel.hour >= _ALERT_CUTOFF_HOUR:
            return

        totals = _budget_totals(tick, _fetch_budget_data())
        if totals is None:
            return
        spent, unspent = totals
        if unspent <= 0:
            return

        spent_fraction = spent / (unspent + spent)
        if spent_fraction <= _ALERT_SPENT_FRACTION:
            return

        spent_pct = spent_fraction * 100
        log.warning(
            "BUDGET ALERT: %.1f%% of daily budget spent before %02d:00 IST "
            "(spent=%.3f kWh, remaining=%.3f kWh)",
            spent_pct,
            now_israel.hour,
            spent,
            remaining,
        )

        # Optional webhook notification
        webhook_url = os.environ.get("BUDGET_ALERT_WEBHOOK")
        if webhook_url:
            import requests

            requests.post(
                webhook_url,
                json={
                    "text": f"SolarWine Budget Alert: {spent_pct:.0f}% spent before {now_israel.hour}:00 IST",
                    "spent_kwh": round(spent, 3),
                    "remaining_kwh": round(remaining, 3),
                },
                timeout=5,
            )
    except Exception as exc:
        # Best-effort: never fail the tick because of the alert, but make the
        # failure visible (logging is configured at INFO, so DEBUG was hidden).
        log.warning("Budget alert check failed: %s", exc)


if __name__ == "__main__":
    main()