PyTorch 推理工程(09):最小可展示推理项目骨架

1. 本节定位

前文分散在概念与示例中;工程上宜收敛为可运行仓库:统一入口、可切换精度与 batch、可记录延迟/吞吐/显存,并保留 Profiler 与导出路径。本篇给出一份小型「PyTorch Inference Lab」骨架,用于把第 01–08 篇中的选项固化为可重复实验。


2. 项目定位:不大,但要完整

这个项目不追求业务复杂,重点是"推理工程味道":

1
2
3
4
5
6
7
8
9
10
11
12
13
项目名称:PyTorch Inference Lab
目标问题:同一个 PyTorch 模型,在不同推理配置下,延迟、吞吐、显存和热点如何变化?

支持的实验维度:
- 精度:FP32 / AMP-FP16 / AMP-BF16
- batch size:1 / 4 / 16 / 64
- 推理模式:eager / torch.compile()
- 分析工具:benchmark / profiler / 显存统计
- 导出:ONNX 导出和验证

暂时不做:
- 分布式多卡、复杂微服务、前端页面
- 复杂大模型 serving 重写

3. 项目目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
pytorch_inference_lab/

├── models/
│ ├── __init__.py
│ ├── mlp.py # MLP 模型定义
│ ├── transformer.py # 简单 Transformer block(阶段 3 使用)
│ └── registry.py # 模型注册表

├── runner/
│ ├── __init__.py
│ ├── infer.py # 推理核心逻辑
│ ├── benchmark.py # benchmark 对比实验
│ ├── profiler_run.py # profiler 分析
│ └── memory_tracker.py # 显存统计

├── export/
│ ├── onnx_export.py # ONNX 导出和验证
│ └── compile_check.py # torch.compile 检查

├── results/
│ ├── benchmarks/ # benchmark 结果(JSON/CSV)
│ └── profiler/ # profiler trace 文件

├── app/
│ └── api.py # 最小 FastAPI 推理服务(阶段 4)

├── main.py # 统一入口
├── requirements.txt
└── README.md

4. 阶段 1:核心代码骨架

models/mlp.py — 模型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# models/mlp.py
import torch
import torch.nn as nn


class MLP(nn.Module):
"""
多层感知机(用于推理实验)

设计目的:足够简单以验证正确性,足够典型以观察性能差异。
"""
def __init__(self, in_dim: int = 512, hidden_dim: int = 2048, out_dim: int = 512, n_layers: int = 4):
super().__init__()
layers = [nn.Linear(in_dim, hidden_dim), nn.ReLU()]
for _ in range(n_layers - 2):
layers += [nn.Linear(hidden_dim, hidden_dim), nn.ReLU()]
layers.append(nn.Linear(hidden_dim, out_dim))
self.net = nn.Sequential(*layers)

def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x)


class SimpleCNN(nn.Module):
"""
简单卷积网络(用于视觉推理实验)
"""
def __init__(self, in_channels: int = 3, num_classes: int = 10):
super().__init__()
self.features = nn.Sequential(
nn.Conv2d(in_channels, 32, 3, padding=1), nn.ReLU(),
nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
)
self.classifier = nn.Sequential(
nn.Linear(64 * 4 * 4, 256), nn.ReLU(),
nn.Linear(256, num_classes),
)

def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.features(x)
x = x.flatten(1)
return self.classifier(x)

models/registry.py — 模型注册表

1
2
3
4
5
6
7
8
9
10
11
12
13
# models/registry.py
import torch.nn as nn
from models.mlp import MLP, SimpleCNN

_REGISTRY: dict[str, type] = {
"mlp": MLP,
"cnn": SimpleCNN,
}

def build_model(name: str, **kwargs) -> nn.Module:
if name not in _REGISTRY:
raise ValueError(f"未知模型 '{name}',可选: {list(_REGISTRY.keys())}")
return _REGISTRY[name](**kwargs)

