| """ Benchmark 模块:系统对比不同推理配置的延迟、吞吐和显存。
使用 torch.utils.benchmark 而不是手写 for+time.time(), 原因:自动 warmup、自动处理 CUDA 同步、自动统计中位数和 IQR。 """ import json import time from dataclasses import dataclass, asdict from pathlib import Path from typing import Optional
import torch import torch.nn as nn import torch.utils.benchmark as benchmark
from runner.infer import make_autocast_ctx, warmup
@dataclass
class BenchmarkResult:
    """One benchmark measurement for a single (model, batch, dtype, compile) configuration.

    Produced by ``run_benchmark`` and serialized to JSON (via ``asdict``) by
    ``run_full_sweep``.
    """

    # Identity of the measured configuration.
    model_name: str
    batch_size: int
    dtype: str          # "fp32", "fp16" or "bf16" (normalized; never None)
    compiled: bool      # whether the model went through torch.compile
    device: str         # torch device string, e.g. "cuda:0" or "cpu"
    # Latency statistics from torch.utils.benchmark (milliseconds).
    latency_median_ms: float
    latency_iqr_ms: float
    # Derived throughput: batch_size / median latency (samples per second).
    throughput_samples_per_s: float
    # CUDA memory snapshot in MiB; 0.0 when running on CPU.
    memory_allocated_mb: float
    memory_reserved_mb: float
def run_benchmark(
    model: nn.Module,
    model_name: str,
    batch_size: int,
    input_shape: tuple,
    dtype_str: Optional[str],
    device: str,
    compiled: bool = False,
    min_run_time: float = 2.0,
) -> BenchmarkResult:
    """Benchmark one (model, batch size, dtype) configuration.

    Uses ``torch.utils.benchmark.Timer`` rather than a manual ``time.time()``
    loop so that warmup, CUDA synchronization and median/IQR statistics are
    handled automatically.

    Args:
        model: Module to measure (caller is expected to have called ``.eval()``).
        model_name: Label stored in the result and shown in Timer output.
        batch_size: Leading dimension of the random input tensor.
        input_shape: Per-sample input shape (everything after the batch dim).
        dtype_str: "fp16", "bf16", or None for full-precision fp32.
        device: Torch device string, e.g. "cuda:0" or "cpu".
        compiled: Recorded in the result only; compilation happens at the caller.
        min_run_time: Minimum seconds the Timer spends collecting measurements.

    Returns:
        A fully populated :class:`BenchmarkResult`.
    """
    x = torch.randn(batch_size, *input_shape, device=device)
    device_type = "cuda" if "cuda" in device else "cpu"

    # Extra eager warmup before the Timer's own warmup (lazy init, autocast
    # cache population, cuDNN algorithm selection).
    warmup(model, x, n=10, dtype_str=dtype_str)

    # Build the timed statement. BUGFIX: the autocast device is derived from
    # `device` instead of being hard-coded to 'cuda' — previously an fp16/bf16
    # benchmark on CPU entered a CUDA autocast context that had no effect, so
    # the run silently measured fp32.
    autocast_code = ""
    if dtype_str == "fp16":
        autocast_code = f"with torch.autocast('{device_type}', torch.float16):\n        "
    elif dtype_str == "bf16":
        autocast_code = f"with torch.autocast('{device_type}', torch.bfloat16):\n        "

    stmt = f"""
with torch.inference_mode():
    {autocast_code}model(x)
"""

    timer = benchmark.Timer(
        stmt=stmt.strip(),
        globals={"model": model, "x": x, "torch": torch},
        label=model_name,
        sub_label=f"batch={batch_size}, dtype={dtype_str or 'fp32'}, compile={compiled}",
    )
    # blocked_autorange performs its own warmup and CUDA synchronization and
    # returns robust statistics (median, IQR) over many measurement blocks.
    result = timer.blocked_autorange(min_run_time=min_run_time)

    # Memory snapshot after the run; meaningless on CPU, reported as 0.0.
    mem_allocated = torch.cuda.memory_allocated() / 1024**2 if device_type == "cuda" else 0.0
    mem_reserved = torch.cuda.memory_reserved() / 1024**2 if device_type == "cuda" else 0.0

    latency_ms = result.median * 1000
    throughput = batch_size / result.median

    return BenchmarkResult(
        model_name=model_name,
        batch_size=batch_size,
        dtype=dtype_str or "fp32",
        compiled=compiled,
        device=device,
        latency_median_ms=round(latency_ms, 3),
        latency_iqr_ms=round(result.iqr * 1000, 3),
        throughput_samples_per_s=round(throughput, 1),
        memory_allocated_mb=round(mem_allocated, 1),
        memory_reserved_mb=round(mem_reserved, 1),
    )
