"""
ControlLoop single-tick worker.

Entry point for the GitHub Actions cron job (runs every 15 min).

Usage:
    python -m backend.workers.control_tick
    python -m backend.workers.control_tick --dry-run
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
# Ensure project root is on sys.path.  The root is taken to be three
# directory levels above this file, so the `src.*` / `config.*` imports used
# below resolve even when the worker is launched from another directory.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Load .env if present (local dev).  python-dotenv is optional: when it is
# not installed the environment is used as-is.
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv(PROJECT_ROOT / ".env")

# Configure root logging once at import time, then create this worker's logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("control_tick")
def _tick_result_to_dict(result) -> dict:
    """Return a new ``dict`` holding the fields of a tick *result*.

    A ``dict`` result is shallow-copied; any other object exposing
    ``__dict__`` contributes a shallow copy of its instance attributes;
    anything else is wrapped as ``{"raw": str(result)}``.  Working on a copy
    keeps the bookkeeping keys added by ``main`` off the original result.
    """
    if isinstance(result, dict):
        return dict(result)
    if hasattr(result, "__dict__"):
        return dict(vars(result))
    return {"raw": str(result)}


def main(argv: list[str] | None = None) -> None:
    """Run one ControlLoop tick and publish its outcome.

    Steps: parse the command line, run ``ControlLoop.tick()``, store the
    result under the Redis key ``control:last_tick`` (20 min TTL) and a
    budget summary under ``control:budget`` (30 min TTL) when Redis is
    available, log a truncated dump of the result, then run the budget
    alert check.

    Args:
        argv: Command-line arguments without the program name.  ``None``
            (the default) makes argparse read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Run one ControlLoop tick")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Compute decisions without dispatching",
    )
    args = parser.parse_args(argv)

    # Project imports are deferred until after argument parsing so that
    # `--help` and usage errors do not require the full application stack.
    from src.control_loop import ControlLoop
    from src.data.redis_cache import get_redis

    log.info("Starting control tick (dry_run=%s)", args.dry_run)
    loop = ControlLoop(dry_run=args.dry_run)
    result = loop.tick()

    # Serialise result (on a copy, so the result object is not mutated).
    now_iso = datetime.now(timezone.utc).isoformat()
    result_dict = _tick_result_to_dict(result)
    result_dict["_timestamp"] = now_iso
    result_dict["_dry_run"] = args.dry_run

    # Store in Redis for the API to read
    redis = get_redis()
    if redis:
        # Round-trip through JSON so every value is JSON-safe; anything the
        # encoder does not know is stringified by `default=str`.
        safe = json.loads(json.dumps(result_dict, default=str))
        redis.set_json("control:last_tick", safe, ttl=1200)  # 20 min TTL
        log.info("Tick result saved to Redis")

        # Also persist budget state for the /control/budget endpoint
        from config.settings import MAX_ENERGY_REDUCTION_PCT

        budget_info = {
            "energy_cost_kwh": result_dict.get("energy_cost_kwh", 0),
            "budget_spent_kwh": result_dict.get("budget_spent_kwh", 0),
            "budget_remaining_kwh": result_dict.get("budget_remaining_kwh", 0),
            # NOTE(review): 25.0 is presumably a baseline daily energy figure
            # in kWh — confirm and move it to config.settings.
            "daily_budget_kwh": 25.0 * MAX_ENERGY_REDUCTION_PCT / 100.0,
            "stage": result_dict.get("stage_id", "unknown"),
            "last_updated": now_iso,
        }
        # NOTE(review): _check_budget_alert reads this same key and requires
        # a "plan" entry, which this payload does not contain — confirm which
        # component is expected to provide it.
        redis.set_json("control:budget", budget_info, ttl=1800)  # 30 min TTL
    else:
        log.warning("Redis not available — tick result not persisted")

    # Only the first 500 characters are logged to keep the CI output short.
    log.info("Tick complete: %s", json.dumps(result_dict, default=str, indent=2)[:500])

    # Budget alert: warn if >80% spent before 14:00 IST
    _check_budget_alert(result_dict)
def _check_budget_alert(tick: dict) -> None:
"""Log a warning (visible in GitHub Actions) if budget is nearly exhausted."""
import os
try:
from datetime import datetime, timezone, timedelta
now_utc = datetime.now(timezone.utc)
now_israel = now_utc + timedelta(hours=2) # approximate IST
remaining = tick.get("budget_remaining_kwh", None)
if remaining is None or remaining == 0:
return # no budget data or dormant season
# Only alert before 14:00 IST (still daylight hours left)
if now_israel.hour >= 14:
return
# Get today's total budget from Redis
from src.data.redis_cache import get_redis
redis = get_redis()
if not redis:
return
budget_data = redis.get_json("control:budget")
if not budget_data or "plan" not in budget_data:
return
plan = budget_data["plan"]
total = sum(plan.get("slot_budgets", {}).values()) + plan.get("daily_margin_remaining_kWh", 0)
spent = plan.get("cumulative_spent", 0)
if total > 0 and spent / (total + spent) > 0.8:
log.warning(
"BUDGET ALERT: %.1f%% of daily budget spent before %02d:00 IST "
"(spent=%.3f kWh, remaining=%.3f kWh)",
spent / (total + spent) * 100,
now_israel.hour,
spent,
remaining,
)
# Future: send webhook/email here
webhook_url = os.environ.get("BUDGET_ALERT_WEBHOOK")
if webhook_url:
import requests
requests.post(webhook_url, json={
"text": f"SolarWine Budget Alert: {spent/(total+spent)*100:.0f}% spent before {now_israel.hour}:00 IST",
"spent_kwh": round(spent, 3),
"remaining_kwh": round(remaining, 3),
}, timeout=5)
except Exception as exc:
log.debug("Budget alert check failed: %s", exc)
# Script entry point: run a single tick when executed directly or via
# `python -m`.
if __name__ == "__main__":
    main()