runner/infer.py — 推理核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# runner/infer.py
"""
推理核心逻辑模块。

封装了推理的三个要素:
1. 正确的模型状态(eval + inference_mode)
2. 正确的精度上下文(autocast 或 float32)
3. 正确的设备管理(输入 / 输出在同一 device)
"""
import contextlib
from typing import Optional

import torch
import torch.nn as nn


def make_autocast_ctx(device: str, dtype_str: Optional[str]):
"""
根据 dtype 字符串返回合适的精度上下文管理器。

dtype_str=None → 不启用 autocast(纯 FP32)
dtype_str="fp16" → torch.autocast(float16)
dtype_str="bf16" → torch.autocast(bfloat16)
"""
if dtype_str is None:
return contextlib.nullcontext() # 什么都不做,保持 FP32

dtype_map = {
"fp16": torch.float16,
"bf16": torch.bfloat16,
}
if dtype_str not in dtype_map:
raise ValueError(f"不支持的 dtype: {dtype_str},可选: {list(dtype_map.keys())}")

device_type = "cuda" if "cuda" in device else "cpu"
return torch.autocast(device_type=device_type, dtype=dtype_map[dtype_str])


def run_inference(
model: nn.Module,
x: torch.Tensor,
dtype_str: Optional[str] = None,
) -> torch.Tensor:
"""
单次推理,自动处理精度上下文。

Args:
model: 已搬到目标 device 的模型(eval 状态)
x: 已搬到目标 device 的输入
dtype_str: None / "fp16" / "bf16"

Returns:
推理输出(GPU tensor)
"""
device = str(next(model.parameters()).device)

with torch.inference_mode():
with make_autocast_ctx(device, dtype_str):
return model(x)


def warmup(model: nn.Module, x: torch.Tensor, n: int = 10, dtype_str: Optional[str] = None):
"""预热:消除首次运行的 CUDA 初始化和 kernel 选择开销。"""
with torch.inference_mode():
with make_autocast_ctx(str(next(model.parameters()).device), dtype_str):
for _ in range(n):
model(x)
if "cuda" in str(next(model.parameters()).device):
torch.cuda.synchronize()

runner/benchmark.py — benchmark 对比实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# runner/benchmark.py
"""
Benchmark 模块:系统对比不同推理配置的延迟、吞吐和显存。

使用 torch.utils.benchmark 而不是手写 for+time.time(),
原因:自动 warmup、自动处理 CUDA 同步、自动统计中位数和 IQR。
"""
import json
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn
import torch.utils.benchmark as benchmark

from runner.infer import make_autocast_ctx, warmup


@dataclass
class BenchmarkResult:
model_name: str
batch_size: int
dtype: str
compiled: bool
device: str
latency_median_ms: float
latency_iqr_ms: float
throughput_samples_per_s: float
memory_allocated_mb: float
memory_reserved_mb: float


def run_benchmark(
model: nn.Module,
model_name: str,
batch_size: int,
input_shape: tuple, # 单样本的 shape,如 (512,) 或 (3, 32, 32)
dtype_str: Optional[str], # None / "fp16" / "bf16"
device: str,
compiled: bool = False,
min_run_time: float = 2.0, # benchmark 最少运行时间(秒)
) -> BenchmarkResult:

# 构造输入(单批)
x = torch.randn(batch_size, *input_shape, device=device)
device_type = "cuda" if "cuda" in device else "cpu"

# 先 warmup(10 次)
warmup(model, x, n=10, dtype_str=dtype_str)

# 用 benchmark.Timer 测稳态性能
autocast_code = ""
if dtype_str == "fp16":
autocast_code = "with torch.autocast('cuda', torch.float16):\n "
elif dtype_str == "bf16":
autocast_code = "with torch.autocast('cuda', torch.bfloat16):\n "

stmt = f"""
with torch.inference_mode():
{autocast_code}model(x)
"""

timer = benchmark.Timer(
stmt=stmt.strip(),
globals={"model": model, "x": x, "torch": torch},
label=model_name,
sub_label=f"batch={batch_size}, dtype={dtype_str or 'fp32'}, compile={compiled}",
)

result = timer.blocked_autorange(min_run_time=min_run_time)

