| | from datetime import datetime |
| | import json |
| | import uuid |
| | import asyncio |
| | import random |
| | import os |
| | from typing import Any, Dict, Optional |
| |
|
| | import httpx |
| | from fastapi import HTTPException |
| | from api.config import ( |
| | MODEL_MAPPING, |
| | get_headers_api_chat, |
| | BASE_URL, |
| | AGENT_MODE, |
| | TRENDING_AGENT_MODE, |
| | MODEL_PREFIXES, |
| | ) |
| | from api.models import ChatRequest |
| | from api.logger import setup_logger |
| | from api.validate import getHid |
| |
|
| | logger = setup_logger(__name__) |
| |
|
| | BLOCKED_MESSAGE = "Generated by BLACKBOX.AI, try unlimited chat https://www.blackbox.ai" |
| | ADVERTISEMENT_TEXT = os.getenv("ADVERTISEMENT_TEXT", "") |
| |
|
def create_chat_completion_data(content: str, model: str, timestamp: int, finish_reason: Optional[str] = None) -> Dict[str, Any]:
    """Build a single OpenAI-style ``chat.completion.chunk`` payload.

    Args:
        content: Delta text for this chunk.
        model: Model name echoed back to the client.
        timestamp: Unix timestamp placed in ``created``.
        finish_reason: ``"stop"`` on the final chunk, otherwise ``None``.
    """
    choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": finish_reason,
    }
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
| |
|
def message_to_dict(message, model_prefix: Optional[str] = None):
    """Flatten a message object into a plain ``{"role", "content"}`` dict.

    Non-string content is assumed to be a list whose first element carries
    the text under a ``"text"`` key. When *model_prefix* is truthy it is
    prepended to the content, separated by a single space.
    """
    if isinstance(message.content, str):
        text = message.content
    else:
        text = message.content[0]["text"]
    if model_prefix:
        text = f"{model_prefix} {text}"
    return {"role": message.role, "content": text}
| |
|
def strip_model_prefix(content: str, model_prefix: Optional[str] = None) -> str:
    """Return *content* with a leading *model_prefix* removed, if present.

    The remainder is whitespace-stripped; content without the prefix is
    returned untouched.
    """
    if not model_prefix or not content.startswith(model_prefix):
        return content
    logger.debug(f"Stripping prefix '{model_prefix}' from content.")
    return content[len(model_prefix):].strip()
