很多人第一次接触大模型流式输出时,会把它理解成一个前端体验优化:不等整段回答生成完,先把前几个 token 发出来,看起来更“丝滑”。这个理解不能说错,但只说到这里,远远不够。

在真实的 LLM serving 系统里,流式返回不是一个 UI 小技巧,而是服务层和引擎层协作方式的一部分。它决定了:

  • 请求何时算“开始有结果”;
  • 服务层如何把后台引擎产生的增量内容及时交给客户端;
  • 断连时系统怎样释放请求资源;
  • 前端该用什么方式消费这个流;
  • 为什么 HTTP 服务层可以和 continuous batching 并存,而不是互相破坏。

这篇文章就围绕一个问题展开:为什么现代大模型服务几乎都会做流式返回,以及这件事在 mini-infer 里是怎么被正确实现的。

核心代码会落在这些文件上:

如果想先抓主线,可以先记住 mini-infer 的流式链路长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
HTTP POST /v1/chat/completions
|
v
StreamingResponse(_stream_generator(...))
|
v
AsyncEngine.generate_stream_events(...)
|
v
后台 step loop 持续调用 LLMEngine.step()
|
v
token / done event 投递到每个请求自己的 asyncio.Queue
|
v
_stream_generator 把事件翻译成 SSE chunk
|
v
客户端持续读取,直到收到 [DONE]

先把 SSE 讲明白

SSE 全称是 Server-Sent Events。它是一种很朴素的协议思想:

服务端保持 HTTP 连接不立刻关闭,而是持续往这个连接里写一段一段的事件数据,客户端一边读,一边处理。

它和普通一次性 JSON 响应的区别非常直接。

普通 JSON 响应在做什么

普通响应的思路是:

  1. 请求进来;
  2. 服务端把结果全部准备好;
  3. 一次性返回完整 JSON;
  4. 连接结束。

这适合结果很快就能准备好的场景,也适合那些“完整结果比中间过程更重要”的接口。

SSE 在做什么

SSE 的思路则是:

  1. 请求进来;
  2. 服务端开始处理;
  3. 一旦有部分结果,就先发一部分;
  4. 连接保持打开;
  5. 后续内容继续往同一个连接里写;
  6. 最后再明确告诉客户端“已经结束了”。

这和 LLM 的生成过程天然匹配。因为自回归模型本来就是一步一步往后生成,而不是先在内部瞬间得到完整答案。

为什么大模型服务天然适合流式返回

第一,用户真正感知的是首 token 时间

从交互角度看,用户不是只关心“总共多久出完”,还非常关心“多久能看到第一个字”。流式输出能显著改善这种感知。

这也是为什么大家会特别关注 TTFT。它并不等于系统的全部性能指标,但它对产品体验影响极大。

第二,模型生成本来就是增量的

如果底层模型是一 token 一 token 往前推,服务层硬要等全部结束再一次性返回,本质上就是故意把一个天然增量的问题改写成批量结果问题。

第三,流式接口能把服务层和引擎层解耦

服务层不需要自己持有“完整结果”。它只要知道:

  • 后台引擎有没有新的增量事件;
  • 有的话怎么写给客户端;
  • 没有的话是否需要继续等待;
  • 结束时如何清理资源。

这其实是一个非常系统化的问题,不只是“把 yield 用一下”。

SSE 协议到底长什么样

SSE 在协议层并不复杂。最重要的几点只有三个:

1. 响应类型是 text/event-stream

这告诉客户端:接下来不是一个一次性 JSON,而是一个事件流。

2. 每个事件之间用空行分隔

最常见的格式长这样:

1
2
3
4
5
6
data: {"choices":[{"delta":{"content":"你"}}]}

data: {"choices":[{"delta":{"content":"好"}}]}

data: [DONE]

重点不是“每行都以 data: 开头”,而是事件之间靠空行分隔。这意味着客户端读流时,不能简单地把每个网络 chunk 当成一个完整事件。