# 显存统计(在 warmup 之后,benchmark 之后测比较稳定)
mem_allocated = torch.cuda.memory_allocated() / 1024**2 if device_type == "cuda" else 0.0
mem_reserved = torch.cuda.memory_reserved() / 1024**2 if device_type == "cuda" else 0.0

latency_ms = result.median * 1000 # 秒 → 毫秒
throughput = batch_size / result.median # 样本 / 秒

return BenchmarkResult(
model_name=model_name,
batch_size=batch_size,
dtype=dtype_str or "fp32",
compiled=compiled,
device=device,
latency_median_ms=round(latency_ms, 3),
latency_iqr_ms=round(result.iqr * 1000, 3),
throughput_samples_per_s=round(throughput, 1),
memory_allocated_mb=round(mem_allocated, 1),
memory_reserved_mb=round(mem_reserved, 1),
)


def run_full_sweep(
model_fn, # 工厂函数:() → nn.Module
model_name: str,
input_shape: tuple,
device: str,
batch_sizes: list[int] = [1, 4, 16, 64],
dtypes: list[Optional[str]] = [None, "fp16", "bf16"],
try_compile: bool = True,
save_dir: str = "results/benchmarks",
) -> list[BenchmarkResult]:
"""
全矩阵 sweep:遍历所有 batch size × dtype × compile 组合并保存结果。
"""
results = []
Path(save_dir).mkdir(parents=True, exist_ok=True)

for dtype_str in dtypes:
for bs in batch_sizes:
# Eager 版本
model = model_fn().to(device).eval()
r = run_benchmark(model, model_name, bs, input_shape, dtype_str, device, compiled=False)
results.append(r)
print(f" [eager] batch={bs:3d}, dtype={r.dtype:5s}: "
f"latency={r.latency_median_ms:.2f}ms "
f"throughput={r.throughput_samples_per_s:.0f} samp/s "
f"mem_alloc={r.memory_allocated_mb:.0f}MB")

# Compiled 版本(可选)
if try_compile:
try:
model_c = torch.compile(model_fn().to(device).eval())
# compiled 需要多跑几次 warmup(编译触发在首次调用)
warmup(model_c, torch.randn(bs, *input_shape, device=device), n=20, dtype_str=dtype_str)
r_c = run_benchmark(model_c, model_name, bs, input_shape, dtype_str, device, compiled=True)
results.append(r_c)
print(f" [compile] batch={bs:3d}, dtype={r_c.dtype:5s}: "
f"latency={r_c.latency_median_ms:.2f}ms "
f"throughput={r_c.throughput_samples_per_s:.0f} samp/s")
except Exception as e:
print(f" [compile] batch={bs}, dtype={dtype_str}: 编译失败 - {e}")

# 保存结果
output_path = Path(save_dir) / f"{model_name}_benchmark.json"
with open(output_path, "w", encoding="utf-8") as f:
json.dump([asdict(r) for r in results], f, indent=2, ensure_ascii=False)
print(f"\n结果已保存到 {output_path}")

return results

runner/profiler_run.py — profiler 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# runner/profiler_run.py
"""
Profiler 模块:找到"哪里慢",而不只是"多慢"。

使用策略:
- 先 warmup 几次,再 profile 1~3 次
- 同时记录 CPU 和 CUDA 活动
- record_shapes=True:知道热点 op 处理的是什么 shape
- with_stack=True:知道热点来自代码的哪一行
"""
from pathlib import Path
from typing import Optional

import torch
import torch.nn as nn

from runner.infer import make_autocast_ctx, warmup


def run_profiler(
model: nn.Module,
model_name: str,
x: torch.Tensor,
dtype_str: Optional[str] = None,
n_warmup: int = 5,
n_profile: int = 3,
save_dir: str = "results/profiler",
sort_by: str = "self_cuda_time_total",
row_limit: int = 20,
):
"""
对单次推理运行 profiler,输出热点表并保存 trace。

Args:
model: 已 eval 的模型
model_name: 用于保存文件名
x: 输入 tensor(已在目标 device 上)
dtype_str: None / "fp16" / "bf16"
sort_by: 排序列("self_cuda_time_total" 最常用)
"""
Path(save_dir).mkdir(parents=True, exist_ok=True)
device = str(next(model.parameters()).device)
device_type = "cuda" if "cuda" in device else "cpu"

