| | |
| | |
| |
|
| | import os |
| | import json |
| | import random |
| | import tarfile |
| | import requests |
| | import datetime |
| | import numpy as np |
| | import pandas as pd |
| | from ..utils import make_parent_folder |
| | from ...core.logging import logger |
| | from ...core.module_utils import load_json, save_json |
| |
|
| |
|
# Maps each supported AFlow benchmark dataset to the file names of its splits
# inside the downloaded data archive. `None` marks a split that is not shipped
# for that dataset; the code-generation datasets (humaneval, mbpp) additionally
# provide a "test_cases" file with their public tests.
AFLOW_DATASET_FILES_MAP = {
    "hotpotqa": {"train": None, "dev": "hotpotqa_validate.jsonl", "test": "hotpotqa_test.jsonl"},
    "humaneval": {"train": None, "dev": "humaneval_validate.jsonl", "test": "humaneval_test.jsonl", "test_cases": "humaneval_public_test.jsonl"},
    "mbpp": {"train": None, "dev": "mbpp_validate.jsonl", "test": "mbpp_test.jsonl", "test_cases": "mbpp_public_test.jsonl"},
    "gsm8k": {"train": None, "dev": "gsm8k_validate.jsonl", "test": "gsm8k_test.jsonl"},
    "math": {"train": None, "dev": "math_validate.jsonl", "test": "math_test.jsonl"},
}
| |
|
def extract_tar_gz(filename: str, extract_path: str) -> None:
    """Extract a ``.tar.gz`` archive into *extract_path*.

    Args:
        filename: Path to the ``.tar.gz`` archive.
        extract_path: Directory the archive members are extracted into.

    Uses the ``filter="data"`` extraction filter where available (Python 3.12+,
    backported to late patch releases of 3.8-3.11) to reject path-traversal
    members (absolute paths, ``../`` components, special files). On older
    interpreters without the ``filter`` argument it falls back to the legacy
    unfiltered behavior.
    """
    with tarfile.open(filename, "r:gz") as tar:
        try:
            # NOTE(security): blocks malicious archive members that would
            # otherwise write outside extract_path.
            tar.extractall(path=extract_path, filter="data")
        except TypeError:
            # Older Python: extractall() does not accept `filter`.
            tar.extractall(path=extract_path)
| |
|
| |
|
def download_aflow_benchmark_data(dataset: str, save_folder: str):
    """Download and extract the AFlow benchmark data archive.

    Args:
        dataset: One of the keys in ``AFLOW_DATASET_FILES_MAP`` (matched
            case-insensitively), or ``"all"`` to keep every dataset's files.
        save_folder: Directory where the data files are placed.

    Raises:
        ValueError: If *dataset* is not a recognized choice.
        requests.HTTPError: If the download request fails.
    """
    candidate_datasets = list(AFLOW_DATASET_FILES_MAP.keys()) + ["all"]
    lower_candidate_datasets = [name.lower() for name in candidate_datasets]
    if dataset.lower() not in lower_candidate_datasets:
        raise ValueError(f"Invalid value for dataset: {dataset}. Available choices: {candidate_datasets}")
    # Normalize once so mixed-case input (e.g. "HotpotQA") works in the map
    # lookup and the "all" comparison below; validation above is already
    # case-insensitive, so this must be too.
    dataset = dataset.lower()

    url = "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e"
    logger.info(f"Downloading AFlow benchmark data from {url} ...")
    aflow_data_save_file = os.path.join(save_folder, "aflow_data.tar.gz")

    make_parent_folder(aflow_data_save_file)
    response = requests.get(url, stream=True)
    response.raise_for_status()
    # Stream to disk in chunks so the archive is never held fully in memory.
    with open(aflow_data_save_file, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)

    logger.info(f"Extracting data for {dataset} dataset(s) from {aflow_data_save_file} ...")
    extract_tar_gz(aflow_data_save_file, extract_path=save_folder)

    if dataset != "all":
        # Keep only the requested dataset's files. Skip subdirectories:
        # os.remove() cannot delete them and would raise.
        dataset_files = {file for file in AFLOW_DATASET_FILES_MAP[dataset].values() if file is not None}
        for file in os.listdir(save_folder):
            full_path = os.path.join(save_folder, file)
            if file not in dataset_files and os.path.isfile(full_path):
                os.remove(full_path)

    # The archive may already have been removed by the cleanup loop above.
    if os.path.exists(aflow_data_save_file):
        logger.info(f"Remove {aflow_data_save_file}")
        os.remove(aflow_data_save_file)
