| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """ |
| Utilities for PEFT benchmarking. |
| """ |
|
|
| import datetime |
| import json |
| import os |
| import platform |
| import subprocess |
| from dataclasses import asdict, dataclass, field |
| from enum import Enum |
| from typing import Any, Callable, Optional |
|
|
| import psutil |
| import torch |
|
|
| from peft.utils import infer_device |
|
|
|
|
# File names looked up by validate_experiment_path: the per-experiment overrides and the shared defaults.
FILE_NAME_BENCHMARK_PARAMS = "benchmark_params.json"
FILE_NAME_DEFAULT_CONFIG = "default_benchmark_params.json"

# Output directories, all siblings of this module. BenchmarkResult.save picks one of them when no explicit
# path is given.
RESULT_PATH, RESULT_PATH_TEMP, RESULT_PATH_CANCELLED = (
    os.path.join(os.path.dirname(__file__), subdir)
    for subdir in ("results", "temporary_results", "cancelled_results")
)
|
|
|
|
class BenchmarkStatus(Enum):
    """Lifecycle state of a benchmark run.

    Each member's value is the plain string that ends up in the serialized results.
    """

    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RUNNING = "running"
|
|
|
|
@dataclass
class BenchmarkResult:
    """Container for the results of a single benchmark run.

    The three ``*_info`` dictionaries are rebuilt with a fixed skeleton in ``__post_init__`` and are then filled in
    through the ``update_*`` / ``add_*`` methods. ``to_dict`` and ``save`` serialize exactly these three dictionaries.

    Args:
        experiment_name: Name of the experiment; used as the default file name by ``save``.
        status: Status of the run at construction time; ``save`` uses it to pick the default output directory.
        model_id: Identifier of the benchmarked model, stored under ``meta_info["model_id"]``.
        run_info: Run-level data (timestamp, duration, status, hardware). Overwritten in ``__post_init__``.
        generation_info: Memory and per-category/overall metrics. Overwritten in ``__post_init__``.
        meta_info: Model, package and system metadata. Overwritten in ``__post_init__``.
    """

    experiment_name: str
    status: BenchmarkStatus

    model_id: str

    run_info: dict = field(default_factory=dict)
    generation_info: dict = field(default_factory=dict)
    meta_info: dict = field(default_factory=dict)

    @staticmethod
    def _describe_accelerator(device: str) -> tuple[int, str]:
        """Return ``(device_count, device_name)`` for the torch backend called ``device``.

        Returns ``(0, "N/A")`` when no accelerator is usable, and uses ``"N/A"`` as the name when the backend does
        not provide ``get_device_name``.
        """
        if device == "cpu":
            # torch.cpu reports itself as available but is not an accelerator and has no get_device_name, so
            # probing it like a GPU backend would raise AttributeError.
            return 0, "N/A"

        # Unknown device names fall back to the CUDA backend.
        backend = getattr(torch, device, torch.cuda)
        if not backend.is_available():
            return 0, "N/A"

        get_device_name = getattr(backend, "get_device_name", None)
        name = get_device_name(0) if get_device_name is not None else "N/A"
        return backend.device_count(), name

    def __post_init__(self):
        """Initialize the structured ``run_info``, ``meta_info`` and ``generation_info`` dictionaries."""
        # Query the accelerator once and reuse the answer for both run_info and meta_info.
        num_accelerators, accelerator_name = self._describe_accelerator(infer_device())

        self.run_info = {
            "timestamp": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
            "duration": 0.0,
            "status": self.status.value,
            "hardware": {
                "num_accelerators": num_accelerators,
                "accelerator_type": accelerator_name,
                "cuda_version": torch.version.cuda if torch.cuda.is_available() else "N/A",
                "pytorch_version": torch.__version__,
            },
        }

        self.meta_info = {
            "model_id": self.model_id,
            "parameters": {
                "base_params": 0,
                "trainable_params": 0,
                "total_params": 0,
                "param_ratio": 0.0,
            },
            "model_size": {
                "base_model_size_mb": 0.0,
                "adapter_size_mb": 0.0,
            },
            "package_info": {
                "transformers-version": None,
                "transformers-commit-hash": None,
                "peft-version": None,
                "peft-commit-hash": None,
                "datasets-version": None,
                "datasets-commit-hash": None,
                "bitsandbytes-version": None,
                "bitsandbytes-commit-hash": None,
                "torch-version": torch.__version__,
                "torch-commit-hash": None,
            },
            "system_info": {
                "system": platform.system(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
                "accelerator": accelerator_name,
            },
        }

        self.generation_info = {
            "memory": {
                "peak_accelerator_memory_mb": 0.0,
                "peak_ram_memory_mb": 0.0,
                "memory_logs": [],
            },
            "by_category": {},
            "overall": {},
        }

    def update_meta_info(self, param_counts: dict, size_info: dict, package_info: Optional[dict] = None):
        """Merge parameter counts, model sizes and (optionally) package versions into ``meta_info``."""
        self.meta_info["parameters"].update(param_counts)
        self.meta_info["model_size"].update(size_info)
        if package_info:
            self.meta_info["package_info"].update(package_info)

    def update_generation_info(self, memory_data: Optional[dict] = None, performance_metrics: Optional[dict] = None):
        """Update generation performance information, primarily for memory and high-level performance.

        ``memory_data`` is merged into ``generation_info["memory"]``; ``performance_metrics`` is merged into the
        top level of ``generation_info``.
        """
        if memory_data:
            self.generation_info["memory"].update(memory_data)
        if performance_metrics:
            self.generation_info.update(performance_metrics)

    def add_memory_log(self, stage: str, ram_mb: float, accelerator_allocated_mb: float, accelerator_reserved_mb: float):
        """Append one memory usage entry to ``generation_info["memory"]["memory_logs"]``."""
        self.generation_info["memory"]["memory_logs"].append(
            {
                "stage": stage,
                "ram_mb": ram_mb,
                "accelerator_allocated_mb": accelerator_allocated_mb,
                "accelerator_reserved_mb": accelerator_reserved_mb,
            }
        )

    def add_metrics_for_category(self, category: str, metrics: dict, individual_samples: Optional[list] = None):
        """Store metrics (and optional per-sample data) for a prompt category, replacing any previous entry."""
        category_data = {"metrics": metrics, "samples": individual_samples if individual_samples is not None else []}
        self.generation_info["by_category"][category] = category_data

    def update_run_info(
        self,
        duration: float,
        status: BenchmarkStatus,
        error: Optional[str] = None,
        peft_config: Optional[dict] = None,
        benchmark_config: Optional[dict] = None,
    ):
        """Update run information.

        Only ``run_info["status"]`` is updated here; ``self.status``, which ``save`` consults to choose the output
        directory, is left untouched.
        """
        self.run_info["duration"] = duration
        self.run_info["status"] = status.value
        if error:
            self.run_info["error"] = error
        if peft_config:
            self.run_info["peft_config"] = peft_config
        if benchmark_config:
            self.run_info["benchmark_config"] = benchmark_config

    def compute_overall_metrics(self):
        """Average each key metric over all categories that report it and store it in ``generation_info["overall"]``.

        Does nothing when no category has been recorded. Metrics missing from every category are not written.
        """
        categories = self.generation_info["by_category"]
        if not categories:
            return

        key_metrics = [
            "inference_time",
            "base_inference_time",
            "inference_overhead_pct",
            "time_per_token",
            "generated_tokens",
        ]

        for metric in key_metrics:
            values = [
                category_data["metrics"][metric]
                for category_data in categories.values()
                if "metrics" in category_data and metric in category_data["metrics"]
            ]
            if values:
                self.generation_info["overall"][metric] = sum(values) / len(values)

    def to_dict(self) -> dict[str, Any]:
        """Refresh the overall metrics and return the three info dictionaries (not copies) in one dict."""
        self.compute_overall_metrics()
        return {
            "run_info": self.run_info,
            "generation_info": self.generation_info,
            "meta_info": self.meta_info,
        }

    def save(self, path: Optional[str] = None) -> str:
        """Write the result as JSON and return the path written to.

        Args:
            path: Destination file. When ``None``, the file is named ``<experiment_name>.json`` and placed in a
                directory chosen from ``self.status`` and the current git branch.

        Returns:
            The path of the written file.
        """
        if path is None:
            peft_branch = get_peft_branch()
            if self.status == BenchmarkStatus.CANCELLED:
                base_path = RESULT_PATH_CANCELLED
            elif peft_branch != "main":
                # Anything not produced on main is treated as provisional.
                base_path = RESULT_PATH_TEMP
            elif self.status == BenchmarkStatus.SUCCESS:
                base_path = RESULT_PATH
            elif self.status == BenchmarkStatus.FAILED:
                base_path = RESULT_PATH_CANCELLED
            else:
                base_path = RESULT_PATH_TEMP

            filename = f"{self.experiment_name}.json"
            path = os.path.join(base_path, filename)

        # A bare file name has an empty directory part, and os.makedirs("") raises.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)

        return path
|
|
|
|
@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking PEFT methods.

    Args:
        model_id: Identifier of the model to benchmark; must be a string.
        seed: Random seed; must be non-negative.
        num_inference_runs: Number of inference runs; must be positive.
        max_new_tokens: Maximum number of newly generated tokens; must be positive.
        dtype: Name of the dtype to use, ``"float16"`` by default.
        use_4bit: Presumably enables 4-bit quantized loading -- confirm against the benchmark runner.
        use_8bit: Presumably enables 8-bit quantized loading -- confirm against the benchmark runner.
        category_generation_params: Optional dict of generation parameters; presumably keyed by prompt
            category -- confirm against the benchmark runner.

    Raises:
        ValueError: If one of the validated fields is out of range (see ``__post_init__``).
    """

    model_id: str

    seed: int
    num_inference_runs: int
    max_new_tokens: int

    dtype: str = "float16"
    use_4bit: bool = False
    use_8bit: bool = False

    category_generation_params: Optional[dict] = None

    def __post_init__(self) -> None:
        """Validate configuration."""
        if not isinstance(self.model_id, str):
            raise ValueError(f"Invalid model_id: {self.model_id}")

        if self.seed < 0:
            raise ValueError(f"Invalid seed: {self.seed}")

        if self.num_inference_runs <= 0:
            raise ValueError(f"Invalid num_inference_runs: {self.num_inference_runs}")

        if self.max_new_tokens <= 0:
            raise ValueError(f"Invalid max_new_tokens: {self.max_new_tokens}")

    @classmethod
    def from_dict(cls, config_dict: dict) -> "BenchmarkConfig":
        """Create a config from a dictionary, silently ignoring keys that are not dataclass fields."""
        valid_keys = set(cls.__dataclass_fields__.keys())
        filtered_dict = {k: v for k, v in config_dict.items() if k in valid_keys}

        return cls(**filtered_dict)

    @classmethod
    def from_json(cls, json_path: str) -> "BenchmarkConfig":
        """Load a config from a JSON file (read as UTF-8)."""
        with open(json_path, encoding="utf-8") as f:
            config_dict = json.load(f)
        return cls.from_dict(config_dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert config to dictionary."""
        return asdict(self)

    def save(self, path: str) -> None:
        """Save config to a JSON file (written as UTF-8)."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)

    def merge_from_dict(self, config_dict: dict) -> None:
        """Merge settings from a dictionary into this config object.

        Keys that name a dataclass field override the current value; all other keys are ignored. The merged
        configuration is validated again, and if validation fails the previous values are restored before the
        error propagates.

        Raises:
            ValueError: If a merged value fails the checks in ``__post_init__``.
        """
        # Restrict to declared fields: a plain hasattr() check would also match methods such as "save" or
        # "to_dict" and replace them with arbitrary values.
        overrides = {key: value for key, value in config_dict.items() if key in self.__dataclass_fields__}
        previous = {key: getattr(self, key) for key in overrides}

        for key, value in overrides.items():
            setattr(self, key, value)

        try:
            self.__post_init__()
        except (TypeError, ValueError):
            # Roll back so a rejected merge does not leave the object half-updated.
            for key, value in previous.items():
                setattr(self, key, value)
            raise
|
|
|
|
def validate_experiment_path(path: str) -> tuple[str, "BenchmarkConfig"]:
    """Check that ``path`` exists, derive an experiment name from it and build the benchmark configuration.

    The name is built from the path components following the first ``experiments`` component: the first one alone,
    or ``<first>--<rest joined by "-">`` when there are more. Without such components the last path component is
    used. The configuration starts from the default config file next to this module and is overridden by the
    experiment's own parameter file when one is present.

    Raises:
        FileNotFoundError: If ``path`` or the default configuration file does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Experiment path not found: {path}")

    # Everything after the first "experiments" component; empty when it is absent or is the last component.
    parts = os.path.normpath(path).split(os.sep)
    tail = parts[parts.index("experiments") + 1 :] if "experiments" in parts else []

    if tail:
        method_name, *remaining_parts = tail
        if remaining_parts:
            experiment_name = f"{method_name}--{'-'.join(remaining_parts)}"
        else:
            experiment_name = method_name
    else:
        experiment_name = os.path.basename(path.rstrip(os.sep))

    module_dir = os.path.dirname(__file__)
    default_config_path = os.path.join(module_dir, FILE_NAME_DEFAULT_CONFIG)
    experiment_benchmark_params_path = os.path.join(path, FILE_NAME_BENCHMARK_PARAMS)

    if not os.path.exists(default_config_path):
        raise FileNotFoundError(f"Default configuration file not found: {default_config_path}. This is required.")
    benchmark_config = BenchmarkConfig.from_json(default_config_path)
    print(f"Loaded default configuration from {default_config_path}")

    if not os.path.exists(experiment_benchmark_params_path):
        print(f"No {FILE_NAME_BENCHMARK_PARAMS} found in {path}. Using only default configuration.")
        return experiment_name, benchmark_config

    with open(experiment_benchmark_params_path) as f:
        experiment_specific_params = json.load(f)
    benchmark_config.merge_from_dict(experiment_specific_params)
    print(f"Loaded and merged experiment-specific parameters from {experiment_benchmark_params_path}")

    return experiment_name, benchmark_config
|
|
|
|
def get_memory_usage() -> tuple[float, float, float]:
    """Get current memory usage (RAM and accelerator).

    Returns:
        A tuple ``(ram_mb, accelerator_allocated_mb, accelerator_reserved_mb)``. RAM is the resident set size of
        the current process. The accelerator values come from the first available backend among CUDA and XPU and
        are ``0.0`` when neither is available.
    """
    bytes_per_mb = 1024 * 1024
    ram_usage_mb = psutil.Process(os.getpid()).memory_info().rss / bytes_per_mb

    # CUDA is checked first, then XPU. getattr guards against torch builds that do not ship a given backend
    # module at all, where a direct torch.xpu access would raise AttributeError.
    for backend_name in ("cuda", "xpu"):
        backend = getattr(torch, backend_name, None)
        if backend is not None and backend.is_available():
            accelerator_allocated_mb = backend.memory_allocated() / bytes_per_mb
            accelerator_reserved_mb = backend.memory_reserved() / bytes_per_mb
            return ram_usage_mb, accelerator_allocated_mb, accelerator_reserved_mb

    return ram_usage_mb, 0.0, 0.0
|
|
|
|
def init_accelerator() -> tuple[float, float]:
    """Initialize the accelerator and return its initial memory usage.

    The first available backend among CUDA and XPU is initialized and its cache emptied before memory is read.

    Returns:
        A tuple ``(accelerator_allocated_mb, accelerator_reserved_mb)`` as reported by ``get_memory_usage``, or
        ``(0.0, 0.0)`` when no accelerator is available.
    """
    # getattr guards against torch builds without a given backend module, where a direct torch.xpu access
    # would raise AttributeError.
    for backend_name in ("cuda", "xpu"):
        backend = getattr(torch, backend_name, None)
        if backend is not None and backend.is_available():
            backend.init()
            backend.empty_cache()
            _, accelerator_allocated, accelerator_reserved = get_memory_usage()
            return accelerator_allocated, accelerator_reserved

    return 0.0, 0.0
|
|
|
|
def get_model_size_mb(model: torch.nn.Module, dtype_bytes: int = 4) -> float:
    """Return the size of ``model``'s parameters in MB, counting ``dtype_bytes`` bytes per element.

    The actual dtype of the parameters is not inspected; every element is assumed to take ``dtype_bytes`` bytes.
    """
    total_bytes = 0
    for param in model.parameters():
        total_bytes += param.numel() * dtype_bytes
    return total_bytes / (1024 * 1024)
|
|
|
|
def get_peft_branch() -> str:
    """Return the name of the git branch checked out in the repository that contains this file.

    The name is obtained from ``git rev-parse --abbrev-ref HEAD``, run in this module's directory.

    Returns:
        The branch name, or ``"unknown"`` when it cannot be determined (git is not installed, or the directory is
        not inside a git repository). Since ``"unknown"`` differs from ``"main"``, callers that compare against
        ``"main"`` treat such runs like runs from a non-main branch.
    """
    # An empty dirname means "current directory"; subprocess needs None for that, not "".
    module_dir = os.path.dirname(__file__) or None
    try:
        output = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=module_dir)
    except (OSError, subprocess.CalledProcessError):
        # Failing here would abort saving results at the very end of a benchmark run.
        return "unknown"
    return output.decode().strip()