def run_full_sweep(
    model_fn,
    model_name: str,
    input_shape: tuple,
    device: str,
    batch_sizes: Optional[list[int]] = None,
    dtypes: Optional[list[Optional[str]]] = None,
    try_compile: bool = True,
    save_dir: str = "results/benchmarks",
) -> list[BenchmarkResult]:
    """Full-matrix sweep: iterate every batch size x dtype x compile combination and save results.

    Args:
        model_fn: Zero-arg factory returning a fresh model instance; a new model
            is built per configuration so state from one run cannot leak into
            the next.
        model_name: Label used in results and in the output filename.
        input_shape: Per-sample input shape (without the batch dimension).
        device: Torch device string, e.g. "cuda:0" or "cpu".
        batch_sizes: Batch sizes to sweep; defaults to [1, 4, 16, 64].
        dtypes: dtype strings to sweep (None means fp32); defaults to
            [None, "fp16", "bf16"].
        try_compile: Additionally benchmark a ``torch.compile``-d variant of
            each configuration, skipping (with a message) on compile failure.
        save_dir: Directory where ``{model_name}_benchmark.json`` is written.

    Returns:
        All collected :class:`BenchmarkResult` entries, in sweep order.
    """
    # BUGFIX: the defaults were mutable list literals shared across calls
    # (classic mutable-default-argument pitfall); use None sentinels instead.
    if batch_sizes is None:
        batch_sizes = [1, 4, 16, 64]
    if dtypes is None:
        dtypes = [None, "fp16", "bf16"]

    results: list[BenchmarkResult] = []
    Path(save_dir).mkdir(parents=True, exist_ok=True)

    for dtype_str in dtypes:
        for bs in batch_sizes:
            # Fresh model per configuration (see model_fn docstring above).
            model = model_fn().to(device).eval()
            r = run_benchmark(model, model_name, bs, input_shape, dtype_str, device, compiled=False)
            results.append(r)
            print(f"  [eager]   batch={bs:3d}, dtype={r.dtype:5s}: "
                  f"latency={r.latency_median_ms:.2f}ms "
                  f"throughput={r.throughput_samples_per_s:.0f} samp/s "
                  f"mem_alloc={r.memory_allocated_mb:.0f}MB")

            if try_compile:
                try:
                    # Compile a fresh instance; extra warmup (n=20) gives the
                    # compiler time to trace/specialize before timing.
                    model_c = torch.compile(model_fn().to(device).eval())
                    warmup(model_c, torch.randn(bs, *input_shape, device=device),
                           n=20, dtype_str=dtype_str)
                    r_c = run_benchmark(model_c, model_name, bs, input_shape,
                                        dtype_str, device, compiled=True)
                    results.append(r_c)
                    print(f"  [compile] batch={bs:3d}, dtype={r_c.dtype:5s}: "
                          f"latency={r_c.latency_median_ms:.2f}ms "
                          f"throughput={r_c.throughput_samples_per_s:.0f} samp/s")
                except Exception as e:
                    # Best-effort: compile support varies by backend/op set, so
                    # a failure is reported but does not abort the sweep.
                    print(f"  [compile] batch={bs}, dtype={dtype_str}: 编译失败 - {e}")

    output_path = Path(save_dir) / f"{model_name}_benchmark.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump([asdict(r) for r in results], f, indent=2, ensure_ascii=False)
    print(f"\n结果已保存到 {output_path}")

    return results