3. [DONE] 是应用层结束信号

连接断开不一定代表“正常完成”,也可能是网络出错、客户端断连、服务端异常。很多 OpenAI 风格接口都会在最后显式发送 data: [DONE],让客户端明确知道:这次流式输出是在协议意义上完整结束的。

如果把一个完整的 OpenAI 风格流按阶段拆开,通常会经历:

  1. 首 chunk 先声明角色;
  2. 中间 chunk 持续追加增量文本;
  3. 最后一个结构化 chunk 给出 finish_reason
  4. 终止符 [DONE] 让客户端可以安全收尾。

SSE 和 WebSocket 的区别,为什么这里常选 SSE

两者都能做“边发边收”,但它们适合的方向并不一样。

SSE 更适合这种场景

  • 服务端单向推送为主;
  • 语义上仍然是 HTTP 请求/响应;
  • 接口希望和现有 HTTP 网关、认证、监控体系保持一致;
  • 客户端发起一次请求后,主要任务就是持续接收增量结果。

WebSocket 更适合双向高频交互

如果你的场景是浏览器游戏、协同编辑、持续双向信令,那 WebSocket 很自然。但对大模型的 Chat Completions 来说,请求通常是“先提交一次输入,然后不断接收输出”,服务端单向推送占主导地位,SSE 就足够而且更简单。

mini-infer 的流式链路是怎么搭起来的

mini-infer 的实现很值得看,因为它不是简单把生成器塞进 FastAPI,而是把协议层、异步协调层、引擎层分开了。

第一步:路由层决定走流式还是非流式

server.py 里,POST /v1/chat/completions 的逻辑很清楚:

1
2
3
4
5
6
7
if request.stream:
return StreamingResponse(
_stream_generator(engine, prompt, request),
media_type="text/event-stream",
)
else:
return await _non_stream(engine, prompt, request)

这里最重要的不是 StreamingResponse 这个类名,而是它体现的边界:

  • 路由层决定协议行为;
  • _stream_generator() 负责把增量事件转换成 SSE;
  • 引擎层只负责产出 token 事件,不直接关心 HTTP。

第二步:_stream_generator() 负责把事件包装成 OpenAI 风格 chunk

_stream_generator() 的行为非常标准:

  1. 先生成一个 completion_id
  2. 发送首个 role-only chunk;
  3. 持续消费 engine.generate_stream_events(...)
  4. 对每个增量事件构造 ChatCompletionChunk
  5. 最后发送 finish chunk 和 [DONE]

这一步本质上是在做一件事:把引擎内部的事件流,翻译成客户端能理解的协议流。

如果把 mini-infer 的流式输出写成接近真实的原始事件,大概会是这样:

1
2
3
4
5
6
7
8
data: {"id":"chatcmpl-1a2b3c4d","object":"chat.completion.chunk","model":"mini-infer","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-1a2b3c4d","object":"chat.completion.chunk","model":"mini-infer","choices":[{"index":0,"delta":{"content":"SSE 适合 LLM streaming"},"finish_reason":null}]}