| |
|
| | |
async def process_streaming_response(request: ChatRequest):
    """Proxy a chat request to the upstream API and stream it back as SSE.

    Yields OpenAI-style ``data: {json}\n\n`` chunks for each piece of
    upstream content, then (once) an advertisement chunk if configured,
    a final chunk with ``finish_reason="stop"``, and ``data: [DONE]``.

    Raises:
        HTTPException: when the validation h-value is missing, or on
            upstream HTTP/request errors.
            NOTE(review): an exception raised after the first chunk has been
            yielded cannot change the HTTP status anymore — the response has
            already started streaming; confirm callers tolerate this.
    """
    request_id = f"chatcmpl-{uuid.uuid4()}"
    logger.info(f"Processing request with ID: {request_id} - Model: {request.model}")

    # Per-model upstream configuration.
    agent_mode = AGENT_MODE.get(request.model, {})
    trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {})
    model_prefix = MODEL_PREFIXES.get(request.model, "")

    headers_api_chat = get_headers_api_chat(BASE_URL)

    # Artificial randomized delay for 'o1-preview'
    # (presumably to mimic that model's latency — confirm intent).
    if request.model == 'o1-preview':
        delay_seconds = random.randint(1, 60)
        logger.info(f"Introducing a delay of {delay_seconds} seconds for model 'o1-preview' (Request ID: {request_id})")
        await asyncio.sleep(delay_seconds)

    # Validation token required by the upstream API.
    h_value = await getHid()
    if not h_value:
        logger.error("Failed to retrieve h-value for validation.")
        raise HTTPException(status_code=500, detail="Validation failed due to missing h-value.")

    json_data = {
        "agentMode": agent_mode,
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "codeModelMode": True,
        "githubToken": None,
        "id": None,
        "isChromeExt": False,
        "isMicMode": False,
        "maxTokens": request.max_tokens,
        "messages": [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages],
        "mobileClient": False,
        "playgroundTemperature": request.temperature,
        "playgroundTopP": request.top_p,
        "previewToken": None,
        "trendingAgentMode": trending_agent_mode,
        "userId": None,
        "userSelectedModel": MODEL_MAPPING.get(request.model, request.model),
        "userSystemPrompt": None,
        "validated": h_value,
        "visitFromDelta": False,
        "webSearchModePrompt": False,
        "imageGenerationMode": False,
    }

    advertisement_added = False
    # Version sentinel the upstream service prepends to the first chunk.
    # Slicing by len() (rather than a hard-coded 21) keeps the strip in
    # sync with the literal if it ever changes.
    version_sentinel = "$@$v=undefined-rv1$@$"

    async with httpx.AsyncClient() as client:
        try:
            async with client.stream("POST", f"{BASE_URL}/api/chat", headers=headers_api_chat, json=json_data, timeout=100) as response:
                response.raise_for_status()

                timestamp = int(datetime.now().timestamp())
                async for chunk in response.aiter_text():
                    if not chunk:
                        continue
                    content = chunk
                    if content.startswith(version_sentinel):
                        content = content[len(version_sentinel):]

                    # NOTE(review): a blocked message split across two chunks
                    # is not caught by this per-chunk scan — confirm upstream
                    # always sends it within a single chunk.
                    if BLOCKED_MESSAGE in content:
                        logger.info(f"Blocked message detected in response for Request ID {request_id}.")
                        content = content.replace(BLOCKED_MESSAGE, '').strip()

                    if not content:
                        continue

                    cleaned_content = strip_model_prefix(content, model_prefix)
                    yield f"data: {json.dumps(create_chat_completion_data(cleaned_content, request.model, timestamp))}\n\n"

                # Append the advertisement exactly once, after all content.
                if ADVERTISEMENT_TEXT and not advertisement_added:
                    advertisement_with_line_break = "\n\n---\n" + ADVERTISEMENT_TEXT
                    yield f"data: {json.dumps(create_chat_completion_data(advertisement_with_line_break, request.model, timestamp))}\n\n"
                    advertisement_added = True

                # Terminate the stream in OpenAI SSE style.
                yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
                yield "data: [DONE]\n\n"

        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred for Request ID {request_id}: {e}")
            raise HTTPException(status_code=e.response.status_code, detail=str(e))
        except httpx.RequestError as e:
            logger.error(f"Request error occurred for Request ID {request_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e))
| |
|
| | |
async def process_non_streaming_response(request: ChatRequest):
    """Proxy a chat request upstream and return one OpenAI-style completion.

    Accumulates the whole streamed upstream response, strips the version
    sentinel, blocked-message boilerplate, and model prefix, optionally
    appends the configured advertisement, and returns a single
    ``chat.completion`` dict.

    Raises:
        HTTPException: when the validation h-value is missing, or on
            upstream HTTP/request errors.
    """
    request_id = f"chatcmpl-{uuid.uuid4()}"
    logger.info(f"Processing request with ID: {request_id} - Model: {request.model}")

    # Per-model upstream configuration.
    agent_mode = AGENT_MODE.get(request.model, {})
    trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {})
    model_prefix = MODEL_PREFIXES.get(request.model, "")

    headers_api_chat = get_headers_api_chat(BASE_URL)

    # Artificial randomized delay for 'o1-preview'
    # (NOTE(review): range differs from the streaming variant's 1-60 —
    # confirm whether that asymmetry is intentional).
    if request.model == 'o1-preview':
        delay_seconds = random.randint(20, 60)
        logger.info(f"Introducing a delay of {delay_seconds} seconds for model 'o1-preview' (Request ID: {request_id})")
        await asyncio.sleep(delay_seconds)

    # Validation token required by the upstream API.
    h_value = await getHid()
    if not h_value:
        logger.error("Failed to retrieve h-value for validation.")
        raise HTTPException(status_code=500, detail="Validation failed due to missing h-value.")

    json_data = {
        "agentMode": agent_mode,
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "codeModelMode": True,
        "githubToken": None,
        "id": None,
        "isChromeExt": False,
        "isMicMode": False,
        "maxTokens": request.max_tokens,
        "messages": [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages],
        "mobileClient": False,
        "playgroundTemperature": request.temperature,
        "playgroundTopP": request.top_p,
        "previewToken": None,
        "trendingAgentMode": trending_agent_mode,
        "userId": None,
        "userSelectedModel": MODEL_MAPPING.get(request.model, request.model),
        "userSystemPrompt": None,
        "validated": h_value,
        "visitFromDelta": False,
        "webSearchModePrompt": False,
        "imageGenerationMode": False,
    }

    full_response = ""

    async with httpx.AsyncClient() as client:
        try:
            # timeout=100 matches the streaming variant; httpx's 5s default
            # would abort long generations.
            async with client.stream("POST", f"{BASE_URL}/api/chat", headers=headers_api_chat, json=json_data, timeout=100) as response:
                response.raise_for_status()
                async for chunk in response.aiter_text():
                    full_response += chunk

        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred for Request ID {request_id}: {e}")
            raise HTTPException(status_code=e.response.status_code, detail=str(e))
        except httpx.RequestError as e:
            logger.error(f"Request error occurred for Request ID {request_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e))

    # Version sentinel the upstream service prepends to the response body.
    # Slicing by len() (rather than a hard-coded 21) keeps the strip in
    # sync with the literal if it ever changes.
    version_sentinel = "$@$v=undefined-rv1$@$"
    if full_response.startswith(version_sentinel):
        full_response = full_response[len(version_sentinel):]

    if BLOCKED_MESSAGE in full_response:
        full_response = full_response.replace(BLOCKED_MESSAGE, '').strip()

    cleaned_full_response = strip_model_prefix(full_response, model_prefix)

    if ADVERTISEMENT_TEXT:
        cleaned_full_response += "\n\n" + ADVERTISEMENT_TEXT

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": cleaned_full_response},
                "finish_reason": "stop",
            }
        ],
        "usage": None,
    }
| |
|