| """ |
| Utility functions for the application |
| """ |
|
|
| import json |
| import re |
| import time |
| import random |
| from typing import Dict, List, Optional, Any, Tuple, Generator |
| import requests |
| from fake_useragent import UserAgent |
|
|
| from app.core.config import settings |
| from app.core.token_manager import token_manager |
|
|
| |
| _user_agent_instance = None |
|
|
| def get_user_agent_instance() -> UserAgent: |
| """获取或创建 UserAgent 实例(单例模式)""" |
| global _user_agent_instance |
| if _user_agent_instance is None: |
| _user_agent_instance = UserAgent() |
| return _user_agent_instance |
|
|
|
|
| def debug_log(message: str, *args) -> None: |
| """Log debug message if debug mode is enabled""" |
| if settings.DEBUG_LOGGING: |
| if args: |
| print(f"[DEBUG] {message % args}") |
| else: |
| print(f"[DEBUG] {message}") |
|
|
|
|
| def generate_request_ids() -> Tuple[str, str]: |
| """Generate unique IDs for chat and message""" |
| timestamp = int(time.time()) |
| chat_id = f"{timestamp * 1000}-{timestamp}" |
| msg_id = str(timestamp * 1000000) |
| return chat_id, msg_id |
|
|
|
|
| def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]: |
| """Get browser headers for API requests with dynamic User-Agent""" |
| |
| |
| ua = get_user_agent_instance() |
| |
| |
| browser_choices = ['chrome', 'chrome', 'chrome', 'edge', 'edge', 'firefox', 'safari'] |
| browser_type = random.choice(browser_choices) |
| |
| try: |
| |
| if browser_type == 'chrome': |
| user_agent = ua.chrome |
| elif browser_type == 'edge': |
| user_agent = ua.edge |
| elif browser_type == 'firefox': |
| user_agent = ua.firefox |
| elif browser_type == 'safari': |
| user_agent = ua.safari |
| else: |
| user_agent = ua.random |
| except: |
| |
| user_agent = ua.random |
| |
| |
| chrome_version = "139" |
| edge_version = "139" |
| |
| if "Chrome/" in user_agent: |
| try: |
| chrome_version = user_agent.split("Chrome/")[1].split(".")[0] |
| except: |
| pass |
| |
| if "Edg/" in user_agent: |
| try: |
| edge_version = user_agent.split("Edg/")[1].split(".")[0] |
| |
| sec_ch_ua = f'"Microsoft Edge";v="{edge_version}", "Chromium";v="{chrome_version}", "Not_A Brand";v="24"' |
| except: |
| sec_ch_ua = f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", "Google Chrome";v="{chrome_version}"' |
| elif "Firefox/" in user_agent: |
| |
| sec_ch_ua = None |
| else: |
| |
| sec_ch_ua = f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", "Google Chrome";v="{chrome_version}"' |
| |
| |
| headers = { |
| "Content-Type": "application/json", |
| "Accept": "application/json, text/event-stream", |
| "User-Agent": user_agent, |
| "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7", |
| "sec-ch-ua-mobile": "?0", |
| "sec-ch-ua-platform": '"Windows"', |
| "sec-fetch-dest": "empty", |
| "sec-fetch-mode": "cors", |
| "sec-fetch-site": "same-origin", |
| "X-FE-Version": "prod-fe-1.0.70", |
| "Origin": settings.CLIENT_HEADERS["Origin"], |
| "Cache-Control": "no-cache", |
| "Pragma": "no-cache", |
| } |
| |
| |
| if sec_ch_ua: |
| headers["sec-ch-ua"] = sec_ch_ua |
| |
| |
| if referer_chat_id: |
| headers["Referer"] = f"{settings.CLIENT_HEADERS['Origin']}/c/{referer_chat_id}" |
| |
| |
| if settings.DEBUG_LOGGING: |
| debug_log(f"使用 User-Agent: {user_agent[:100]}...") |
| |
| return headers |
|
|
|
|
| def get_anonymous_token() -> str: |
| """Get anonymous token for authentication""" |
| headers = get_browser_headers() |
| headers.update({ |
| "Accept": "*/*", |
| "Accept-Language": "zh-CN,zh;q=0.9", |
| "Referer": f"{settings.CLIENT_HEADERS['Origin']}/", |
| }) |
| |
| try: |
| response = requests.get( |
| f"{settings.CLIENT_HEADERS['Origin']}/api/v1/auths/", |
| headers=headers, |
| timeout=10.0 |
| ) |
| |
| if response.status_code != 200: |
| raise Exception(f"anon token status={response.status_code}") |
| |
| data = response.json() |
| token = data.get("token") |
| if not token: |
| raise Exception("anon token empty") |
| |
| return token |
| except Exception as e: |
| debug_log(f"获取匿名token失败: {e}") |
| raise |
|
|
|
|
| def get_auth_token() -> str: |
| """Get authentication token (anonymous or from token pool)""" |
| if settings.ANONYMOUS_MODE: |
| try: |
| token = get_anonymous_token() |
| debug_log(f"匿名token获取成功: {token[:10]}...") |
| return token |
| except Exception as e: |
| debug_log(f"匿名token获取失败,使用token池: {e}") |
| |
| |
| token = token_manager.get_next_token() |
| if token: |
| debug_log(f"从token池获取token: {token[:10]}...") |
| return token |
| else: |
| debug_log("token池无可用token,使用配置文件备用token") |
| return settings.BACKUP_TOKEN |
|
|
|
|
| def transform_thinking_content(content: str) -> str: |
| """Transform thinking content according to configuration""" |
| |
| content = re.sub(r'(?s)<summary>.*?</summary>', '', content) |
| |
| content = content.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "") |
| content = content.strip() |
| |
| if settings.THINKING_PROCESSING == "think": |
| content = re.sub(r'<details[^>]*>', '<span>', content) |
| content = content.replace("</details>", "</span>") |
| elif settings.THINKING_PROCESSING == "strip": |
| content = re.sub(r'<details[^>]*>', '', content) |
| content = content.replace("</details>", "") |
| |
| |
| content = content.lstrip("> ") |
| content = content.replace("\n> ", "\n") |
| |
| return content.strip() |
|
|
|
|
| def call_upstream_api( |
| upstream_req: Any, |
| chat_id: str, |
| auth_token: str |
| ) -> requests.Response: |
| """Call upstream API with proper headers""" |
| headers = get_browser_headers(chat_id) |
| headers["Authorization"] = f"Bearer {auth_token}" |
| |
| |
| request_data = upstream_req.model_dump(exclude_none=True) |
| request_json = upstream_req.model_dump_json() |
| |
| debug_log(f"调用上游API: {settings.API_ENDPOINT}") |
| debug_log(f"请求体大小: {len(request_json)} 字符") |
| |
| |
| if len(request_json) > 1000: |
| debug_log(f"上游请求体 (截断): {request_json[:500]}...{request_json[-200:]}") |
| else: |
| debug_log(f"上游请求体: {request_json}") |
| |
| |
| proxies = {} |
| if settings.HTTP_PROXY: |
| proxies['http'] = settings.HTTP_PROXY |
| if settings.HTTPS_PROXY: |
| proxies['https'] = settings.HTTPS_PROXY |
| |
| try: |
| response = requests.post( |
| settings.API_ENDPOINT, |
| json=request_data, |
| headers=headers, |
| timeout=(settings.CONNECTION_TIMEOUT, settings.REQUEST_TIMEOUT), |
| stream=True, |
| proxies=proxies if proxies else None, |
| verify=True, |
| ) |
| |
| debug_log(f"上游响应状态: {response.status_code}") |
| |
| |
| if settings.DEBUG_LOGGING: |
| content_type = response.headers.get('content-type', 'unknown') |
| content_length = response.headers.get('content-length', 'unknown') |
| debug_log(f"响应类型: {content_type}, 长度: {content_length}") |
| |
| return response |
| |
| except requests.exceptions.Timeout as e: |
| debug_log(f"请求超时: {e}") |
| raise Exception(f"上游API请求超时: {e}") |
| except requests.exceptions.ConnectionError as e: |
| debug_log(f"连接错误: {e}") |
| raise Exception(f"上游API连接失败: {e}") |
| except requests.exceptions.RequestException as e: |
| debug_log(f"请求异常: {e}") |
| raise Exception(f"上游API请求失败: {e}") |
| except Exception as e: |
| debug_log(f"未知错误: {e}") |
| raise |
|
|