data: {"id":"chatcmpl-1a2b3c4d","object":"chat.completion.chunk","model":"mini-infer","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

这段示例里最值得记住的不是字段名,而是顺序:角色声明、内容增量、完成原因、显式结束。

第三个问题:为什么要先发 role-only chunk

这是很多人第一次模仿 OpenAI 流式接口时容易漏掉的细节。

在 OpenAI 风格协议里,首个 chunk 往往先把:

1
{"delta":{"role":"assistant"}}

发出去,而不是立刻发内容。这是为了先把这次增量输出的“身份”确定下来,后面再持续追加 delta.content。这看起来像小细节,实际上是很多客户端默认依赖的行为。

第四个问题:为什么一个 chunk 不等于一个 token

mini-infer 在注释里写得很清楚:一次 step 可能把多个 token 合并到同一个 delta 里。也就是说,客户端不能把“收到了一个 SSE chunk”简单等同于“收到了一个 token”。

这件事很重要,因为很多前端实现会误以为 chunk 边界就是 token 边界,结果在统计、光标动画、增量渲染时都出问题。

真正的上游不是 _stream_generator(),而是 AsyncEngine

如果只看 server.py,很容易以为流式返回的核心只是一个 Python 异步生成器。真正关键的部分其实在 runtime/async_engine.py

AsyncEngine 的职责可以概括成一句话:

它把同步的 LLMEngine 包装成一个异步前门,让多个 HTTP 请求可以共享同一个后台 step loop,同时又能各自拿到自己的增量输出。

它是怎么做到的

核心机制并不复杂,但非常关键:

  • 每个请求分配一个 request_id
  • 每个请求都注册一个自己的 asyncio.Queue
  • 后台线程持续调用 self._engine.step()
  • 新 token 通过 loop.call_soon_threadsafe(queue.put_nowait, item) 投递到对应请求的 queue;
  • 前台的 async generator 用 await queue.get() 逐个消费事件。

如果把这段机制翻译成一句更容易复述的话,就是:后台统一生产事件,前台按请求各自消费事件;客户端看到的是自己的输出流,但 GPU 实际上服务的是共享批次。

这意味着:

  • 所有请求共享一个推理主循环;
  • 每个请求仍然有自己的结果通道;
  • continuous batching 和流式返回可以同时成立。

这就是 mini-infer 服务层设计最漂亮的地方之一。它没有为了流式输出牺牲批处理能力,也没有为了共享引擎把所有请求的输出混在一起。

为什么“每个请求一个 queue”是对的

这个设计看起来朴素,实际上非常合理。

如果没有每请求独立队列,会发生什么

一旦多个请求共享一个输出队列,你就必须额外做复杂的多路复用和分发逻辑。否则不同请求的 token 会混在一起,客户端根本没法正确消费。

独立队列带来的三个好处

  • 请求之间的输出天然隔离;
  • 服务层可以按请求维度处理断连和取消;
  • 流式生成器只需要关心“从自己的 queue 里拿事件”,不需要自己再做二次分发。

这也是为什么 AsyncEngine 这层对理解 LLM serving 很重要。它不是简单的“异步语法糖”,而是在组织请求生命周期。

再往前推一步,这个设计还顺手解决了一个常见冲突:如果你把“一个连接对应一个同步生成过程”写死,服务层就很难再和 continuous batching 共存;而 queue + 后台 step loop 这种设计,天然允许“每个请求看起来是独立流”,但底层执行仍然是共享批处理。

断连处理为什么比“关闭连接”复杂得多

很多流式接口的真正问题,不是在正常路径,而是在异常路径。

想象一个客户端在输出到一半时断开连接。如果服务层只是把 HTTP 连接关掉,而后台引擎仍然继续为这个请求生成 token,就会出现两个后果:

  • GPU 资源继续被无效请求占用;
  • 请求状态和 KV cache 可能迟迟不被释放。

这就是为什么真正成熟的流式服务必须认真处理断连。

mini-infer 是怎么做的

AsyncEngine.generate_stream_events() 里有一个非常关键的 finally

1
2
3
4
finally:
self._token_queues.pop(rid, None)
with self._engine_lock:
self._engine.cancel_request(rid)

它做了两件事:

  1. 把这个请求的输出队列从追踪表里移除;
  2. 通知底层引擎取消该请求。

这不是一个“清理小动作”,而是保证系统不会因为客户端断连而持续浪费算力的关键步骤。

如果把断连后的清理时序拆开看,mini-infer 实际上做的是:

  1. 前台检测到流已经结束或客户端不再消费;
  2. 请求对应的 queue 从 _token_queues 中移除;
  3. 调用 cancel_request(rid) 通知底层引擎停止追踪该请求;
  4. 后续 step loop 不再为这个请求继续投递 token;
  5. 请求相关状态进入清理路径。

后台挂了,前台为什么不能一直等

另一个很容易被忽略的问题是:如果后台 step loop 出了异常,而前台还在 await queue.get(),会发生什么?

mini-infer 对这个问题的处理也很到位:

  • 前台等待 queue 时使用了周期性超时;
  • 超时后会做健康检查;
  • 一旦后台线程异常退出,会给等待中的消费者投递 _DoneEvent("error")

换句话说,前台不会无限期傻等,异常也会沿着事件链路传出来。这对流式服务尤其重要。

前端应该怎么消费这种流

很多文章讲 SSE 时只会提浏览器里的 EventSource,但对 OpenAI 风格的接口来说,这个建议经常不够准确。

EventSource 的限制

EventSource 非常适合传统 SSE 场景,但它天然偏向简单的 GET 请求。如果你的接口像 Chat Completions 这样是 POST 请求、需要自定义 headers、请求体里还有 JSON,那它就不那么顺手了。

为什么 OpenAI 风格接口更常见的是 fetch + ReadableStream

对这类接口来说,更常见的消费方式是:

  1. fetch 发起 POST
  2. 拿到响应体的字节流;
  3. 按 SSE 的事件边界切分;
  4. 解析每个 data: 事件;
  5. 单独处理 [DONE]

mini-inferexamples/openai_client.py 虽然是 Python 实现,但它清楚展示了这套消费逻辑的本质:不断读数据,跳过无关行,碰到 [DONE] 结束,其他行解析为 JSON chunk,再从 delta.content 里取增量文本。

如果以后你要在前端自己写消费逻辑,最稳妥的心智模型是:

  • 先按 SSE 事件边界切分;
  • 再从每个事件里解析 data: 负载;
  • 再把负载分成 [DONE] 和 JSON chunk 两类;
  • 最后才去取 delta.content

前端最容易踩的三个坑

1. 把网络 chunk 当成事件边界

错。网络层怎么切块,不由你控制。你必须按 SSE 协议规定的空行来识别事件边界。

2. 把事件边界当成 token 边界

也错。服务端完全可能在一个事件里合并多个 token。

3. 只靠连接关闭判断结束

不够稳。真正可靠的是识别 [DONE],把它当成应用层完成信号。

这套设计为什么在工程上是成立的

mini-infer 的 SSE 设计拆开看,会发现它的分层非常清楚。

协议层:server.py

负责把请求转成流式 HTTP 响应,并把内部事件翻译成 OpenAI 风格的 SSE chunk。

异步协调层:async_engine.py

负责请求注册、事件投递、后台 step loop、跨线程通信、断连清理和异常传播。

引擎层:LLMEngine

负责真正的 continuous batching、调度和 token 生成。

这三层分开后,每层都能各司其职:

  • 协议层不用知道底层调度细节;
  • 异步协调层不用知道 HTTP 是怎么暴露的;
  • 引擎层不用关心客户端究竟是浏览器、SDK 还是 benchmark。

这就是好设计最重要的特征:边界清楚。

建议和另外两篇一起看

这篇主要解决“流式路径到底怎么走”。如果你想把上下文补齐,建议再看:

写在最后

如果你只把流式返回理解成“把答案一个字一个字打出来”,那你看到的只是表层现象。真正重要的是,这件事背后连接了三套机制:

  • 协议层的 SSE
  • 服务层的 StreamingResponse 和请求生命周期;
  • 引擎层的 AsyncEngine + continuous batching

mini-infer 这套实现之所以值得看,不是因为它“也支持流式输出”,而是因为它把一条真正完整的链路走通了:

  • 客户端发 POST /v1/chat/completions
  • 服务层返回 text/event-stream
  • 后台异步引擎持续生产 token 事件;
  • 断连时及时取消请求;
  • 正常结束时明确发 [DONE]

当你把这条链路讲顺,就会明白:SSE 在大模型服务里从来不是装饰项,它是把“模型正在一步步生成”这件事实,可靠地交付给客户端的那层协议桥梁。