| import re, os, threading, queue, requests
|
| from typing import List, Optional, Union
|
| from pydantic import BaseModel, Field
|
| from pydantic_settings import BaseSettings
|
|
|
| from api_types import ChatMessage
|
|
|
|
|
def parse_think_response(full_response: str):
    """Split a model response into its reasoning text and its final answer.

    The reasoning is the text between the first ``<think`` marker and the
    first ``</think>`` that follows it; the answer is whatever comes after
    that closing tag.

    Args:
        full_response: Raw text to split.

    Returns:
        A ``(reasoning_content, content)`` tuple:

        * no ``<think`` marker: ``(None, full_response.strip())``;
        * marker present but no ``</think>`` after it: everything after the
          marker is reasoning and ``content`` is ``""``;
        * otherwise the stripped text inside the tags and the stripped text
          after the closing tag.

        Text that precedes the opening marker is discarded.
    """
    open_marker = "<think"
    close_tag = "</think>"

    think_start = full_response.find(open_marker)
    if think_start == -1:
        return None, full_response.strip()

    # The search marker has no trailing ">", so skip that character when it
    # is present; otherwise it would leak into the reasoning text.
    body_start = think_start + len(open_marker)
    if full_response.startswith(">", body_start):
        body_start += 1

    # Only a closing tag located after the opening marker can terminate it.
    think_end = full_response.find(close_tag, body_start)
    if think_end == -1:
        reasoning = full_response[body_start:]
        content = ""
    else:
        reasoning = full_response[body_start:think_end]
        # Use the real tag length so no character of the answer is lost.
        content = full_response[think_end + len(close_tag):].strip()

    # Remove any further opening tags that appear inside the reasoning.
    reasoning_content = (
        reasoning.replace("<think>", "").replace(open_marker, "").strip()
    )
    return reasoning_content, content
|
|
|
|
|
def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render a list of chat messages as one prompt string.

    Each message becomes ``"<Role>: <content>"``, where the role is stripped,
    lower-cased and capitalised, and the content is stripped with every run
    of newlines collapsed to a single newline. Rendered messages are joined
    with a blank line.

    Args:
        messages: Messages to render; each must expose ``role`` and
            ``content`` string attributes.
        removeThinkingContent: When true, ``<think>...</think>`` sections are
            removed from messages whose rendered role is ``Assistant``.

    Returns:
        The assembled prompt string.
    """
    rendered = []

    for msg in messages:
        # Collapse consecutive newlines so one message never contains the
        # blank-line separator used between messages.
        body = re.sub(r"\n+", "\n", msg.content.strip())
        speaker = msg.role.strip().lower().capitalize()

        if removeThinkingContent and speaker == "Assistant":
            body = remove_nested_think_tags_stack(body)

        rendered.append(f"{speaker}: {body}")

    return "\n\n".join(rendered)
|
|
|
|
|
def remove_nested_think_tags_stack(text):
    """Return *text* with every ``<think>...</think>`` section removed.

    Sections may be nested; text is kept only while no ``<think>`` is open.
    A ``</think>`` with no matching opener is kept verbatim, and an opener
    that is never closed discards everything that follows it.
    """
    open_tag = "<think>"
    close_tag = "</think>"

    depth = 0  # number of currently open <think> tags
    kept = []

    # The capturing group makes re.split return the tags as separate tokens.
    for token in re.split(r"(<think>|</think>)", text):
        if token == open_tag:
            depth += 1
        elif token == close_tag:
            if depth:
                depth -= 1
            else:
                # Unmatched closing tag: preserved in the output.
                kept.append(token)
        elif not depth:
            kept.append(token)

    return "".join(kept)
|
|
|
|
|
def format_bytes(size):
    """Format a byte count as a string with a binary (1024-based) unit.

    Args:
        size: Number of bytes (int or float).

    Returns:
        The value scaled to the largest unit that keeps it at or above 1,
        with four decimal places, followed by ``B``, ``KB``, ``MB``, ``GB``,
        ``TB``, ``PB`` or ``EB``. Values beyond the largest unit stay
        expressed in ``EB``.
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB")
    step = 2**10

    index = 0
    # ">=" so that exactly 1024 becomes 1 KB; the index bound keeps very
    # large values from running past the last unit.
    while size >= step and index < len(units) - 1:
        size /= step
        index += 1

    return f"{size:.4f}{units[index]}"
|
|
|
|
|
# Bounded buffer (at most 5 pending items) between log() and logger().
LOGGER_QUEUE = queue.Queue(maxsize=5)
|
|
|
|
|
def logger():
    """Forward queued log items to the HTTP endpoint named by ``LOG_PORT``.

    Runs forever: blocks on ``LOGGER_QUEUE``, then POSTs each item as JSON to
    the URL stored in the ``LOG_PORT`` environment variable. Delivery is
    best-effort; a failed request is dropped and the loop continues.
    """
    print("enable")
    while True:
        item = LOGGER_QUEUE.get()
        try:
            requests.post(
                os.environ.get("LOG_PORT"),
                headers={"Content-Type": "application/json"},
                json=item,
                # Without a timeout a stalled endpoint would block this
                # loop indefinitely and the bounded queue would fill up.
                timeout=10,
            )
        except Exception:
            # Deliberately broad: any failure (network error, timeout,
            # unserialisable item) must not terminate the forwarding loop.
            continue
|
|
|
|
|
# Start the background log forwarder only when a log endpoint is configured.
if os.environ.get("LOG_PORT"):
    # logger() never returns, so the thread must be a daemon; a non-daemon
    # thread would keep the interpreter from exiting.
    threading.Thread(target=logger, daemon=True).start()
|
|
|
|
|
def log(item):
    """Queue *item* for the background logger without blocking the caller.

    Logging is best-effort: when ``LOGGER_QUEUE`` is already full the item is
    dropped instead of raising ``queue.Full`` into the caller.

    Args:
        item: Payload to enqueue; the consumer sends it as a JSON body.
    """
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        # The queue is bounded and the consumer thread is only started when
        # LOG_PORT is set, so a full queue is an expected condition.
        pass
|
|
|