| |
|
| |
|
class DataUtils:
    """Utilities for persisting and sampling per-round optimization results.

    ``root_path`` is expected to contain ``round_<n>`` subfolders (each with a
    ``log.json``) and a top-level ``results.json`` holding a list of
    ``{"round": ..., "score": ...}`` records.
    """

    def __init__(self, root_path: str):
        self.root_path = root_path
        # {"round": int, "score": float} dicts sorted by score (descending);
        # refreshed by _load_scores().
        self.top_scores = []

    def load_results(self, path: str) -> list:
        """Load ``results.json`` from *path*; return [] if missing or corrupt."""
        result_path = os.path.join(path, "results.json")
        if not os.path.exists(result_path):
            return []
        with open(result_path, "r") as json_file:
            try:
                return json.load(json_file)
            except json.JSONDecodeError:
                # Treat a partially written / corrupt file as "no results yet".
                return []

    def get_top_rounds(self, sample: int, path=None, mode="Graph"):
        """Return up to *sample* distinct top-scoring rounds.

        Round 0 (the baseline) is always included first when present; the
        remaining slots are filled in descending score order.
        """
        self._load_scores(path, mode)
        unique_rounds = set()
        unique_top_scores = []

        # Seed with the baseline round if it exists.
        first_round = next((item for item in self.top_scores if item["round"] == 0), None)
        if first_round:
            unique_top_scores.append(first_round)
            unique_rounds.add(0)

        for item in self.top_scores:
            if item["round"] not in unique_rounds:
                unique_top_scores.append(item)
                unique_rounds.add(item["round"])

            if len(unique_top_scores) >= sample:
                break

        return unique_top_scores

    def select_round(self, items):
        """Probabilistically pick one round dict, biased toward higher scores.

        Args:
            items: Non-empty list of dicts each carrying a ``"score"`` key.

        Raises:
            ValueError: If *items* is empty.
        """
        if not items:
            raise ValueError("Item list is empty.")

        sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
        # Scale scores to percentages before computing the mixed distribution.
        scores = [item["score"] * 100 for item in sorted_items]

        probabilities = self._compute_probabilities(scores)
        logger.info(f"\nMixed probability distribution: {probabilities}")
        logger.info(f"\nSorted rounds: {sorted_items}")

        selected_index = np.random.choice(len(sorted_items), p=probabilities)
        logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")

        return sorted_items[selected_index]

    def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
        """Mix a uniform distribution with a softmax over *scores*.

        Args:
            scores: Sequence of numeric scores.
            alpha: Temperature factor applied to the (max-shifted) scores.
            lambda_: Weight of the uniform component (exploration rate).

        Returns:
            ``np.ndarray`` of probabilities summing to 1.

        Raises:
            ValueError: If *scores* is empty or the softmax weights sum to 0.
        """
        scores = np.array(scores, dtype=np.float64)
        n = len(scores)

        if n == 0:
            raise ValueError("Score list is empty.")

        uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)

        # Shift by the max score for numerical stability before exponentiating.
        max_score = np.max(scores)
        shifted_scores = scores - max_score
        exp_weights = np.exp(alpha * shifted_scores)

        sum_exp_weights = np.sum(exp_weights)
        if sum_exp_weights == 0:
            raise ValueError("Sum of exponential weights is 0, cannot normalize.")

        score_prob = exp_weights / sum_exp_weights

        # Blend exploration (uniform) with exploitation (softmax).
        mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob

        # Re-normalize to guard against floating-point drift.
        total_prob = np.sum(mixed_prob)
        if not np.isclose(total_prob, 1.0):
            mixed_prob = mixed_prob / total_prob

        return mixed_prob

    def load_log(self, cur_round, path=None, mode: str = "Graph"):
        """Return up to 3 random log entries for a round as pretty-printed JSON.

        In ``"Graph"`` mode the log is read from
        ``<root_path>/round_<cur_round>/log.json``; otherwise *path* is used
        directly. Returns "" when there is nothing to read.
        """
        if mode == "Graph":
            log_dir = os.path.join(self.root_path, f"round_{cur_round}", "log.json")
        else:
            log_dir = path

        # Guard: non-"Graph" mode without a path, or a missing file, yields no
        # log. (os.path.exists(None) would raise TypeError.)
        if not log_dir or not os.path.exists(log_dir):
            return ""
        logger.info(log_dir)

        data = load_json(log_dir, type="json")

        # Normalize the payload to a list of entries.
        if isinstance(data, dict):
            data = [data]
        elif not isinstance(data, list):
            data = list(data)

        if not data:
            return ""

        sample_size = min(3, len(data))
        random_samples = random.sample(data, sample_size)

        log = ""
        for sample in random_samples:
            log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"

        return log

    def get_results_file_path(self, graph_path: str) -> str:
        """Return the path of ``results.json`` under *graph_path*."""
        return os.path.join(graph_path, "results.json")

    def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
        """Build one result record, timestamped with the current local time."""
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}

    def save_results(self, json_file_path: str, data: list):
        """Persist *data* as indented JSON at *json_file_path*."""
        save_json(data, json_file_path, type="json", use_indent=True)

    def _load_scores(self, path=None, mode="Graph"):
        """Populate ``self.top_scores`` with per-round mean scores, best first.

        Reads ``results.json`` from ``root_path`` (``"Graph"`` mode) or from
        *path* otherwise. Returns the refreshed ``self.top_scores`` list.
        """
        if mode == "Graph":
            rounds_dir = self.root_path
        else:
            rounds_dir = path

        result_file = os.path.join(rounds_dir, "results.json")
        self.top_scores = []

        data = load_json(result_file, type="json")
        # Guard: an empty results file would make the groupby below raise
        # KeyError("round") on a column-less DataFrame.
        if not data:
            return self.top_scores

        df = pd.DataFrame(data)

        # Average duplicate entries recorded for the same round.
        scores_per_round = df.groupby("round")["score"].mean().to_dict()

        for round_number, average_score in scores_per_round.items():
            self.top_scores.append({"round": round_number, "score": average_score})

        self.top_scores.sort(key=lambda x: x["score"], reverse=True)

        return self.top_scores
| | |
| |
|
| | def test_case_2_test_function(solution: str, test_case: str, entry_point: str): |
| | tester_function = f""" |
| | {solution} |
| | |
| | |
| | def check(candidate): |
| | {test_case} |
| | |
| | def test_check(): |
| | check({entry_point}) |
| | |
| | test_check() |
| | """ |
| | return tester_function |
| | |