# 先 warmup
warmup(model, x, n=n_warmup, dtype_str=dtype_str)

activities = [torch.profiler.ProfilerActivity.CPU]
if device_type == "cuda":
activities.append(torch.profiler.ProfilerActivity.CUDA)

with torch.inference_mode():
with torch.profiler.profile(
activities=activities,
record_shapes=True, # 记录 op 的输入 shape
with_stack=True, # 记录触发 op 的代码位置
profile_memory=False, # 开启会增加 profiler 自身开销,按需开启
) as prof:
for _ in range(n_profile):
with make_autocast_ctx(device, dtype_str):
model(x)

# 打印热点表
dtype_label = dtype_str or "fp32"
print(f"\n=== Profiler 热点 | {model_name} | dtype={dtype_label} | batch={x.shape[0]} ===")
print(prof.key_averages().table(sort_by=sort_by, row_limit=row_limit))

# 保存 trace(可在 chrome://tracing 中查看)
trace_path = Path(save_dir) / f"{model_name}_{dtype_label}_trace.json"
prof.export_chrome_trace(str(trace_path))
print(f"Chrome trace 已保存到 {trace_path}")
print("→ 打开 chrome://tracing,拖入此文件即可查看 CPU/GPU 时间线。\n")

return prof

runner/memory_tracker.py — 显存统计工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# runner/memory_tracker.py
"""
显存统计工具。

区分两个关键指标:
allocated:当前活跃 Tensor 真正占用的显存
reserved :PyTorch caching allocator 保留的总显存
"""
import torch


def get_memory_stats(device: str = "cuda") -> dict:
"""返回当前显存状态(MB)。"""
if not torch.cuda.is_available():
return {"allocated_mb": 0, "reserved_mb": 0, "note": "无 CUDA 设备"}
return {
"allocated_mb": round(torch.cuda.memory_allocated() / 1024**2, 1),
"reserved_mb": round(torch.cuda.memory_reserved() / 1024**2, 1),
}


def print_memory_stats(label: str = ""):
"""打印当前显存状态。"""
stats = get_memory_stats()
prefix = f"[{label}] " if label else ""
print(f"{prefix}显存 allocated={stats['allocated_mb']:.1f}MB reserved={stats['reserved_mb']:.1f}MB")


class MemoryTracker:
"""
上下文管理器:追踪一段代码前后的显存变化。

使用示例:
with MemoryTracker("model forward") as mt:
y = model(x)
mt.report() # 打印显存变化
"""
def __init__(self, label: str = ""):
self.label = label
self.before = {}
self.after = {}

def __enter__(self):
if torch.cuda.is_available():
torch.cuda.synchronize()
self.before = get_memory_stats()
return self

def __exit__(self, *args):
if torch.cuda.is_available():
torch.cuda.synchronize()
self.after = get_memory_stats()

def report(self):
delta_alloc = self.after["allocated_mb"] - self.before["allocated_mb"]
delta_reserv = self.after["reserved_mb"] - self.before["reserved_mb"]
print(f"[{self.label}] 显存变化: allocated {delta_alloc:+.1f}MB reserved {delta_reserv:+.1f}MB")
print(f" after: allocated={self.after['allocated_mb']:.1f}MB reserved={self.after['reserved_mb']:.1f}MB")

export/onnx_export.py — ONNX 导出和验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# export/onnx_export.py
"""
ONNX 导出模块。

流程:
1. 用 dynamo=True(基于 torch.export)导出到 ONNX
2. 用 onnxruntime 验证导出结果和 PyTorch 输出的误差
"""
from pathlib import Path
from typing import Optional

import numpy as np
import torch
import torch.nn as nn


