import json
import os
import shutil

from loguru import logger

from dataloaders.ProblemState import ProblemStateROCm
from tb_eval.evaluators.interface import get_evaluators
from tb_eval.perf.efficiency import get_perf_evaluators


class ROCm:
    def __init__(self,
                 statis_path,
                 py_folder,
                 instruction_path,
                 log_root,
                 py_interpreter='python3',
                 result_path=None
                 ):
        self.statis_path = statis_path
        self.py_folder = py_folder
        self.instruction_path = instruction_path
        self.py_interpreter = py_interpreter
        self.result_path = result_path

        self.rocm_tests = True
        self.problem_states = self.load_ps()
        self.log_root = log_root

        # Registry lookups: both return the ROCm-specific evaluator instances.
        self.evaluator = get_evaluators["rocm"]()
        self.perf_evaluator = get_perf_evaluators["rocm"]()
        logger.info("Custom tests path set to: {}".format(self.py_folder))

    def load_ps(self):
        problem_states = []
        with open(self.instruction_path, "r", encoding="utf-8") as f:
            instructions = json.load(f)
        with open(self.statis_path, "r", encoding="utf-8") as f:
            statis_data = json.load(f)

        for line in instructions:
            instruction = line["instruction"]
            label = line["label"]
            opname = line["opname"]
            g = label.replace("<|im_end|>", "").replace("<|EOT|>", "")

            # Find the statistics entry whose label contains this cleaned
            # label, then consume it so it cannot be matched twice.
            matched = None
            for item in statis_data:
                if g in item["label"]:
                    matched = item
                    break
            assert matched is not None, f"no statis entry found for opname {opname}"
            statis_data.remove(matched)
            filename = matched["file"]

            path = os.path.join(self.py_folder, filename)
            assert os.path.exists(path), f"{filename} does not exist!"
            # The test harness is appended after a 146-character '#' separator;
            # keep only the part that follows it.
            with open(path, "r", encoding="utf-8") as f:
                test_code = f.read().split("#" * 146)[-1]
            assert "def test_" in test_code, f"no pytest test found in {filename}"

            problemstate = ProblemStateROCm(
                instruction=instruction,
                label=label,
                test_code=test_code,
                filename=filename,
                opname=opname,
                target_kernel_name=line.get("target_kernel_name", "")
            )
            problem_states.append(problemstate)
        return problem_states
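
    # Expected input shapes, sketched from the reads above (all field values
    # here are hypothetical, for illustration only):
    #
    #   instruction_path -- JSON list:
    #     [{"instruction": "...", "label": "...<|im_end|>",
    #       "opname": "softmax", "target_kernel_name": "softmax_kernel"}, ...]
    #
    #   statis_path -- JSON list, one entry per test file:
    #     [{"label": "...", "file": "softmax_test.py"}, ...]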

    def __len__(self):
        return len(self.problem_states)

    def write_file(self, file_path, start_idx=0, datalen=None):
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        data_len = datalen if datalen is not None else len(self)
        with open(file_path, "w", encoding="utf-8") as f:
            for ps in self.problem_states[start_idx:(start_idx + data_len)]:
                output = {
                    "instruction": ps.instruction,
                    "label": ps.label,
                    "file": ps.filename,
                    "target_kernel_name": ps.target_kernel_name,
                    "predict": ps.solution if ps.solution else "",
                    "speedup": ps.speedup
                }
                f.write(json.dumps(output) + "\n")
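
    # Each emitted JSONL line has this shape (values hypothetical):
    #   {"instruction": "...", "label": "...", "file": "softmax_test.py",
    #    "target_kernel_name": "softmax_kernel", "predict": "...", "speedup": 1.0}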

    def test_opt_correctness(self, code, filename, opname, tmp_dir="temp", save_scripts=True, exe_dir="pass_exe", gpu_id=0):
        tmp_dir = os.path.join(self.log_root, tmp_dir)
        os.makedirs(tmp_dir, exist_ok=True)
        exe_dir = os.path.join(self.log_root, exe_dir)
        os.makedirs(exe_dir, exist_ok=True)
        logger.info(f"Testing correctness for {filename} in {tmp_dir}")

        # Compute the scratch paths up front so the finally block cannot hit a
        # NameError if an exception is raised before they are assigned.
        log_root = os.path.abspath(os.path.join(tmp_dir, "tmp"))
        exec_root_eval = os.path.abspath(os.path.join(tmp_dir, "exec_eval"))
        try:
            os.makedirs(log_root, exist_ok=True)
            os.makedirs(exec_root_eval, exist_ok=True)

            call_status, exec_status, stdout, stderr = self.evaluator(
                code, log_root, exec_root_eval, filename, opname=opname,
                atol=1e-2, rtol=1e-2, custom_tests_path=self.py_folder, gpu_id=gpu_id
            )

            if exec_status and save_scripts:
                # Keep a copy of the passing executable so it can be re-run
                # later during the performance evaluation.
                src_file = os.path.join(exec_root_eval, opname)
                dst_file = os.path.join(exe_dir, opname)
                if os.path.exists(src_file):
                    shutil.copy(src_file, dst_file)

            # stdout/stderr are intentionally duplicated so the return keeps
            # its 6-tuple shape (matching the except branch below).
            return bool(call_status), bool(exec_status), stdout, stderr, stdout, stderr

        except Exception as e:
            logger.error(f"Exception during correctness test for {filename}: {e}")
            return False, False, None, str(e), None, str(e)
        finally:
            if os.path.exists(log_root):
                shutil.rmtree(log_root, ignore_errors=True)
            if os.path.exists(exec_root_eval):
                shutil.rmtree(exec_root_eval, ignore_errors=True)
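
    # Example call (instance and argument values hypothetical):
    #   call_ok, exec_ok, out, err, _, _ = rocm.test_opt_correctness(
    #       candidate_code, "softmax_test.py", opname="softmax", gpu_id=0)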

    def run_perf_evaluation(self, exec_folder, gen_perf_folder, gpu_id=0):
        """
        Runs the performance evaluation for ROCm using the tb_eval module.

        Args:
            exec_folder (str): The directory containing the correctly executed scripts.
            gen_perf_folder (str): The directory where performance JSON results will be stored.
            gpu_id (int): Index of the GPU to run the kernels on.

        Returns:
            dict: A dictionary mapping filenames to performance metrics; empty on failure.
        """
        logger.info(f"Starting ROCm performance evaluation for kernels in: {exec_folder}")
        try:
            perf_results = self.perf_evaluator(exec_folder, gpu_id=gpu_id)
        except Exception as e:
            logger.error(f"ROCm performance evaluation failed: {e}")
            return {}
        logger.success("ROCm performance evaluation completed successfully.")
        return perf_results
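

# A minimal end-to-end driver, sketched under assumptions: every path below is
# hypothetical, and the ground-truth label is reused as a stand-in "prediction"
# purely to exercise the pipeline.
if __name__ == "__main__":
    dataset = ROCm(
        statis_path="data/statis.json",             # hypothetical path
        py_folder="data/custom_tests",              # hypothetical path
        instruction_path="data/instructions.json",  # hypothetical path
        log_root="logs",
    )
    logger.info(f"Loaded {len(dataset)} problem states")

    for ps in dataset.problem_states:
        candidate = ps.label  # stand-in prediction; swap in real model output
        ps.solution = candidate
        call_ok, exec_ok, out, err, _, _ = dataset.test_opt_correctness(
            candidate, ps.filename, ps.opname, gpu_id=0
        )
        logger.info(f"{ps.filename}: call={call_ok} exec={exec_ok}")

    # Passing executables were copied into <log_root>/pass_exe by the loop above.
    perf = dataset.run_perf_evaluation(
        exec_folder=os.path.join("logs", "pass_exe"),
        gen_perf_folder=os.path.join("logs", "perf"),  # hypothetical output dir
    )
    dataset.write_file("results/predictions.jsonl")    # hypothetical path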