| """ |
| Rate Limit Tracking Module |
| Manages rate limits per provider with in-memory tracking |
| """ |
|
|
| import time |
| from datetime import datetime, timedelta |
| from typing import Dict, Optional, Tuple |
| from threading import Lock |
| from utils.logger import setup_logger |
|
|
| logger = setup_logger("rate_limiter") |
|
|
|
|
| class RateLimiter: |
| """ |
| Rate limiter with per-provider tracking |
| """ |
|
|
| def __init__(self): |
| """Initialize rate limiter""" |
| self.limits: Dict[str, Dict] = {} |
| self.lock = Lock() |
|
|
| def configure_limit( |
| self, |
| provider: str, |
| limit_type: str, |
| limit_value: int |
| ): |
| """ |
| Configure rate limit for a provider |
| |
| Args: |
| provider: Provider name |
| limit_type: Type of limit (per_minute, per_hour, per_day, per_second) |
| limit_value: Maximum requests allowed |
| """ |
| with self.lock: |
| |
| now = datetime.now() |
| if limit_type == "per_second": |
| reset_time = now + timedelta(seconds=1) |
| elif limit_type == "per_minute": |
| reset_time = now + timedelta(minutes=1) |
| elif limit_type == "per_hour": |
| reset_time = now + timedelta(hours=1) |
| elif limit_type == "per_day": |
| reset_time = now + timedelta(days=1) |
| else: |
| logger.warning(f"Unknown limit type {limit_type} for {provider}") |
| reset_time = now + timedelta(minutes=1) |
|
|
| self.limits[provider] = { |
| "limit_type": limit_type, |
| "limit_value": limit_value, |
| "current_usage": 0, |
| "reset_time": reset_time, |
| "last_request_time": None |
| } |
|
|
| logger.info(f"Configured rate limit for {provider}: {limit_value} {limit_type}") |
|
|
| def can_make_request(self, provider: str) -> Tuple[bool, Optional[str]]: |
| """ |
| Check if request can be made without exceeding rate limit |
| |
| Args: |
| provider: Provider name |
| |
| Returns: |
| Tuple of (can_proceed, reason_if_blocked) |
| """ |
| with self.lock: |
| if provider not in self.limits: |
| |
| return True, None |
|
|
| limit_info = self.limits[provider] |
| now = datetime.now() |
|
|
| |
| if now >= limit_info["reset_time"]: |
| self._reset_limit(provider) |
| limit_info = self.limits[provider] |
|
|
| |
| if limit_info["current_usage"] < limit_info["limit_value"]: |
| return True, None |
| else: |
| seconds_until_reset = (limit_info["reset_time"] - now).total_seconds() |
| return False, f"Rate limit reached. Reset in {int(seconds_until_reset)}s" |
|
|
| def record_request(self, provider: str): |
| """ |
| Record a request against the rate limit |
| |
| Args: |
| provider: Provider name |
| """ |
| with self.lock: |
| if provider not in self.limits: |
| logger.warning(f"Recording request for unconfigured provider: {provider}") |
| return |
|
|
| limit_info = self.limits[provider] |
| now = datetime.now() |
|
|
| |
| if now >= limit_info["reset_time"]: |
| self._reset_limit(provider) |
| limit_info = self.limits[provider] |
|
|
| |
| limit_info["current_usage"] += 1 |
| limit_info["last_request_time"] = now |
|
|
| |
| percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 |
| if percentage >= 80: |
| logger.warning( |
| f"Rate limit warning for {provider}: {percentage:.1f}% used " |
| f"({limit_info['current_usage']}/{limit_info['limit_value']})" |
| ) |
|
|
| def _reset_limit(self, provider: str): |
| """ |
| Reset rate limit counter |
| |
| Args: |
| provider: Provider name |
| """ |
| if provider not in self.limits: |
| return |
|
|
| limit_info = self.limits[provider] |
| limit_type = limit_info["limit_type"] |
| now = datetime.now() |
|
|
| |
| if limit_type == "per_second": |
| reset_time = now + timedelta(seconds=1) |
| elif limit_type == "per_minute": |
| reset_time = now + timedelta(minutes=1) |
| elif limit_type == "per_hour": |
| reset_time = now + timedelta(hours=1) |
| elif limit_type == "per_day": |
| reset_time = now + timedelta(days=1) |
| else: |
| reset_time = now + timedelta(minutes=1) |
|
|
| limit_info["current_usage"] = 0 |
| limit_info["reset_time"] = reset_time |
|
|
| logger.debug(f"Reset rate limit for {provider}. Next reset: {reset_time}") |
|
|
| def get_status(self, provider: str) -> Optional[Dict]: |
| """ |
| Get current rate limit status for provider |
| |
| Args: |
| provider: Provider name |
| |
| Returns: |
| Dict with limit info or None if not configured |
| """ |
| with self.lock: |
| if provider not in self.limits: |
| return None |
|
|
| limit_info = self.limits[provider] |
| now = datetime.now() |
|
|
| |
| if now >= limit_info["reset_time"]: |
| self._reset_limit(provider) |
| limit_info = self.limits[provider] |
|
|
| percentage = (limit_info["current_usage"] / limit_info["limit_value"]) * 100 if limit_info["limit_value"] > 0 else 0 |
| seconds_until_reset = max(0, (limit_info["reset_time"] - now).total_seconds()) |
|
|
| status = "ok" |
| if percentage >= 100: |
| status = "blocked" |
| elif percentage >= 80: |
| status = "warning" |
|
|
| return { |
| "provider": provider, |
| "limit_type": limit_info["limit_type"], |
| "limit_value": limit_info["limit_value"], |
| "current_usage": limit_info["current_usage"], |
| "percentage": round(percentage, 1), |
| "reset_time": limit_info["reset_time"].isoformat(), |
| "reset_in_seconds": int(seconds_until_reset), |
| "status": status, |
| "last_request_time": limit_info["last_request_time"].isoformat() if limit_info["last_request_time"] else None |
| } |
|
|
| def get_all_statuses(self) -> Dict[str, Dict]: |
| """ |
| Get rate limit status for all providers |
| |
| Returns: |
| Dict mapping provider names to their rate limit status |
| """ |
| with self.lock: |
| return { |
| provider: self.get_status(provider) |
| for provider in self.limits.keys() |
| } |
|
|
| def remove_limit(self, provider: str): |
| """ |
| Remove rate limit configuration for provider |
| |
| Args: |
| provider: Provider name |
| """ |
| with self.lock: |
| if provider in self.limits: |
| del self.limits[provider] |
| logger.info(f"Removed rate limit for {provider}") |
|
|
|
|
| |
| rate_limiter = RateLimiter() |
|
|