|
|
|
|
def _log_timing_metrics(metrics: dict, print_fn: Callable) -> None:
    """Print the five timing/throughput metrics shared by the per-category and overall sections."""
    print_fn(f" Inference Time: {metrics.get('inference_time', 0):.4f} seconds")
    print_fn(f" Base Inference Time: {metrics.get('base_inference_time', 0):.4f} seconds")
    print_fn(f" Inference Overhead: {metrics.get('inference_overhead_pct', 0):.2f}%")
    print_fn(f" Time Per Token: {metrics.get('time_per_token', 0):.6f} seconds/token")
    print_fn(f" Generated Tokens: {metrics.get('generated_tokens', 0):.1f}")


def log_results(
    experiment_name: str,
    benchmark_result: BenchmarkResult,
    print_fn: Callable = print,
) -> None:
    """Log benchmark results to console and, for successful runs, save them.

    For a run whose ``run_info["status"]`` is not the success value, only the status, duration and error are
    printed and nothing is saved. Otherwise the full report is printed, the overall metrics are recomputed and
    ``benchmark_result.save()`` is called with its default path.

    Args:
        experiment_name: Name shown in the report header.
        benchmark_result: The result to report.
        print_fn: Callable used for output; it is always called with exactly one string.
    """
    print_fn("\n" + "=" * 50)
    print_fn(f"Benchmark Results: {experiment_name}")
    print_fn("=" * 50)

    print_fn(f"Status: {benchmark_result.run_info.get('status', 'N/A')}")
    print_fn(f"Duration: {benchmark_result.run_info.get('duration', 0):.2f} seconds")

    if benchmark_result.run_info.get("status") != BenchmarkStatus.SUCCESS.value:
        print_fn(f"Error: {benchmark_result.run_info.get('error', 'Unknown error')}")
        print_fn("=" * 50)
        return

    print_fn("\nModel Information:")
    print_fn(f" Base Model: {benchmark_result.meta_info.get('model_id', 'N/A')}")

    print_fn("\nParameter Counts:")
    params = benchmark_result.meta_info.get("parameters", {})
    print_fn(f" Base Parameters: {params.get('base_params', 0):,}")
    print_fn(f" Trainable Parameters: {params.get('trainable_params', 0):,}")
    print_fn(f" Parameter Ratio: {params.get('param_ratio', 0):.5%}")

    print_fn("\nModel Size:")
    size_info = benchmark_result.meta_info.get("model_size", {})
    print_fn(f" Base Model: {size_info.get('base_model_size_mb', 0):.2f} MB")
    print_fn(f" Adapter: {size_info.get('adapter_size_mb', 0):.2f} MB")

    print_fn("\nMemory Usage (from generation_info):")
    memory_data = benchmark_result.generation_info.get("memory", {})
    print_fn(f" Peak Accelerator Memory: {memory_data.get('peak_accelerator_memory_mb', 0):.2f} MB")
    print_fn(f" Peak RAM Memory: {memory_data.get('peak_ram_memory_mb', 0):.2f} MB")

    print_fn("\nDetailed Metrics (from generation_info.by_category):")
    by_category = benchmark_result.generation_info.get("by_category")
    if by_category:
        for category, cat_data in by_category.items():
            print_fn(f" Category: {category}")
            _log_timing_metrics(cat_data.get("metrics", {}), print_fn)

            samples = cat_data.get("samples", [])
            if samples:
                print_fn(f" Number of Samples: {len(samples)}")
                print_fn(
                    f" Average Generated Tokens: {sum(s.get('generated_tokens', 0) for s in samples) / len(samples):.1f}"
                )
    else:
        print_fn(" No per-category metrics available.")

    benchmark_result.compute_overall_metrics()

    print_fn("\nOverall Metrics (from generation_info.overall):")
    overall = benchmark_result.generation_info.get("overall")
    if overall:
        _log_timing_metrics(overall, print_fn)
    else:
        print_fn(" No overall metrics computed.")

    # Pass one pre-formatted string, like every other call, so print_fn may be any single-argument callable.
    print_fn(f"\nSaved results to: {benchmark_result.save()}")
    print_fn("=" * 50)
|
|