def export_onnx(
model: nn.Module,
input_shape: tuple,
output_path: str = "results/model.onnx",
dynamic_batch: bool = True,
) -> str:
"""
导出 ONNX 文件。

Args:
model: 已 eval 的模型(CPU 上效果最稳定)
input_shape: 单样本 shape,如 (512,)
output_path: 输出文件路径
dynamic_batch: 是否声明 batch 维度为动态
"""
Path(output_path).parent.mkdir(parents=True, exist_ok=True)

# 导出前把模型移到 CPU(dynamo 导出在 CPU 更稳定)
model_cpu = model.cpu().eval()
x = torch.randn(1, *input_shape)

dynamic_shapes = None
if dynamic_batch:
from torch.export import Dim
batch_dim = Dim("batch", min=1, max=256)
dynamic_shapes = {"x": {0: batch_dim}}

print(f"正在导出 ONNX(dynamo=True, dynamic_batch={dynamic_batch})...")
torch.onnx.export(
model_cpu,
(x,),
output_path,
input_names=["x"],
output_names=["output"],
dynamo=True,
dynamic_shapes=dynamic_shapes,
)
print(f"ONNX 已导出到 {output_path}")
return output_path


def verify_onnx(
model: nn.Module,
onnx_path: str,
input_shape: tuple,
batch_sizes: list[int] = [1, 4, 16],
atol: float = 1e-4,
):
"""
验证 ONNX 输出和 PyTorch 输出的误差。
需要安装 onnxruntime:pip install onnxruntime
"""
try:
import onnxruntime as ort
except ImportError:
print("请先安装 onnxruntime:pip install onnxruntime")
return

sess = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
model_cpu = model.cpu().eval()

print(f"\n验证 ONNX({onnx_path}):")
all_pass = True

for bs in batch_sizes:
x_np = np.random.randn(bs, *input_shape).astype(np.float32)

# ONNX Runtime 推理
onnx_out = sess.run(None, {"x": x_np})[0]

# PyTorch 推理
with torch.inference_mode():
pt_out = model_cpu(torch.tensor(x_np)).numpy()

max_err = abs(onnx_out - pt_out).max()
status = "PASS" if max_err < atol else "FAIL"
print(f" batch={bs:3d}: 最大误差={max_err:.2e} {status}")

if max_err >= atol:
all_pass = False

print(f"\n总体验证: {'全部通过' if all_pass else '存在失败项'}")

main.py — 统一入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# main.py
"""
统一命令行入口。

使用示例:
# 运行单次推理
python main.py --mode infer --model mlp --batch-size 32 --device cuda

# 运行完整 benchmark sweep
python main.py --mode benchmark --model mlp --device cuda

# 运行 profiler
python main.py --mode profile --model mlp --batch-size 32 --dtype fp16 --device cuda

# 导出 ONNX
python main.py --mode export-onnx --model mlp
"""
import argparse
import torch

from models.registry import build_model
from runner.infer import run_inference, warmup
from runner.benchmark import run_full_sweep
from runner.profiler_run import run_profiler
from runner.memory_tracker import print_memory_stats, MemoryTracker
from export.onnx_export import export_onnx, verify_onnx


# ─────── 模型配置表 ──────────────────────────────────────────────────────────
MODEL_CONFIGS = {
"mlp": {
"model_kwargs": {"in_dim": 512, "hidden_dim": 2048, "out_dim": 512, "n_layers": 4},
"input_shape": (512,),
},
"cnn": {
"model_kwargs": {"in_channels": 3, "num_classes": 10},
"input_shape": (3, 32, 32),
},
}
# ─────────────────────────────────────────────────────────────────────────────


def parse_args():
p = argparse.ArgumentParser(description="PyTorch Inference Lab")
p.add_argument("--mode", choices=["infer", "benchmark", "profile", "export-onnx"], default="infer")
p.add_argument("--model", choices=list(MODEL_CONFIGS.keys()), default="mlp")
p.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu")
p.add_argument("--batch-size", type=int, default=32)
p.add_argument("--dtype", choices=["fp32", "fp16", "bf16"], default="fp32")
p.add_argument("--compile", action="store_true", help="启用 torch.compile()")
return p.parse_args()


