| import hashlib |
| import re |
| import time |
| import uuid |
| from datetime import timedelta |
| from pathlib import Path |
| from typing import Callable, Optional |
|
|
|
|
| def get_messages_content(messages: list[dict]) -> str: |
| return "\n".join( |
| [ |
| f"{message['role'].upper()}: {get_content_from_message(message)}" |
| for message in messages |
| ] |
| ) |
|
|
|
|
| def get_last_user_message_item(messages: list[dict]) -> Optional[dict]: |
| for message in reversed(messages): |
| if message["role"] == "user": |
| return message |
| return None |
|
|
|
|
| def get_content_from_message(message: dict) -> Optional[str]: |
| if isinstance(message["content"], list): |
| for item in message["content"]: |
| if item["type"] == "text": |
| return item["text"] |
| else: |
| return message["content"] |
| return None |
|
|
|
|
| def get_last_user_message(messages: list[dict]) -> Optional[str]: |
| message = get_last_user_message_item(messages) |
| if message is None: |
| return None |
| return get_content_from_message(message) |
|
|
|
|
| def get_last_assistant_message(messages: list[dict]) -> Optional[str]: |
| for message in reversed(messages): |
| if message["role"] == "assistant": |
| return get_content_from_message(message) |
| return None |
|
|
|
|
| def get_system_message(messages: list[dict]) -> Optional[dict]: |
| for message in messages: |
| if message["role"] == "system": |
| return message |
| return None |
|
|
|
|
| def remove_system_message(messages: list[dict]) -> list[dict]: |
| return [message for message in messages if message["role"] != "system"] |
|
|
|
|
| def pop_system_message(messages: list[dict]) -> tuple[Optional[dict], list[dict]]: |
| return get_system_message(messages), remove_system_message(messages) |
|
|
|
|
| def prepend_to_first_user_message_content( |
| content: str, messages: list[dict] |
| ) -> list[dict]: |
| for message in messages: |
| if message["role"] == "user": |
| if isinstance(message["content"], list): |
| for item in message["content"]: |
| if item["type"] == "text": |
| item["text"] = f"{content}\n{item['text']}" |
| else: |
| message["content"] = f"{content}\n{message['content']}" |
| break |
| return messages |
|
|
|
|
| def add_or_update_system_message(content: str, messages: list[dict]): |
| """ |
| Adds a new system message at the beginning of the messages list |
| or updates the existing system message at the beginning. |
| |
| :param msg: The message to be added or appended. |
| :param messages: The list of message dictionaries. |
| :return: The updated list of message dictionaries. |
| """ |
|
|
| if messages and messages[0].get("role") == "system": |
| messages[0]["content"] = f"{content}\n{messages[0]['content']}" |
| else: |
| |
| messages.insert(0, {"role": "system", "content": content}) |
|
|
| return messages |
|
|
|
|
| def openai_chat_message_template(model: str): |
| return { |
| "id": f"{model}-{str(uuid.uuid4())}", |
| "created": int(time.time()), |
| "model": model, |
| "choices": [{"index": 0, "logprobs": None, "finish_reason": None}], |
| } |
|
|
|
|
| def openai_chat_chunk_message_template( |
| model: str, message: Optional[str] = None |
| ) -> dict: |
| template = openai_chat_message_template(model) |
| template["object"] = "chat.completion.chunk" |
| if message: |
| template["choices"][0]["delta"] = {"content": message} |
| else: |
| template["choices"][0]["finish_reason"] = "stop" |
| return template |
|
|
|
|
| def openai_chat_completion_message_template( |
| model: str, message: Optional[str] = None |
| ) -> dict: |
| template = openai_chat_message_template(model) |
| template["object"] = "chat.completion" |
| if message is not None: |
| template["choices"][0]["message"] = {"content": message, "role": "assistant"} |
| template["choices"][0]["finish_reason"] = "stop" |
| return template |
|
|
|
|
| def get_gravatar_url(email): |
| |
| |
| |
| address = str(email).strip().lower() |
|
|
| |
| hash_object = hashlib.sha256(address.encode()) |
| hash_hex = hash_object.hexdigest() |
|
|
| |
| return f"https://www.gravatar.com/avatar/{hash_hex}?d=mp" |
|
|
|
|
| def calculate_sha256(file): |
| sha256 = hashlib.sha256() |
| |
| for chunk in iter(lambda: file.read(8192), b""): |
| sha256.update(chunk) |
| return sha256.hexdigest() |
|
|
|
|
| def calculate_sha256_string(string): |
| |
| sha256_hash = hashlib.sha256() |
| |
| sha256_hash.update(string.encode("utf-8")) |
| |
| hashed_string = sha256_hash.hexdigest() |
| return hashed_string |
|
|
|
|
| def validate_email_format(email: str) -> bool: |
| if email.endswith("@localhost"): |
| return True |
|
|
| return bool(re.match(r"[^@]+@[^@]+\.[^@]+", email)) |
|
|
|
|
| def sanitize_filename(file_name): |
| |
| lower_case_file_name = file_name.lower() |
|
|
| |
| sanitized_file_name = re.sub(r"[^\w\s]", "", lower_case_file_name) |
|
|
| |
| final_file_name = re.sub(r"\s+", "-", sanitized_file_name) |
|
|
| return final_file_name |
|
|
|
|
| def extract_folders_after_data_docs(path): |
| |
| path = Path(path) |
|
|
| |
| parts = path.parts |
|
|
| |
| try: |
| index_data_docs = parts.index("data") + 1 |
| index_docs = parts.index("docs", index_data_docs) + 1 |
| except ValueError: |
| return [] |
|
|
| |
| tags = [] |
|
|
| folders = parts[index_docs:-1] |
| for idx, _ in enumerate(folders): |
| tags.append("/".join(folders[: idx + 1])) |
|
|
| return tags |
|
|
|
|
| def parse_duration(duration: str) -> Optional[timedelta]: |
| if duration == "-1" or duration == "0": |
| return None |
|
|
| |
| pattern = r"(-?\d+(\.\d+)?)(ms|s|m|h|d|w)" |
| matches = re.findall(pattern, duration) |
|
|
| if not matches: |
| raise ValueError("Invalid duration string") |
|
|
| total_duration = timedelta() |
|
|
| for number, _, unit in matches: |
| number = float(number) |
| if unit == "ms": |
| total_duration += timedelta(milliseconds=number) |
| elif unit == "s": |
| total_duration += timedelta(seconds=number) |
| elif unit == "m": |
| total_duration += timedelta(minutes=number) |
| elif unit == "h": |
| total_duration += timedelta(hours=number) |
| elif unit == "d": |
| total_duration += timedelta(days=number) |
| elif unit == "w": |
| total_duration += timedelta(weeks=number) |
|
|
| return total_duration |
|
|
|
|
| def parse_ollama_modelfile(model_text): |
| parameters_meta = { |
| "mirostat": int, |
| "mirostat_eta": float, |
| "mirostat_tau": float, |
| "num_ctx": int, |
| "repeat_last_n": int, |
| "repeat_penalty": float, |
| "temperature": float, |
| "seed": int, |
| "tfs_z": float, |
| "num_predict": int, |
| "top_k": int, |
| "top_p": float, |
| "num_keep": int, |
| "typical_p": float, |
| "presence_penalty": float, |
| "frequency_penalty": float, |
| "penalize_newline": bool, |
| "numa": bool, |
| "num_batch": int, |
| "num_gpu": int, |
| "main_gpu": int, |
| "low_vram": bool, |
| "f16_kv": bool, |
| "vocab_only": bool, |
| "use_mmap": bool, |
| "use_mlock": bool, |
| "num_thread": int, |
| } |
|
|
| data = {"base_model_id": None, "params": {}} |
|
|
| |
| base_model_match = re.search( |
| r"^FROM\s+(\w+)", model_text, re.MULTILINE | re.IGNORECASE |
| ) |
| if base_model_match: |
| data["base_model_id"] = base_model_match.group(1) |
|
|
| |
| template_match = re.search( |
| r'TEMPLATE\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE |
| ) |
| if template_match: |
| data["params"] = {"template": template_match.group(1).strip()} |
|
|
| |
| stops = re.findall(r'PARAMETER stop "(.*?)"', model_text, re.IGNORECASE) |
| if stops: |
| data["params"]["stop"] = stops |
|
|
| |
| for param, param_type in parameters_meta.items(): |
| param_match = re.search(rf"PARAMETER {param} (.+)", model_text, re.IGNORECASE) |
| if param_match: |
| value = param_match.group(1) |
|
|
| try: |
| if param_type is int: |
| value = int(value) |
| elif param_type is float: |
| value = float(value) |
| elif param_type is bool: |
| value = value.lower() == "true" |
| except Exception as e: |
| print(e) |
| continue |
|
|
| data["params"][param] = value |
|
|
| |
| adapter_match = re.search(r"ADAPTER (.+)", model_text, re.IGNORECASE) |
| if adapter_match: |
| data["params"]["adapter"] = adapter_match.group(1) |
|
|
| |
| system_desc_match = re.search( |
| r'SYSTEM\s+"""(.+?)"""', model_text, re.DOTALL | re.IGNORECASE |
| ) |
| system_desc_match_single = re.search( |
| r"SYSTEM\s+([^\n]+)", model_text, re.IGNORECASE |
| ) |
|
|
| if system_desc_match: |
| data["params"]["system"] = system_desc_match.group(1).strip() |
| elif system_desc_match_single: |
| data["params"]["system"] = system_desc_match_single.group(1).strip() |
|
|
| |
| messages = [] |
| message_matches = re.findall(r"MESSAGE (\w+) (.+)", model_text, re.IGNORECASE) |
| for role, content in message_matches: |
| messages.append({"role": role, "content": content}) |
|
|
| if messages: |
| data["params"]["messages"] = messages |
|
|
| return data |
|
|