| | import os |
| | import re |
| | import shutil |
| | import signal |
| | import subprocess |
| | import time |
| | from typing import Dict |
| |
|
| | import openai |
| | import requests |
| |
|
| | from chatdev.codes import Codes |
| | from chatdev.documents import Documents |
| | from chatdev.roster import Roster |
| | from chatdev.utils import log_and_print_online |
| |
|
| |
|
| | class ChatEnvConfig: |
| | def __init__(self, clear_structure, |
| | brainstorming, |
| | gui_design, |
| | git_management): |
| | self.clear_structure = clear_structure |
| | self.brainstorming = brainstorming |
| | self.gui_design = gui_design |
| | self.git_management = git_management |
| |
|
| | def __str__(self): |
| | string = "" |
| | string += "ChatEnvConfig.clear_structure: {}\n".format(self.clear_structure) |
| | string += "ChatEnvConfig.brainstorming: {}\n".format(self.brainstorming) |
| | return string |
| |
|
| |
|
| | class ChatEnv: |
| | def __init__(self, chat_env_config: ChatEnvConfig): |
| | self.config = chat_env_config |
| | self.roster: Roster = Roster() |
| | self.codes: Codes = Codes() |
| | self.proposed_images: Dict[str, str] = {} |
| | self.incorporated_images: Dict[str, str] = {} |
| | self.requirements: Documents = Documents() |
| | self.manuals: Documents = Documents() |
| | self.env_dict = { |
| | "directory": "", |
| | "task_prompt": "", |
| | "modality": "", |
| | "ideas": "", |
| | "language": "", |
| | "review_comments": "", |
| | "error_summary": "", |
| | "test_reports": "" |
| | } |
| |
|
| | @staticmethod |
| | def fix_module_not_found_error(test_reports): |
| | if "ModuleNotFoundError" in test_reports: |
| | for match in re.finditer(r"No module named '(\S+)'", test_reports, re.DOTALL): |
| | module = match.group(1) |
| | subprocess.Popen("pip install {}".format(module), shell=True).wait() |
| | log_and_print_online("**[CMD Execute]**\n\n[CMD] pip install {}".format(module)) |
| |
|
| | def set_directory(self, directory): |
| | assert len(self.env_dict['directory']) == 0 |
| | self.env_dict['directory'] = directory |
| | self.codes.directory = directory |
| | self.requirements.directory = directory |
| | self.manuals.directory = directory |
| |
|
| | if os.path.exists(self.env_dict['directory']) and len(os.listdir(directory)) > 0: |
| | new_directory = "{}.{}".format(directory, time.strftime("%Y%m%d%H%M%S", time.localtime())) |
| | shutil.copytree(directory, new_directory) |
| | print("{} Copied to {}".format(directory, new_directory)) |
| | if self.config.clear_structure: |
| | if os.path.exists(self.env_dict['directory']): |
| | shutil.rmtree(self.env_dict['directory']) |
| | os.mkdir(self.env_dict['directory']) |
| | print("{} Created".format(directory)) |
| | else: |
| | os.mkdir(self.env_dict['directory']) |
| |
|
| | def exist_bugs(self) -> tuple[bool, str]: |
| | directory = self.env_dict['directory'] |
| |
|
| | success_info = "The software run successfully without errors." |
| | try: |
| | command = "cd {}; ls -l; python3 main.py;".format(directory) |
| | process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid, |
| | stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| | time.sleep(3) |
| | return_code = process.returncode |
| | |
| | if process.poll() is None: |
| | os.killpg(os.getpgid(process.pid), signal.SIGTERM) |
| | if return_code == 0: |
| | return False, success_info |
| | else: |
| | error_output = process.stderr.read().decode('utf-8') |
| | if error_output: |
| | if "Traceback".lower() in error_output.lower(): |
| | errs = error_output.replace(directory + "/", "") |
| | return True, errs |
| | else: |
| | return False, success_info |
| | except subprocess.CalledProcessError as e: |
| | return True, f"Error: {e}" |
| | except Exception as ex: |
| | return True, f"An error occurred: {ex}" |
| |
|
| | return False, success_info |
| |
|
| | def recruit(self, agent_name: str): |
| | self.roster._recruit(agent_name) |
| |
|
| | def exist_employee(self, agent_name: str) -> bool: |
| | return self.roster._exist_employee(agent_name) |
| |
|
| | def print_employees(self): |
| | self.roster._print_employees() |
| |
|
| | def update_codes(self, generated_content): |
| | self.codes._update_codes(generated_content) |
| |
|
| | def rewrite_codes(self) -> None: |
| | self.codes._rewrite_codes(self.config.git_management) |
| |
|
| | def get_codes(self) -> str: |
| | return self.codes._get_codes() |
| |
|
| | def _load_from_hardware(self, directory) -> None: |
| | self.codes._load_from_hardware(directory) |
| |
|
| | def _update_requirements(self, generated_content): |
| | self.requirements._update_docs(generated_content) |
| |
|
| | def rewrite_requirements(self): |
| | self.requirements._rewrite_docs() |
| |
|
| | def get_requirements(self) -> str: |
| | return self.requirements._get_docs() |
| |
|
| | def _update_manuals(self, generated_content): |
| | self.manuals._update_docs(generated_content, parse=False, predifined_filename="manual.md") |
| |
|
| | def rewrite_manuals(self): |
| | self.manuals._rewrite_docs() |
| |
|
| | def write_meta(self) -> None: |
| | directory = self.env_dict['directory'] |
| |
|
| | if not os.path.exists(directory): |
| | os.mkdir(directory) |
| | print("{} Created.".format(directory)) |
| |
|
| | meta_filename = "meta.txt" |
| | with open(os.path.join(directory, meta_filename), "w", encoding="utf-8") as writer: |
| | writer.write("{}:\n{}\n\n".format("Task", self.env_dict['task_prompt'])) |
| | writer.write("{}:\n{}\n\n".format("Config", self.config.__str__())) |
| | writer.write("{}:\n{}\n\n".format("Roster", ", ".join(self.roster.agents))) |
| | writer.write("{}:\n{}\n\n".format("Modality", self.env_dict['modality'])) |
| | writer.write("{}:\n{}\n\n".format("Ideas", self.env_dict['ideas'])) |
| | writer.write("{}:\n{}\n\n".format("Language", self.env_dict['language'])) |
| | writer.write("{}:\n{}\n\n".format("Code_Version", self.codes.version)) |
| | writer.write("{}:\n{}\n\n".format("Proposed_images", len(self.proposed_images.keys()))) |
| | writer.write("{}:\n{}\n\n".format("Incorporated_images", len(self.incorporated_images.keys()))) |
| | print(os.path.join(directory, meta_filename), "Wrote") |
| |
|
| | def generate_images_from_codes(self): |
| | def download(img_url, file_name): |
| | r = requests.get(img_url) |
| | filepath = os.path.join(self.env_dict['directory'], file_name) |
| | if os.path.exists(filepath): |
| | os.remove(filepath) |
| | with open(filepath, "wb") as f: |
| | f.write(r.content) |
| | print("{} Downloaded".format(filepath)) |
| |
|
| | regex = r"(\w+.png)" |
| | joined_codes = self.get_codes() |
| | matches = re.finditer(regex, joined_codes, re.DOTALL) |
| | |
| | for match in matches: |
| | filename = match.group(1).strip() |
| | if filename in self.proposed_images.keys(): |
| | self.incorporated_images[filename] = self.proposed_images[filename] |
| | else: |
| | self.incorporated_images[filename] = filename.replace("_", " ") |
| |
|
| | for filename in self.incorporated_images.keys(): |
| | if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): |
| | desc = self.incorporated_images[filename] |
| | if desc.endswith(".png"): |
| | desc = desc.replace(".png", "") |
| | print("{}: {}".format(filename, desc)) |
| | response = openai.Image.create( |
| | prompt=desc, |
| | n=1, |
| | size="256x256" |
| | ) |
| | image_url = response['data'][0]['url'] |
| | download(image_url, filename) |
| |
|
| | def get_proposed_images_from_message(self, messages): |
| | def download(img_url, file_name): |
| | r = requests.get(img_url) |
| | filepath = os.path.join(self.env_dict['directory'], file_name) |
| | if os.path.exists(filepath): |
| | os.remove(filepath) |
| | with open(filepath, "wb") as f: |
| | f.write(r.content) |
| | print("{} Downloaded".format(filepath)) |
| |
|
| | regex = r"(\w+.png):(.*?)\n" |
| | matches = re.finditer(regex, messages, re.DOTALL) |
| | images = {} |
| | for match in matches: |
| | filename = match.group(1).strip() |
| | desc = match.group(2).strip() |
| | images[filename] = desc |
| |
|
| | if len(images.keys()) == 0: |
| | regex = r"(\w+.png)" |
| | matches = re.finditer(regex, messages, re.DOTALL) |
| | images = {} |
| | for match in matches: |
| | filename = match.group(1).strip() |
| | desc = " ".join(filename.replace(".png", "").split("_")) |
| | images[filename] = desc |
| | print("{}: {}".format(filename, images[filename])) |
| |
|
| | for filename in images.keys(): |
| | if not os.path.exists(os.path.join(self.env_dict['directory'], filename)): |
| | desc = images[filename] |
| | if desc.endswith(".png"): |
| | desc = desc.replace(".png", "") |
| | print("{}: {}".format(filename, desc)) |
| | response = openai.Image.create( |
| | prompt=desc, |
| | n=1, |
| | size="256x256" |
| | ) |
| | image_url = response['data'][0]['url'] |
| | download(image_url, filename) |
| |
|
| | return images |
| |
|