"""Inference benchmarking for deployment sizing.
Measures throughput, latency, and memory usage for ControlNet inference
under various configurations (resolution, batch size, denoising steps).
Usage:
from landmarkdiff.benchmark import InferenceBenchmark
bench = InferenceBenchmark()
bench.add_result("gpu_a6000", latency_ms=142.3, throughput_fps=7.0, vram_gb=4.2)
bench.add_result("gpu_a6000", latency_ms=138.1, throughput_fps=7.2, vram_gb=4.2)
print(bench.summary())
"""
from __future__ import annotations

import json
import math
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class BenchmarkResult:
    """A single benchmark measurement."""

    config_name: str
    latency_ms: float  # end-to-end latency per batch, in milliseconds
    throughput_fps: float = 0.0  # images per second
    vram_gb: float = 0.0  # VRAM usage, in GB
    batch_size: int = 1
    resolution: int = 512  # output resolution, in pixels
    num_inference_steps: int = 20  # denoising steps
    device: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)


class InferenceBenchmark:
    """Collect and analyze inference benchmarks.

    Args:
        model_name: Name of the model being benchmarked.
    """

    def __init__(self, model_name: str = "LandmarkDiff-ControlNet") -> None:
        self.model_name = model_name
        self.results: list[BenchmarkResult] = []

    def add_result(
        self,
        config_name: str,
        latency_ms: float,
        throughput_fps: float = 0.0,
        vram_gb: float = 0.0,
        batch_size: int = 1,
        resolution: int = 512,
        num_inference_steps: int = 20,
        device: str = "",
        **metadata: Any,
    ) -> None:
        """Add a benchmark result.

        If ``throughput_fps`` is left at 0, it is derived from the latency as
        ``batch_size * 1000 / latency_ms`` (e.g. 142.3 ms at batch size 1
        gives ~7.0 FPS).
        """
        if throughput_fps == 0.0 and latency_ms > 0:
            # Derive throughput from per-batch latency when it was not measured directly.
            throughput_fps = 1000.0 / latency_ms * batch_size
        self.results.append(BenchmarkResult(
            config_name=config_name,
            latency_ms=latency_ms,
            throughput_fps=throughput_fps,
            vram_gb=vram_gb,
            batch_size=batch_size,
            resolution=resolution,
            num_inference_steps=num_inference_steps,
            device=device,
            metadata=metadata,
        ))

    def mean_latency(self, config_name: str | None = None) -> float:
        """Mean latency in ms, optionally filtered by config."""
        results = self._filter(config_name)
        if not results:
            return float("nan")
        return sum(r.latency_ms for r in results) / len(results)

    def p99_latency(self, config_name: str | None = None) -> float:
        """P99 latency in ms (nearest-rank method)."""
        results = self._filter(config_name)
        if not results:
            return float("nan")
        sorted_latencies = sorted(r.latency_ms for r in results)
        # Nearest-rank percentile; with fewer than 100 samples this is simply
        # the slowest observed latency.
        idx = min(len(sorted_latencies) - 1, math.ceil(len(sorted_latencies) * 0.99) - 1)
        return sorted_latencies[idx]

    def mean_throughput(self, config_name: str | None = None) -> float:
        """Mean throughput in FPS."""
        results = self._filter(config_name)
        if not results:
            return float("nan")
        return sum(r.throughput_fps for r in results) / len(results)

    def max_vram(self, config_name: str | None = None) -> float:
        """Maximum VRAM usage in GB."""
        results = self._filter(config_name)
        if not results:
            return 0.0
        return max(r.vram_gb for r in results)

    def _filter(self, config_name: str | None) -> list[BenchmarkResult]:
        if config_name is None:
            return self.results
        return [r for r in self.results if r.config_name == config_name]

    @property
    def config_names(self) -> list[str]:
        """Unique config names, in insertion order."""
        seen: dict[str, None] = {}
        for r in self.results:
            seen.setdefault(r.config_name, None)
        return list(seen.keys())

    def summary(self) -> str:
        """Generate text summary table."""
        configs = self.config_names
        if not configs:
            return "No benchmark results."
        header = (
            f"{'Config':>20s} | {'Mean(ms)':>10s} | {'P99(ms)':>10s} | "
            f"{'FPS':>8s} | {'VRAM(GB)':>8s} | {'N':>4s}"
        )
        lines = [
            f"Inference Benchmark: {self.model_name}",
            header,
            "-" * len(header),
        ]
        for cfg in configs:
            results = self._filter(cfg)
            lines.append(
                f"{cfg:>20s} | "
                f"{self.mean_latency(cfg):>10.1f} | "
                f"{self.p99_latency(cfg):>10.1f} | "
                f"{self.mean_throughput(cfg):>8.2f} | "
                f"{self.max_vram(cfg):>8.1f} | "
                f"{len(results):>4d}"
            )
        return "\n".join(lines)

    def to_json(self, path: str | Path | None = None) -> str:
        """Export results as JSON."""
        data = {
            "model_name": self.model_name,
            "results": [
                {
                    "config_name": r.config_name,
                    "latency_ms": r.latency_ms,
                    "throughput_fps": round(r.throughput_fps, 2),
                    "vram_gb": r.vram_gb,
                    "batch_size": r.batch_size,
                    "resolution": r.resolution,
                    "num_inference_steps": r.num_inference_steps,
                    "device": r.device,
                }
                for r in self.results
            ],
            "summary": {
                cfg: {
                    "mean_latency_ms": round(self.mean_latency(cfg), 1),
                    "p99_latency_ms": round(self.p99_latency(cfg), 1),
                    "mean_fps": round(self.mean_throughput(cfg), 2),
                    "max_vram_gb": round(self.max_vram(cfg), 1),
                    "n_samples": len(self._filter(cfg)),
                }
                for cfg in self.config_names
            },
        }
        j = json.dumps(data, indent=2)
        if path:
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            Path(path).write_text(j)
        return j


class Timer:
    """Simple context manager for timing code blocks.

    Usage:
        with Timer() as t:
            run_inference()
        print(f"Took {t.elapsed_ms:.1f} ms")
    """

    def __init__(self) -> None:
        self.start_time: float = 0.0
        self.end_time: float = 0.0

    @property
    def elapsed_ms(self) -> float:
        """Elapsed wall-clock time in milliseconds."""
        return (self.end_time - self.start_time) * 1000

    @property
    def elapsed_s(self) -> float:
        """Elapsed wall-clock time in seconds."""
        return self.end_time - self.start_time

    def __enter__(self) -> Timer:
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *args: Any) -> None:
        self.end_time = time.perf_counter()
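

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the library API): wires Timer and
# InferenceBenchmark together around a dummy workload. Real callers would
# replace `dummy_inference` with their ControlNet pipeline call and record
# actual VRAM usage; the sleep-based workload, config names, and device
# string below are purely illustrative.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    def dummy_inference(resolution: int) -> None:
        # Stand-in for a real pipeline call; sleep time loosely scales with resolution.
        time.sleep(0.01 * (resolution / 512))

    bench = InferenceBenchmark()
    for resolution in (512, 768):
        config = f"dummy_res{resolution}"
        # Warm-up iteration, excluded from the recorded measurements.
        dummy_inference(resolution)
        for _ in range(5):
            with Timer() as t:
                dummy_inference(resolution)
            bench.add_result(
                config,
                latency_ms=t.elapsed_ms,
                resolution=resolution,
                device="cpu-dummy",
            )

    print(bench.summary())
    print(bench.to_json())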