def main():
args = parse_args()
cfg = MODEL_CONFIGS[args.model]
dtype_str = None if args.dtype == "fp32" else args.dtype

print(f"\n{'='*60}")
print(f" PyTorch Inference Lab")
print(f" model={args.model} mode={args.mode} device={args.device}")
print(f" batch={args.batch_size} dtype={args.dtype} compile={args.compile}")
print(f"{'='*60}\n")

# ─── 模式 1:单次推理 ─────────────────────────────────────────────────────
if args.mode == "infer":
model = build_model(args.model, **cfg["model_kwargs"]).to(args.device).eval()
if args.compile:
model = torch.compile(model)

x = torch.randn(args.batch_size, *cfg["input_shape"], device=args.device)

print_memory_stats("推理前")

with MemoryTracker("推理") as mt:
warmup(model, x, n=5, dtype_str=dtype_str)
y = run_inference(model, x, dtype_str=dtype_str)

mt.report()
print(f"\n输出 shape: {y.shape} dtype: {y.dtype} device: {y.device}")

# ─── 模式 2:完整 benchmark sweep ─────────────────────────────────────────
elif args.mode == "benchmark":
print(f"开始 benchmark sweep(model={args.model}, device={args.device})...\n")

results = run_full_sweep(
model_fn=lambda: build_model(args.model, **cfg["model_kwargs"]),
model_name=args.model,
input_shape=cfg["input_shape"],
device=args.device,
batch_sizes=[1, 4, 16, 64],
dtypes=[None, "fp16"],
try_compile=True,
)

# 打印汇总表
print(f"\n{'─'*80}")
print(f"{'dtype':>6} {'batch':>6} {'compiled':>8} {'latency(ms)':>12} {'throughput':>14} {'mem_alloc':>10}")
print(f"{'─'*80}")
for r in results:
print(f"{r.dtype:>6} {r.batch_size:>6} {str(r.compiled):>8} "
f"{r.latency_median_ms:>10.2f}ms "
f"{r.throughput_samples_per_s:>12.0f}/s "
f"{r.memory_allocated_mb:>8.0f}MB")

# ─── 模式 3:profiler ─────────────────────────────────────────────────────
elif args.mode == "profile":
model = build_model(args.model, **cfg["model_kwargs"]).to(args.device).eval()
x = torch.randn(args.batch_size, *cfg["input_shape"], device=args.device)

run_profiler(
model=model,
model_name=args.model,
x=x,
dtype_str=dtype_str,
)

# ─── 模式 4:ONNX 导出 ───────────────────────────────────────────────────
elif args.mode == "export-onnx":
model = build_model(args.model, **cfg["model_kwargs"]).eval()
onnx_path = f"results/{args.model}.onnx"

path = export_onnx(model, cfg["input_shape"], onnx_path)
verify_onnx(model, path, cfg["input_shape"])


if __name__ == "__main__":
main()

requirements.txt

1
2
3
4
5
6
7
torch>=2.1.0
onnx>=1.14.0
onnxruntime>=1.16.0
fastapi>=0.100.0
uvicorn>=0.23.0
pyyaml>=6.0
numpy>=1.24.0

5. 阶段 4:最小 FastAPI 推理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# app/api.py
"""
最小 FastAPI 推理服务。

接口:
GET /health → 健康检查
POST /infer → 单次推理
GET /benchmark/quick → 快速 benchmark(固定配置)

启动方式:
uvicorn app.api:app --host 0.0.0.0 --port 8000
"""
import time
from typing import Optional

import torch
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

from models.registry import build_model
from runner.infer import run_inference, warmup
from runner.memory_tracker import get_memory_stats

app = FastAPI(title="PyTorch Inference Lab API")

# 全局模型(服务启动时只加载一次)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
_MODEL = build_model("mlp", in_dim=512, hidden_dim=2048, out_dim=512).to(DEVICE).eval()
warmup(_MODEL, torch.randn(1, 512, device=DEVICE), n=3)


