api / backend /workers /control_tick.py
safraeli's picture
Fix race conditions, error handling, timezone, divergence check
6d7f91e verified
"""
ControlLoop single-tick worker.
Entry point for GitHub Actions cron (every 15 min).
Usage:
python -m backend.workers.control_tick
python -m backend.workers.control_tick --dry-run
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
# Ensure project root is on sys.path
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
# Load .env if present (local dev)
try:
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
pass
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("control_tick")
def main():
parser = argparse.ArgumentParser(description="Run one ControlLoop tick")
parser.add_argument("--dry-run", action="store_true", help="Compute decisions without dispatching")
args = parser.parse_args()
from src.control_loop import ControlLoop
from src.data.redis_cache import get_redis
log.info("Starting control tick (dry_run=%s)", args.dry_run)
loop = ControlLoop(dry_run=args.dry_run)
result = loop.tick()
# Serialise result
result_dict = result.__dict__ if hasattr(result, "__dict__") else {"raw": str(result)}
result_dict["_timestamp"] = datetime.now(timezone.utc).isoformat()
result_dict["_dry_run"] = args.dry_run
# Store in Redis for the API to read
redis = get_redis()
if redis:
# Convert to JSON-safe dict
safe = json.loads(json.dumps(result_dict, default=str))
redis.set_json("control:last_tick", safe, ttl=1200) # 20 min TTL
log.info("Tick result saved to Redis")
# Also persist budget state for the /control/budget endpoint
from config.settings import MAX_ENERGY_REDUCTION_PCT
budget_info = {
"energy_cost_kwh": result_dict.get("energy_cost_kwh", 0),
"budget_spent_kwh": result_dict.get("budget_spent_kwh", 0),
"budget_remaining_kwh": result_dict.get("budget_remaining_kwh", 0),
"daily_budget_kwh": 25.0 * MAX_ENERGY_REDUCTION_PCT / 100.0,
"stage": result_dict.get("stage_id", "unknown"),
"last_updated": datetime.now(timezone.utc).isoformat(),
}
redis.set_json("control:budget", budget_info, ttl=1800) # 30 min TTL
else:
log.warning("Redis not available — tick result not persisted")
log.info("Tick complete: %s", json.dumps(result_dict, default=str, indent=2)[:500])
# Budget alert: warn if >80% spent before 14:00 IST
_check_budget_alert(result_dict)
def _check_budget_alert(tick: dict) -> None:
"""Log a warning (visible in GitHub Actions) if budget is nearly exhausted."""
import os
try:
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
now_israel = now_utc + timedelta(hours=2) # approximate IST
remaining = tick.get("budget_remaining_kwh", None)
if remaining is None or remaining == 0:
return # no budget data or dormant season
# Only alert before 14:00 IST (still daylight hours left)
if now_israel.hour >= 14:
return
# Get today's total budget from Redis
from src.data.redis_cache import get_redis
redis = get_redis()
if not redis:
return
budget_data = redis.get_json("control:budget")
if not budget_data or "plan" not in budget_data:
return
plan = budget_data["plan"]
total = sum(plan.get("slot_budgets", {}).values()) + plan.get("daily_margin_remaining_kWh", 0)
spent = plan.get("cumulative_spent", 0)
if total > 0 and spent / (total + spent) > 0.8:
log.warning(
"BUDGET ALERT: %.1f%% of daily budget spent before %02d:00 IST "
"(spent=%.3f kWh, remaining=%.3f kWh)",
spent / (total + spent) * 100,
now_israel.hour,
spent,
remaining,
)
# Future: send webhook/email here
webhook_url = os.environ.get("BUDGET_ALERT_WEBHOOK")
if webhook_url:
import requests
requests.post(webhook_url, json={
"text": f"SolarWine Budget Alert: {spent/(total+spent)*100:.0f}% spent before {now_israel.hour}:00 IST",
"spent_kwh": round(spent, 3),
"remaining_kwh": round(remaining, 3),
}, timeout=5)
except Exception as exc:
log.debug("Budget alert check failed: %s", exc)
if __name__ == "__main__":
main()