| """ |
| Token pool management with load balancing and round-robin mechanism |
| """ |
|
|
| import os |
| import time |
| import threading |
| from typing import List, Optional, Dict, Any, Set |
| from dataclasses import dataclass, field |
|
|
|
|
| def debug_log(message: str, *args) -> None: |
| """Log debug message if debug mode is enabled""" |
| |
| try: |
| from app.core.config import settings |
| if settings.DEBUG_LOGGING: |
| if args: |
| print(f"[DEBUG] {message % args}") |
| else: |
| print(f"[DEBUG] {message}") |
| except: |
| |
| print(f"[DEBUG] {message}") |
|
|
|
|
| @dataclass |
| class TokenInfo: |
| """Token information with failure tracking""" |
| token: str |
| failure_count: int = 0 |
| is_active: bool = True |
| last_failure_time: Optional[float] = None |
| last_used_time: Optional[float] = None |
|
|
|
|
| class TokenManager: |
| """Token pool manager with load balancing and failure handling""" |
| |
| def __init__(self, token_file_path: str = None): |
| try: |
| from app.core.config import settings |
| self.token_file_path = token_file_path or getattr(settings, 'TOKEN_FILE_PATH', './tokens.txt') |
| self.max_failures = getattr(settings, 'TOKEN_MAX_FAILURES', 3) |
| self.reload_interval = getattr(settings, 'TOKEN_RELOAD_INTERVAL', 60) |
| except ImportError: |
| |
| self.token_file_path = token_file_path or './tokens.txt' |
| self.max_failures = 3 |
| self.reload_interval = 60 |
| |
| self.tokens: List[TokenInfo] = [] |
| self.current_index = 0 |
| self.last_reload_time = 0 |
| self._lock = threading.Lock() |
| |
| |
| self._load_tokens() |
| |
| def _load_tokens(self) -> None: |
| """Load tokens from file""" |
| try: |
| new_tokens = [] |
| |
| |
| if os.path.exists(self.token_file_path): |
| with open(self.token_file_path, 'r', encoding='utf-8') as f: |
| lines = f.readlines() |
| |
| for line in lines: |
| token = line.strip() |
| if token and not token.startswith('#'): |
| |
| existing_token = next((t for t in self.tokens if t.token == token), None) |
| if existing_token: |
| new_tokens.append(existing_token) |
| else: |
| new_tokens.append(TokenInfo(token=token)) |
| |
| if new_tokens: |
| debug_log(f"从tokens.txt文件加载了 {len(new_tokens)} 个token") |
| else: |
| debug_log("Token文件为空或无有效token") |
| |
| |
| try: |
| from app.core.config import settings |
| if hasattr(settings, 'BACKUP_TOKEN') and settings.BACKUP_TOKEN: |
| |
| backup_tokens = [token.strip() for token in settings.BACKUP_TOKEN.split(',') if token.strip()] |
| |
| |
| for backup_token in backup_tokens: |
| |
| existing_token = next((t for t in new_tokens if t.token == backup_token), None) |
| if not existing_token: |
| |
| old_token = next((t for t in self.tokens if t.token == backup_token), None) |
| if old_token: |
| new_tokens.append(old_token) |
| else: |
| new_tokens.append(TokenInfo(token=backup_token)) |
| |
| debug_log(f"从BACKUP_TOKEN加载了 {len(backup_tokens)} 个token") |
| except ImportError: |
| pass |
| |
| |
| if not new_tokens: |
| try: |
| from app.core.config import settings |
| if hasattr(settings, 'BACKUP_TOKEN') and settings.BACKUP_TOKEN: |
| |
| backup_tokens = [token.strip() for token in settings.BACKUP_TOKEN.split(',') if token.strip()] |
| new_tokens = [TokenInfo(token=token) for token in backup_tokens] |
| debug_log(f"仅使用BACKUP_TOKEN,共{len(backup_tokens)}个token") |
| except ImportError: |
| pass |
| |
| if new_tokens: |
| with self._lock: |
| self.tokens = new_tokens |
| |
| if self.current_index >= len(self.tokens): |
| self.current_index = 0 |
| self.last_reload_time = time.time() |
| |
| debug_log(f"总共加载了 {len(self.tokens)} 个token") |
| active_count = sum(1 for t in self.tokens if t.is_active) |
| debug_log(f"活跃token数量: {active_count}") |
| else: |
| debug_log("没有找到任何可用的token") |
| |
| except Exception as e: |
| debug_log(f"加载token失败: {e}") |
| |
| def _should_reload(self) -> bool: |
| """Check if tokens should be reloaded""" |
| return time.time() - self.last_reload_time > self.reload_interval |
| |
| def get_next_token(self) -> Optional[str]: |
| """Get next available token using round-robin with load balancing""" |
| |
| if self._should_reload(): |
| self._load_tokens() |
| |
| with self._lock: |
| if not self.tokens: |
| debug_log("没有可用的token") |
| return None |
| |
| |
| active_tokens = [i for i, t in enumerate(self.tokens) if t.is_active] |
| |
| if not active_tokens: |
| debug_log("没有活跃的token,尝试重置失败计数") |
| |
| for token in self.tokens: |
| token.is_active = True |
| token.failure_count = 0 |
| active_tokens = list(range(len(self.tokens))) |
| |
| |
| attempts = 0 |
| max_attempts = len(active_tokens) |
| |
| while attempts < max_attempts: |
| |
| token_index = None |
| for i in range(len(self.tokens)): |
| idx = (self.current_index + i) % len(self.tokens) |
| if idx in active_tokens: |
| token_index = idx |
| break |
| |
| if token_index is not None: |
| self.current_index = (token_index + 1) % len(self.tokens) |
| token_info = self.tokens[token_index] |
| token_info.last_used_time = time.time() |
| debug_log(f"选择token[{token_index}]: {token_info.token[:20]}...") |
| return token_info.token |
| |
| attempts += 1 |
| |
| debug_log("无法找到可用的token") |
| return None |
| |
| def mark_token_failed(self, token: str) -> None: |
| """Mark a token as failed and deactivate if necessary""" |
| with self._lock: |
| for token_info in self.tokens: |
| if token_info.token == token: |
| token_info.failure_count += 1 |
| token_info.last_failure_time = time.time() |
| |
| if token_info.failure_count >= self.max_failures: |
| token_info.is_active = False |
| debug_log(f"Token失效 (失败{token_info.failure_count}次): {token[:20]}...") |
| else: |
| debug_log(f"Token失败 ({token_info.failure_count}/{self.max_failures}): {token[:20]}...") |
| break |
| |
| def mark_token_success(self, token: str) -> None: |
| """Mark a token as successful (reset failure count)""" |
| with self._lock: |
| for token_info in self.tokens: |
| if token_info.token == token: |
| if token_info.failure_count > 0: |
| debug_log(f"Token恢复正常: {token[:20]}...") |
| token_info.failure_count = 0 |
| token_info.is_active = True |
| break |
| |
| def get_token_stats(self) -> Dict[str, Any]: |
| """Get token pool statistics""" |
| with self._lock: |
| if not self.tokens: |
| return { |
| "total": 0, |
| "active": 0, |
| "failed": 0, |
| "tokens": [] |
| } |
| |
| active_count = sum(1 for t in self.tokens if t.is_active) |
| failed_count = len(self.tokens) - active_count |
| |
| token_details = [] |
| for i, token_info in enumerate(self.tokens): |
| token_details.append({ |
| "index": i, |
| "token_preview": token_info.token[:20] + "...", |
| "is_active": token_info.is_active, |
| "failure_count": token_info.failure_count, |
| "last_failure_time": token_info.last_failure_time, |
| "last_used_time": token_info.last_used_time |
| }) |
| |
| return { |
| "total": len(self.tokens), |
| "active": active_count, |
| "failed": failed_count, |
| "current_index": self.current_index, |
| "last_reload_time": self.last_reload_time, |
| "tokens": token_details |
| } |
| |
| def reset_all_tokens(self) -> None: |
| """Reset all tokens (clear failure counts and reactivate)""" |
| with self._lock: |
| for token_info in self.tokens: |
| token_info.is_active = True |
| token_info.failure_count = 0 |
| token_info.last_failure_time = None |
| debug_log("已重置所有token状态") |
| |
| def reload_tokens(self) -> None: |
| """Force reload tokens from file""" |
| debug_log("强制重新加载token文件") |
| self._load_tokens() |
|
|
|
|
| |
| token_manager = TokenManager() |