class InferRequest(BaseModel):
batch_size: int = 1
input_dim: int = 512
dtype: Optional[str] = None # None / "fp16" / "bf16"


class InferResponse(BaseModel):
output_shape: list[int]
latency_ms: float
device: str
dtype_used: str


@app.get("/health")
def health():
"""健康检查接口。"""
return {
"status": "ok",
"device": DEVICE,
"cuda_available": torch.cuda.is_available(),
"memory": get_memory_stats(DEVICE),
}


@app.post("/infer", response_model=InferResponse)
def infer(req: InferRequest):
"""单次推理接口。"""
x = torch.randn(req.batch_size, req.input_dim, device=DEVICE)

# 正确计时(同步后打点)
if DEVICE == "cuda":
torch.cuda.synchronize()
t0 = time.time()

y = run_inference(_MODEL, x, dtype_str=req.dtype)

if DEVICE == "cuda":
torch.cuda.synchronize()
latency_ms = (time.time() - t0) * 1000

return InferResponse(
output_shape=list(y.shape),
latency_ms=round(latency_ms, 3),
device=DEVICE,
dtype_used=req.dtype or "fp32",
)


@app.get("/benchmark/quick")
def quick_benchmark():
"""快速 benchmark:对比 batch=1 和 batch=64 的吞吐。"""
import torch.utils.benchmark as bm

results = {}
for bs in [1, 16, 64]:
x = torch.randn(bs, 512, device=DEVICE)
warmup(_MODEL, x, n=5)

t = bm.Timer(
stmt="with torch.inference_mode(): model(x)",
globals={"model": _MODEL, "x": x, "torch": torch},
)
r = t.blocked_autorange(min_run_time=1.0)
results[f"batch_{bs}"] = {
"latency_ms": round(r.median * 1000, 2),
"throughput_samples_s": round(bs / r.median, 0),
}

return results


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

6. 完整运行流程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 安装依赖
pip install -r requirements.txt

# 阶段 1:验证推理正确
python main.py --mode infer --model mlp --batch-size 32 --device cuda

# 阶段 2:运行完整 benchmark sweep
python main.py --mode benchmark --model mlp --device cuda
# → 结果保存到 results/benchmarks/mlp_benchmark.json

# 阶段 3:profiler 分析
python main.py --mode profile --model mlp --batch-size 64 --dtype fp16 --device cuda
# → trace 保存到 results/profiler/mlp_fp16_trace.json
# → 在 chrome://tracing 里打开 trace 文件,查看时间线

# 阶段 4(可选):ONNX 导出和验证
python main.py --mode export-onnx --model mlp
# → 导出到 results/mlp.onnx,并自动验证输出误差

# 阶段 4(可选):启动 FastAPI 服务
uvicorn app.api:app --host 0.0.0.0 --port 8000
# → 访问 http://localhost:8000/docs 查看 API 文档

7. 如何阅读 benchmark 汇总

run_full_sweep 完成后,终端可能呈现类似下表:

1
2
3
4
5
6
7
8
dtype  batch   compiled  latency(ms)     throughput  mem_alloc
──────────────────────────────────────────────────────────────
fp32 1 False 0.21ms 4,761 samp/s 128MB
fp32 64 False 1.85ms 34,594 samp/s 128MB
fp16 1 False 0.18ms 5,556 samp/s 64MB
fp16 64 False 0.95ms 67,368 samp/s 64MB
fp16 1 True 0.12ms 8,333 samp/s 64MB
fp16 64 True 0.78ms 82,051 samp/s 64MB

宜逐项说明:

现象 原因
fp16 比 fp32 快 显存占用减半、Tensor Core 吞吐更高
大 batch 吞吐高但延迟也高 GPU 利用率提升抵消了每样本平均延迟
compile 在大 batch 收益更大 kernel fusion 在更大矩阵上效果更好
fp16 的 mem_alloc 更低 参数和激活值都更小

这套"现象→原因"的解释能力,就是 AI infra 面试最看重的东西。


8. README 模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# PyTorch Inference Lab

单机 PyTorch 推理性能分析与优化实验平台。

## 项目目标

