| """ |
| Intelligent Source Pool Manager |
| Manages source pools, rotation, and automatic failover |
| """ |
|
|
| import json |
| from datetime import datetime, timedelta |
| from typing import Optional, List, Dict, Any |
| from threading import Lock |
| from sqlalchemy.orm import Session |
|
|
| from database.models import ( |
| SourcePool, PoolMember, RotationHistory, RotationState, |
| Provider, RateLimitUsage |
| ) |
| from monitoring.rate_limiter import rate_limiter |
| from utils.logger import setup_logger |
|
|
| logger = setup_logger("source_pool_manager") |
|
|
|
|
class SourcePoolManager:
    """
    Manage source pools, provider rotation and automatic failover.

    Every public method serializes access to the shared database session
    through ``self.lock``. The lock is a plain, non-reentrant
    ``threading.Lock``: private helpers therefore never acquire it, and a
    public method must not call another public method while holding it.
    """

    def __init__(self, db_session: Session):
        """
        Initialize source pool manager

        Args:
            db_session: Database session shared by all operations
        """
        self.db = db_session
        self.lock = Lock()
        logger.info("Source Pool Manager initialized")

    # ------------------------------------------------------------------
    # Private helpers (callers must already hold ``self.lock``)
    # ------------------------------------------------------------------

    def _commit(self) -> None:
        """Commit the session; roll back and re-raise if the commit fails."""
        try:
            self.db.commit()
        except Exception:
            # After a failed flush the session refuses further work until it
            # is rolled back, which would otherwise break every later call
            # on this long-lived manager.
            self.db.rollback()
            raise

    def _enabled_members(
        self,
        pool_id: int,
        exclude_provider_id: Optional[int] = None
    ) -> List[PoolMember]:
        """Return enabled members of a pool whose provider row exists."""
        query = (
            self.db.query(PoolMember)
            .filter_by(pool_id=pool_id, enabled=True)
            .join(Provider)
            .filter(Provider.id == PoolMember.provider_id)
        )
        if exclude_provider_id is not None:
            query = query.filter(PoolMember.provider_id != exclude_provider_id)
        return query.all()

    def _filter_rate_limited(self, members: List[PoolMember]) -> List[PoolMember]:
        """Return the members whose provider may currently make a request."""
        usable = []
        for member in members:
            provider = self.db.query(Provider).get(member.provider_id)
            if provider is None:
                continue
            can_use, _ = rate_limiter.can_make_request(provider.name)
            if can_use:
                usable.append(member)
        return usable

    def _get_or_create_state(self, pool_id: int) -> RotationState:
        """Return the pool's rotation state, adding a fresh one if missing."""
        state = self.db.query(RotationState).filter_by(pool_id=pool_id).first()
        if state is None:
            # Set the counter explicitly: column defaults are only applied on
            # INSERT, so the in-memory attribute would otherwise be None.
            state = RotationState(
                pool_id=pool_id,
                current_provider_id=None,
                rotation_count=0
            )
            self.db.add(state)
        return state

    def _mark_selected(self, state: RotationState, member: PoolMember) -> None:
        """Point the rotation state at ``member`` and bump usage counters."""
        now = datetime.utcnow()

        # NOTE(review): the counter and timestamp advance on every selection,
        # even when the provider did not change - kept as in the original;
        # confirm this is the intended meaning of "rotation_count".
        state.current_provider_id = member.provider_id
        state.last_rotation = now
        state.rotation_count = (state.rotation_count or 0) + 1

        member.last_used = now
        member.use_count = (member.use_count or 0) + 1

    def _bump_member_counter(
        self,
        pool_id: int,
        provider_id: int,
        counter: str
    ) -> None:
        """Increment ``counter`` on the matching pool member, if it exists."""
        member = (
            self.db.query(PoolMember)
            .filter_by(pool_id=pool_id, provider_id=provider_id)
            .first()
        )
        if member:
            setattr(member, counter, (getattr(member, counter) or 0) + 1)
            self._commit()

    def _provider_name(self, provider_id: Optional[int]) -> Optional[str]:
        """Return the provider's name, or None if the id is unset/unknown."""
        if not provider_id:
            return None
        provider = self.db.query(Provider).get(provider_id)
        return provider.name if provider else None

    def _member_status(self, member: PoolMember) -> Optional[Dict[str, Any]]:
        """Build the status entry for one member; None if provider is gone."""
        provider = self.db.query(Provider).get(member.provider_id)
        if not provider:
            return None

        rate_limit_info = None
        rate_status = rate_limiter.get_status(provider.name)
        if rate_status:
            rate_limit_info = {
                "usage": rate_status['current_usage'],
                "limit": rate_status['limit_value'],
                "percentage": rate_status['percentage'],
                "status": rate_status['status']
            }

        # Success rate is relative to the number of times the member was
        # selected; a never-used (or NULL) counter yields 0.
        use_count = member.use_count or 0
        success_rate = 0
        if use_count > 0:
            success_rate = ((member.success_count or 0) / use_count) * 100

        return {
            "provider_id": provider.id,
            "provider_name": provider.name,
            "priority": member.priority,
            "weight": member.weight,
            "enabled": member.enabled,
            "use_count": member.use_count,
            "success_count": member.success_count,
            "failure_count": member.failure_count,
            "success_rate": round(success_rate, 2),
            "last_used": member.last_used.isoformat() if member.last_used else None,
            "rate_limit": rate_limit_info
        }

    # ------------------------------------------------------------------
    # Pool administration
    # ------------------------------------------------------------------

    def create_pool(
        self,
        name: str,
        category: str,
        description: Optional[str] = None,
        rotation_strategy: str = "round_robin"
    ) -> SourcePool:
        """
        Create a new source pool together with its rotation state

        Args:
            name: Pool name
            category: Pool category
            description: Pool description
            rotation_strategy: Rotation strategy (round_robin, least_used,
                priority, weighted); unknown values behave like round_robin

        Returns:
            Created SourcePool
        """
        with self.lock:
            pool = SourcePool(
                name=name,
                category=category,
                description=description,
                rotation_strategy=rotation_strategy,
                enabled=True
            )
            self.db.add(pool)
            try:
                # Flush (not commit) to obtain pool.id, so the pool and its
                # rotation state are persisted in a single transaction.
                self.db.flush()
                self.db.add(
                    RotationState(
                        pool_id=pool.id,
                        current_provider_id=None,
                        rotation_count=0
                    )
                )
                self.db.commit()
            except Exception:
                self.db.rollback()
                raise
            self.db.refresh(pool)

            logger.info(f"Created source pool: {name} (strategy: {rotation_strategy})")
            return pool

    def add_to_pool(
        self,
        pool_id: int,
        provider_id: int,
        priority: int = 1,
        weight: int = 1
    ) -> PoolMember:
        """
        Add a provider to a pool

        Args:
            pool_id: Pool ID
            provider_id: Provider ID
            priority: Provider priority (higher = better)
            weight: Provider weight for weighted rotation

        Returns:
            Created PoolMember
        """
        with self.lock:
            member = PoolMember(
                pool_id=pool_id,
                provider_id=provider_id,
                priority=priority,
                weight=weight,
                enabled=True,
                use_count=0,
                success_count=0,
                failure_count=0
            )
            self.db.add(member)
            self._commit()
            self.db.refresh(member)

            logger.info(f"Added provider {provider_id} to pool {pool_id}")
            return member

    # ------------------------------------------------------------------
    # Rotation
    # ------------------------------------------------------------------

    def get_next_provider(
        self,
        pool_id: int,
        exclude_rate_limited: bool = True
    ) -> Optional[Provider]:
        """
        Get next provider from pool based on rotation strategy

        Args:
            pool_id: Pool ID
            exclude_rate_limited: Exclude rate-limited providers; if every
                provider is rate-limited, all enabled members are considered

        Returns:
            Next Provider or None if none available
        """
        with self.lock:
            pool = self.db.query(SourcePool).filter_by(id=pool_id).first()
            if not pool or not pool.enabled:
                logger.warning(f"Pool {pool_id} not found or disabled")
                return None

            members = self._enabled_members(pool_id)
            if not members:
                logger.warning(f"No enabled members in pool {pool_id}")
                return None

            candidates = members
            if exclude_rate_limited:
                candidates = self._filter_rate_limited(members)
                if not candidates:
                    logger.warning(f"All providers in pool {pool_id} are rate-limited")
                    # Best effort: fall back to the full member list rather
                    # than returning nothing.
                    candidates = members

            selected_member = self._select_by_strategy(
                pool.rotation_strategy,
                candidates
            )
            if not selected_member:
                return None

            state = self._get_or_create_state(pool_id)

            # History is only written when the active provider changes.
            old_provider_id = state.current_provider_id
            if old_provider_id != selected_member.provider_id:
                self._record_rotation(
                    pool_id=pool_id,
                    from_provider_id=old_provider_id,
                    to_provider_id=selected_member.provider_id,
                    reason="rotation",
                    commit=False
                )

            self._mark_selected(state, selected_member)

            # History, state and member usage are committed together.
            self._commit()

            provider = self.db.query(Provider).get(selected_member.provider_id)
            logger.info(
                f"Selected provider {provider.name} from pool {pool.name} "
                f"(strategy: {pool.rotation_strategy})"
            )
            return provider

    def _select_by_strategy(
        self,
        strategy: str,
        members: List[PoolMember]
    ) -> Optional[PoolMember]:
        """
        Select a pool member based on rotation strategy

        Strategies:
            priority: member with the highest ``priority``
            least_used: member with the lowest ``use_count``
            weighted: member maximizing ``weight / (use_count + 1)``
            anything else (round_robin): first never-used member, otherwise
                the member with the oldest ``last_used``

        A ``use_count`` of None (NULL in the database) is treated as 0.

        Args:
            strategy: Rotation strategy
            members: Available pool members

        Returns:
            Selected PoolMember, or None when ``members`` is empty
        """
        if not members:
            return None

        if strategy == "priority":
            return max(members, key=lambda m: m.priority)

        if strategy == "least_used":
            return min(members, key=lambda m: m.use_count or 0)

        if strategy == "weighted":
            # Heavier members win until their usage catches up.
            return max(
                members,
                key=lambda m: m.weight * (1.0 / ((m.use_count or 0) + 1))
            )

        # Round robin (also the fallback for unknown strategy names).
        never_used = [m for m in members if m.last_used is None]
        if never_used:
            return never_used[0]
        return min(members, key=lambda m: m.last_used)

    def _record_rotation(
        self,
        pool_id: int,
        from_provider_id: Optional[int],
        to_provider_id: int,
        reason: str,
        notes: Optional[str] = None,
        commit: bool = True
    ):
        """
        Record a rotation event

        Args:
            pool_id: Pool ID
            from_provider_id: Previous provider ID
            to_provider_id: New provider ID
            reason: Rotation reason
            notes: Additional notes
            commit: Commit immediately (default). Pass False to let the
                caller commit the event together with its other changes.
        """
        rotation = RotationHistory(
            pool_id=pool_id,
            from_provider_id=from_provider_id,
            to_provider_id=to_provider_id,
            rotation_reason=reason,
            success=True,
            notes=notes
        )
        self.db.add(rotation)
        if commit:
            self._commit()

    def failover(
        self,
        pool_id: int,
        failed_provider_id: int,
        reason: str = "failure"
    ) -> Optional[Provider]:
        """
        Perform failover from a failed provider

        The failed provider's failure counter is incremented (and committed)
        even when no alternative provider can be found.

        Args:
            pool_id: Pool ID
            failed_provider_id: Failed provider ID
            reason: Failure reason

        Returns:
            Next available provider, or None if the pool does not exist or
            has no other enabled member
        """
        with self.lock:
            logger.warning(
                f"Failover triggered for provider {failed_provider_id} "
                f"in pool {pool_id}. Reason: {reason}"
            )

            self._bump_member_counter(pool_id, failed_provider_id, "failure_count")

            pool = self.db.query(SourcePool).filter_by(id=pool_id).first()
            if not pool:
                return None

            members = self._enabled_members(
                pool_id,
                exclude_provider_id=failed_provider_id
            )
            if not members:
                logger.error(f"No alternative providers available in pool {pool_id}")
                return None

            selected_member = self._select_by_strategy(
                pool.rotation_strategy,
                members
            )
            if not selected_member:
                return None

            self._record_rotation(
                pool_id=pool_id,
                from_provider_id=failed_provider_id,
                to_provider_id=selected_member.provider_id,
                reason=reason,
                notes=f"Automatic failover from provider {failed_provider_id}",
                commit=False
            )

            # Same state handling as get_next_provider: a missing state row
            # is created instead of silently skipping the update.
            state = self._get_or_create_state(pool_id)
            self._mark_selected(state, selected_member)

            self._commit()

            provider = self.db.query(Provider).get(selected_member.provider_id)
            logger.info(f"Failover successful: switched to provider {provider.name}")
            return provider

    # ------------------------------------------------------------------
    # Outcome tracking
    # ------------------------------------------------------------------

    def record_success(self, pool_id: int, provider_id: int):
        """
        Record successful use of a provider

        Does nothing if the provider is not a member of the pool.

        Args:
            pool_id: Pool ID
            provider_id: Provider ID
        """
        with self.lock:
            self._bump_member_counter(pool_id, provider_id, "success_count")

    def record_failure(self, pool_id: int, provider_id: int):
        """
        Record failed use of a provider

        Does nothing if the provider is not a member of the pool.

        Args:
            pool_id: Pool ID
            provider_id: Provider ID
        """
        with self.lock:
            self._bump_member_counter(pool_id, provider_id, "failure_count")

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_pool_status(self, pool_id: int) -> Optional[Dict[str, Any]]:
        """
        Get comprehensive pool status

        Args:
            pool_id: Pool ID

        Returns:
            Pool status dictionary (pool fields, current provider, per-member
            statistics and the 10 most recent rotations), or None if the
            pool does not exist
        """
        with self.lock:
            pool = self.db.query(SourcePool).filter_by(id=pool_id).first()
            if not pool:
                return None

            state = self.db.query(RotationState).filter_by(pool_id=pool_id).first()

            current_provider = None
            if state and state.current_provider_id:
                provider = self.db.query(Provider).get(state.current_provider_id)
                if provider:
                    current_provider = {
                        "id": provider.id,
                        "name": provider.name,
                        "status": "active"
                    }

            # Members whose provider row no longer exists are omitted.
            pool_members = self.db.query(PoolMember).filter_by(pool_id=pool_id).all()
            member_entries = (self._member_status(m) for m in pool_members)
            members = [entry for entry in member_entries if entry is not None]

            recent_rotations = (
                self.db.query(RotationHistory)
                .filter_by(pool_id=pool_id)
                .order_by(RotationHistory.timestamp.desc())
                .limit(10)
                .all()
            )
            rotation_list = [
                {
                    "timestamp": rotation.timestamp.isoformat(),
                    "from_provider": self._provider_name(rotation.from_provider_id),
                    "to_provider": self._provider_name(rotation.to_provider_id),
                    "reason": rotation.rotation_reason,
                    "success": rotation.success
                }
                for rotation in recent_rotations
            ]

            last_rotation = None
            if state and state.last_rotation:
                last_rotation = state.last_rotation.isoformat()

            return {
                "pool_id": pool.id,
                "pool_name": pool.name,
                "category": pool.category,
                "description": pool.description,
                "rotation_strategy": pool.rotation_strategy,
                "enabled": pool.enabled,
                "current_provider": current_provider,
                "total_rotations": state.rotation_count if state else 0,
                "last_rotation": last_rotation,
                "members": members,
                "recent_rotations": rotation_list
            }

    def get_all_pools_status(self) -> List[Dict[str, Any]]:
        """
        Get status of all pools

        Returns:
            List of pool status dictionaries
        """
        # Hold the lock only while listing ids: get_pool_status acquires the
        # same non-reentrant lock, so it must be released before calling it.
        with self.lock:
            pool_ids = [pool.id for pool in self.db.query(SourcePool).all()]

        # Compute each status exactly once (a pool deleted in the meantime
        # yields None and is skipped).
        statuses = (self.get_pool_status(pool_id) for pool_id in pool_ids)
        return [status for status in statuses if status]
|
|