| import re |
| from pprint import pprint |
|
|
| from transformers import AutoTokenizer |
|
|
| from constants.models import AVAILABLE_MODELS, MODEL_MAP |
| from tclogger import logger |
|
|
|
|
class MessageComposer:
    """Turn a list of chat messages into a prompt string for a chosen model.

    A message is a dict carrying at least a ``"role"`` and a ``"content"``
    key.  Roles are grouped into instruction-like roles (``inst_roles``) and
    answer-like roles (``answer_roles``); any role found in neither list is
    treated as an instruction.
    """

    def __init__(self, model: str = None):
        """Select the prompt format to use.

        Args:
            model: A key of ``AVAILABLE_MODELS``.  Any other value, including
                ``None``, falls back to ``"nous-mixtral-8x7b"``.
        """
        if model in AVAILABLE_MODELS:
            self.model = model
        else:
            self.model = "nous-mixtral-8x7b"
        self.model_fullname = MODEL_MAP[self.model]
        self.system_roles = ["system"]
        self.inst_roles = ["user", "system", "inst"]
        self.answer_roles = ["assistant", "bot", "answer", "model"]
        self.default_role = "user"

    def _normalize_role(self, role):
        """Map a raw role to ``"inst"`` or ``"answer"``; unknown roles become ``"inst"``."""
        if role in self.inst_roles:
            return "inst"
        if role in self.answer_roles:
            return "answer"
        return "inst"

    def concat_messages_by_role(self, messages):
        """Collapse consecutive messages of the same kind into single turns.

        Roles are first normalized to ``"inst"`` or ``"answer"``; adjacent
        messages with the same normalized role are joined with a newline.

        Args:
            messages: Iterable of message dicts with ``"role"`` and
                ``"content"`` keys.

        Returns:
            A new list of new dicts.  The input list and its dicts are left
            untouched, so the same messages can be composed repeatedly.
        """
        turns = []
        for message in messages:
            # Normalize before comparing so that unknown roles (which are
            # stored as "inst") merge the same way known instruction roles do.
            role = self._normalize_role(message["role"])
            content = message["content"]
            if turns and turns[-1]["role"] == role:
                turns[-1]["content"] += "\n" + content
            else:
                # Copy instead of editing the caller's dict in place.
                turns.append({**message, "role": role, "content": content})
        return turns

    def _merge_mistral(self, turns) -> str:
        """Render turns as ``<s> [INST] ... [/INST] answer </s>`` blocks."""
        parts = []
        pending_inst = ""
        for turn in turns:
            content = turn["content"]
            if turn["role"] in self.answer_roles:
                parts.append(f"<s> {pending_inst} {content} </s>\n")
                pending_inst = ""
            else:
                pending_inst = f"[INST] {content} [/INST]"
        # A trailing instruction without an answer is what the model completes.
        if pending_inst:
            parts.append(pending_inst)
        return "".join(parts)

    def _merge_chatml(self, messages) -> str:
        """Render messages as ``<|im_start|>role ... <|im_end|>`` blocks."""
        lines = []
        for message in messages:
            role = message["role"]
            if role not in ["system", "user", "assistant"]:
                role = self.default_role
            lines.append(f"<|im_start|>{role}\n{message['content']}<|im_end|>")
        # Open an assistant turn for the model to fill in.
        lines.append("<|im_start|>assistant")
        return "\n".join(lines)

    def _merge_openchat(self, turns) -> str:
        """Render turns as ``GPT4 Correct User/Assistant`` blocks."""
        end_of_turn = "<|end_of_turn|>"
        lines = []
        for turn in turns:
            content = turn["content"]
            if turn["role"] in self.answer_roles:
                lines.append(f"GPT4 Correct Assistant:\n{content}{end_of_turn}")
            else:
                lines.append(f"GPT4 Correct User:\n{content}{end_of_turn}")
        lines.append("GPT4 Correct Assistant:\n")
        return "\n".join(lines)

    def _merge_gemma(self, turns) -> str:
        """Render turns as ``<start_of_turn>user/model ... <end_of_turn>`` blocks."""
        start_of_turn = "<start_of_turn>"
        end_of_turn = "<end_of_turn>"
        lines = []
        for turn in turns:
            speaker = "model" if turn["role"] in self.answer_roles else "user"
            lines.append(f"{start_of_turn}{speaker}\n{turn['content']}{end_of_turn}")
        lines.append(f"{start_of_turn}model\n")
        return "<bos>" + "\n".join(lines)

    def merge(self, messages) -> str:
        """Build the prompt string for ``self.model`` from ``messages``.

        The result is also stored on ``self.merged_str``; the messages that
        were actually rendered (after role concatenation, where the format
        uses it) are stored on ``self.messages``.

        Args:
            messages: List of message dicts with ``"role"`` and ``"content"``
                keys.  The list and its dicts are not modified.

        Returns:
            The prompt string, ending where the model's answer should begin
            for the templated formats.
        """
        self.messages = messages
        self.merged_str = ""

        if self.model in ["mixtral-8x7b", "mistral-7b"]:
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = self._merge_mistral(self.messages)
        elif self.model in ["nous-mixtral-8x7b"]:
            self.merged_str = self._merge_chatml(self.messages)
        elif self.model in ["openchat-3.5"]:
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = self._merge_openchat(self.messages)
        elif self.model in ["gemma-7b"]:
            self.messages = self.concat_messages_by_role(messages)
            self.merged_str = self._merge_gemma(self.messages)
        elif self.model in ["command-r-plus"]:
            # Delegate to the tokenizer's own chat template.  Earlier
            # revisions also listed "openchat-3.5" and "gemma-7b" here, but
            # those names are always handled by the branches above.
            tokenizer = AutoTokenizer.from_pretrained(self.model_fullname)
            self.merged_str = tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
        else:
            # Generic fallback: plain "role: content" paragraphs.
            self.merged_str = "\n\n".join(
                [f"{message['role']}: {message['content']}" for message in messages]
            )

        return self.merged_str

    def decompose_to_system_and_input_prompt(
        self, messages: list[dict], append_assistant=True
    ):
        """Split messages into a system prompt and a single input prompt.

        Messages whose role is in ``system_roles`` are joined with newlines
        into the system prompt.  The remaining messages are concatenated by
        role and rendered as blocks labelled with a backtick-quoted ``user``
        or ``assistant`` tag, separated by blank lines.

        Args:
            messages: List of message dicts with ``"role"`` and ``"content"``
                keys.  The list and its dicts are not modified.
            append_assistant: When true, end the input prompt with an empty
                assistant label for the model to continue from.

        Returns:
            A ``(system_prompt, input_prompt)`` tuple of strings.
        """
        system_prompt_list = []
        user_and_assistant_messages = []
        for message in messages:
            if message["role"] in self.system_roles:
                system_prompt_list.append(message["content"])
            else:
                user_and_assistant_messages.append(message)
        system_prompt = "\n".join(system_prompt_list)

        input_prompt_list = []
        for turn in self.concat_messages_by_role(user_and_assistant_messages):
            label = "assistant" if turn["role"] in self.answer_roles else "user"
            input_prompt_list.append(f"`{label}`:\n{turn['content']}")
        input_prompt = "\n\n".join(input_prompt_list)

        if append_assistant:
            input_prompt += "\n\n`assistant`:"

        return system_prompt, input_prompt
|
|
|
|
if __name__ == "__main__":
    # Demo: split a short conversation into its system prompt and its
    # user/assistant input prompt, then log both.
    demo_messages = [
        {
            "role": "system",
            "content": "You are a LLM developed by OpenAI.\nYour name is GPT-4.",
        },
        {"role": "user", "content": "Hello, who are you?"},
        {"role": "assistant", "content": "I am a bot."},
        {"role": "user", "content": "What is your name?"},
    ]
    composer = MessageComposer("gemma-7b")
    prompts = composer.decompose_to_system_and_input_prompt(demo_messages)
    # Log each prompt under its heading, system prompt first.
    for heading, prompt in zip(("system_prompt:", "input_prompt:"), prompts):
        logger.note(heading)
        logger.mesg(prompt)
|
|
| |
|
|