系统比较不同推理配置下的延迟、吞吐、显存和热点分布:
- 精度:FP32 / AMP-FP16 / AMP-BF16
- batch size:1 / 4 / 16 / 64
- 推理模式:eager / torch.compile()
- 分析:profiler 热点 + Chrome trace 时间线

## 技术栈

- Python 3.10+ / PyTorch 2.x / CUDA
- `torch.profiler`:算子级性能分析
- `torch.utils.benchmark`:稳健的延迟测量
- ONNX Runtime:导出验证
- FastAPI:最小推理服务(可选)

## 快速开始

```bash
pip install -r requirements.txt

# 推理验证
python main.py --mode infer --model mlp --batch-size 32

# 完整实验对比
python main.py --mode benchmark --model mlp

# 热点分析
python main.py --mode profile --model mlp --batch-size 64 --dtype fp16

实验结果样例

dtype batch latency throughput
fp32 64 1.85ms 34,594/s
fp16 64 0.95ms 67,368/s

结论:AMP-FP16 在 batch=64 时吞吐提升 ~2×,显存减少 50%。
Profiler 显示瓶颈在 aten::addmm,kernel fusion 后 (torch.compile) 再提升 15%。

1
2
3
4
5

---

## 9. 简历项目描述(直接可用)

搭建单机 PyTorch 推理实验平台(PyTorch Inference Lab):
• 支持 FP32 / AMP-FP16 / AMP-BF16 精度切换与自动 benchmark 对比实验
• 集成 torch.profiler 热点分析,定位 self_cuda_time_total 最高算子并导出 Chrome trace
• 实现 torch.compile 编译对比实验,分析 kernel fusion 在不同 batch size 下的加速边界
• 完成 ONNX 导出(dynamo=True)并使用 onnxruntime 验证输出误差 < 1e-4
• 使用 FastAPI 封装最小推理服务,提供 /infer 和 /benchmark 接口
• 实验结论:AMP-FP16 + compile 在 batch=64 时吞吐较 FP32 eager 提升约 2.4×,显存减少 50%


---

## 10. 面试时怎么讲这个项目(标准框架)

**背景**:我想系统理解 PyTorch 推理优化,不只是会跑模型,所以做了一个推理实验平台。

**目标**:比较不同推理配置对延迟、吞吐、显存和热点分布的影响,形成可重复的分析流程。

**实现**:从"正确推理闭环"出发,逐步加入 AMP、compile、benchmark、profiler 和 ONNX 导出。每个模块独立可测,结果结构化保存到 JSON。

**关键发现**:(填入本机实测数字)
- AMP-FP16 的吞吐比 FP32 高 X%,显存低 50%
- compile 在大 batch 收益更明显(kernel fusion 效果依赖矩阵大小)
- Profiler 显示热点在 `aten::addmm`,CPU CUDA 时间比约 1:8
- ONNX 导出后输出误差 < 1e-5,验证了精度保留

**收获**:这个项目让我把 PyTorch API 转化成了推理系统思维,也更理解 KV Cache、batching 和 runtime 的设计动机。

---

## 11. 本节要点与自检

- 能创建并运行整个项目骨架(直接用本节代码)
- 能解读 benchmark 结果表里的每一行数字,并说出原因
- 能跑 profiler 并找到 `self_cuda_time_total` 最高的热点
- 能成功导出 ONNX 并验证误差
- 会启动 FastAPI 服务并调用 `/infer` 接口
- 能用可核对的数据与术语描述本项目(对外文档或自述)

---

## 12. 小结

> 本仓库的价值在于将前文各主题收敛为可运行、可测量、可导出、可复述的最小闭环;上述代码需在本地执行后,以实测数字与日志作为讨论依据。
---

## 系列导航

- 上一篇:[PyTorch推理工程:08 批处理、KV Cache 与 Serving 视角](/posts/fb17598c/)
- 下一篇:[PyTorch推理工程:10 里程碑、简历转化与面试表达](/posts/7ab343d0/)
- [系列索引(00 导读)](/posts/846fbbaf/)