为什么大模型服务需要流式返回:结合 mini-infer 讲清 SSE 的协议、实现与断连处理
很多人第一次接触大模型流式输出时,会把它理解成一个前端体验优化:不等整段回答生成完,先把前几个 token 发出来,看起来更“丝滑”。这个理解不能说错,但只说到这里,远远不够。
在真实的 LLM serving 系统里,流式返回不是一个 UI 小技巧,而是服务层和引擎层协作方式的一部分。它决定了:
- 请求何时算“开始有结果”;
- 服务层如何把后台引擎产生的增量内容及时交给客户端;
- 断连时系统怎样释放请求资源;
- 前端该用什么方式消费这个流;
- 为什么 HTTP 服务层可以和 continuous batching 并存,而不是互相破坏。
这篇文章就围绕一个问题展开:为什么现代大模型服务几乎都会做流式返回,以及这件事在 mini-infer 里是怎么被正确实现的。
核心代码会落在这些文件上:
mini_infer/serving/server.pymini_infer/runtime/async_engine.pymini_infer/serving/openai_schema.pyexamples/openai_client.py
如果想先抓主线,可以先记住 mini-infer 的流式链路长这样:
1 | HTTP POST /v1/chat/completions |
先把 SSE 讲明白
SSE 全称是 Server-Sent Events。它是一种很朴素的协议思想:
服务端保持 HTTP 连接不立刻关闭,而是持续往这个连接里写一段一段的事件数据,客户端一边读,一边处理。
它和普通一次性 JSON 响应的区别非常直接。
普通 JSON 响应在做什么
普通响应的思路是:
- 请求进来;
- 服务端把结果全部准备好;
- 一次性返回完整 JSON;
- 连接结束。
这适合结果很快就能准备好的场景,也适合那些“完整结果比中间过程更重要”的接口。
SSE 在做什么
SSE 的思路则是:
- 请求进来;
- 服务端开始处理;
- 一旦有部分结果,就先发一部分;
- 连接保持打开;
- 后续内容继续往同一个连接里写;
- 最后再明确告诉客户端“已经结束了”。
这和 LLM 的生成过程天然匹配。因为自回归模型本来就是一步一步往后生成,而不是先在内部瞬间得到完整答案。
为什么大模型服务天然适合流式返回
第一,用户真正感知的是首 token 时间
从交互角度看,用户不是只关心“总共多久出完”,还非常关心“多久能看到第一个字”。流式输出能显著改善这种感知。
这也是为什么大家会特别关注 TTFT。它并不等于系统的全部性能指标,但它对产品体验影响极大。
第二,模型生成本来就是增量的
如果底层模型是一 token 一 token 往前推,服务层硬要等全部结束再一次性返回,本质上就是故意把一个天然增量的问题改写成批量结果问题。
第三,流式接口能把服务层和引擎层解耦
服务层不需要自己持有“完整结果”。它只要知道:
- 后台引擎有没有新的增量事件;
- 有的话怎么写给客户端;
- 没有的话是否需要继续等待;
- 结束时如何清理资源。
这其实是一个非常系统化的问题,不只是“把 yield 用一下”。
SSE 协议到底长什么样
SSE 在协议层并不复杂。最重要的几点只有三个:
1. 响应类型是 text/event-stream
这告诉客户端:接下来不是一个一次性 JSON,而是一个事件流。
2. 每个事件之间用空行分隔
最常见的格式长这样:
1 | data: {"choices":[{"delta":{"content":"你"}}]} |
重点不是“每行都以 data: 开头”,而是事件之间靠空行分隔。这意味着客户端读流时,不能简单地把每个网络 chunk 当成一个完整事件。
3. [DONE] 是应用层结束信号
连接断开不一定代表“正常完成”,也可能是网络出错、客户端断连、服务端异常。很多 OpenAI 风格接口都会在最后显式发送 data: [DONE],让客户端明确知道:这次流式输出是在协议意义上完整结束的。
如果把一个完整的 OpenAI 风格流按阶段拆开,通常会经历:
- 首 chunk 先声明角色;
- 中间 chunk 持续追加增量文本;
- 最后一个结构化 chunk 给出
finish_reason; - 终止符
[DONE]让客户端可以安全收尾。
SSE 和 WebSocket 的区别,为什么这里常选 SSE
两者都能做“边发边收”,但它们适合的方向并不一样。
SSE 更适合这种场景
- 服务端单向推送为主;
- 语义上仍然是 HTTP 请求/响应;
- 接口希望和现有 HTTP 网关、认证、监控体系保持一致;
- 客户端发起一次请求后,主要任务就是持续接收增量结果。
WebSocket 更适合双向高频交互
如果你的场景是浏览器游戏、协同编辑、持续双向信令,那 WebSocket 很自然。但对大模型的 Chat Completions 来说,请求通常是“先提交一次输入,然后不断接收输出”,服务端单向推送占主导地位,SSE 就足够而且更简单。
mini-infer 的流式链路是怎么搭起来的
mini-infer 的实现很值得看,因为它不是简单把生成器塞进 FastAPI,而是把协议层、异步协调层、引擎层分开了。
第一步:路由层决定走流式还是非流式
在 server.py 里,POST /v1/chat/completions 的逻辑很清楚:
1 | if request.stream: |
这里最重要的不是 StreamingResponse 这个类名,而是它体现的边界:
- 路由层决定协议行为;
_stream_generator()负责把增量事件转换成 SSE;- 引擎层只负责产出 token 事件,不直接关心 HTTP。
第二步:_stream_generator() 负责把事件包装成 OpenAI 风格 chunk
_stream_generator() 的行为非常标准:
- 先生成一个
completion_id; - 发送首个 role-only chunk;
- 持续消费
engine.generate_stream_events(...); - 对每个增量事件构造
ChatCompletionChunk; - 最后发送 finish chunk 和
[DONE]。
这一步本质上是在做一件事:把引擎内部的事件流,翻译成客户端能理解的协议流。
如果把 mini-infer 的流式输出写成接近真实的原始事件,大概会是这样:
1 | data: {"id":"chatcmpl-1a2b3c4d","object":"chat.completion.chunk","model":"mini-infer","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]} |
这段示例里最值得记住的不是字段名,而是顺序:角色声明、内容增量、完成原因、显式结束。
第三个问题:为什么要先发 role-only chunk
这是很多人第一次模仿 OpenAI 流式接口时容易漏掉的细节。
在 OpenAI 风格协议里,首个 chunk 往往先把:
1 | {"delta":{"role":"assistant"}} |
发出去,而不是立刻发内容。这是为了先把这次增量输出的“身份”确定下来,后面再持续追加 delta.content。这看起来像小细节,实际上是很多客户端默认依赖的行为。
第四个问题:为什么一个 chunk 不等于一个 token
mini-infer 在注释里写得很清楚:一次 step 可能把多个 token 合并到同一个 delta 里。也就是说,客户端不能把“收到了一个 SSE chunk”简单等同于“收到了一个 token”。
这件事很重要,因为很多前端实现会误以为 chunk 边界就是 token 边界,结果在统计、光标动画、增量渲染时都出问题。
真正的上游不是 _stream_generator(),而是 AsyncEngine
如果只看 server.py,很容易以为流式返回的核心只是一个 Python 异步生成器。真正关键的部分其实在 runtime/async_engine.py。
AsyncEngine 的职责可以概括成一句话:
它把同步的
LLMEngine包装成一个异步前门,让多个 HTTP 请求可以共享同一个后台 step loop,同时又能各自拿到自己的增量输出。
它是怎么做到的
核心机制并不复杂,但非常关键:
- 每个请求分配一个
request_id; - 每个请求都注册一个自己的
asyncio.Queue; - 后台线程持续调用
self._engine.step(); - 新 token 通过
loop.call_soon_threadsafe(queue.put_nowait, item)投递到对应请求的 queue; - 前台的 async generator 用
await queue.get()逐个消费事件。
如果把这段机制翻译成一句更容易复述的话,就是:后台统一生产事件,前台按请求各自消费事件;客户端看到的是自己的输出流,但 GPU 实际上服务的是共享批次。
这意味着:
- 所有请求共享一个推理主循环;
- 每个请求仍然有自己的结果通道;
- continuous batching 和流式返回可以同时成立。
这就是 mini-infer 服务层设计最漂亮的地方之一。它没有为了流式输出牺牲批处理能力,也没有为了共享引擎把所有请求的输出混在一起。
为什么“每个请求一个 queue”是对的
这个设计看起来朴素,实际上非常合理。
如果没有每请求独立队列,会发生什么
一旦多个请求共享一个输出队列,你就必须额外做复杂的多路复用和分发逻辑。否则不同请求的 token 会混在一起,客户端根本没法正确消费。
独立队列带来的三个好处
- 请求之间的输出天然隔离;
- 服务层可以按请求维度处理断连和取消;
- 流式生成器只需要关心“从自己的 queue 里拿事件”,不需要自己再做二次分发。
这也是为什么 AsyncEngine 这层对理解 LLM serving 很重要。它不是简单的“异步语法糖”,而是在组织请求生命周期。
再往前推一步,这个设计还顺手解决了一个常见冲突:如果你把“一个连接对应一个同步生成过程”写死,服务层就很难再和 continuous batching 共存;而 queue + 后台 step loop 这种设计,天然允许“每个请求看起来是独立流”,但底层执行仍然是共享批处理。
断连处理为什么比“关闭连接”复杂得多
很多流式接口的真正问题,不是在正常路径,而是在异常路径。
想象一个客户端在输出到一半时断开连接。如果服务层只是把 HTTP 连接关掉,而后台引擎仍然继续为这个请求生成 token,就会出现两个后果:
- GPU 资源继续被无效请求占用;
- 请求状态和 KV cache 可能迟迟不被释放。
这就是为什么真正成熟的流式服务必须认真处理断连。
mini-infer 是怎么做的
AsyncEngine.generate_stream_events() 里有一个非常关键的 finally:
1 | finally: |
它做了两件事:
- 把这个请求的输出队列从追踪表里移除;
- 通知底层引擎取消该请求。
这不是一个“清理小动作”,而是保证系统不会因为客户端断连而持续浪费算力的关键步骤。
如果把断连后的清理时序拆开看,mini-infer 实际上做的是:
- 前台检测到流已经结束或客户端不再消费;
- 请求对应的 queue 从
_token_queues中移除; - 调用
cancel_request(rid)通知底层引擎停止追踪该请求; - 后续 step loop 不再为这个请求继续投递 token;
- 请求相关状态进入清理路径。
后台挂了,前台为什么不能一直等
另一个很容易被忽略的问题是:如果后台 step loop 出了异常,而前台还在 await queue.get(),会发生什么?
mini-infer 对这个问题的处理也很到位:
- 前台等待 queue 时使用了周期性超时;
- 超时后会做健康检查;
- 一旦后台线程异常退出,会给等待中的消费者投递
_DoneEvent("error")。
换句话说,前台不会无限期傻等,异常也会沿着事件链路传出来。这对流式服务尤其重要。
前端应该怎么消费这种流
很多文章讲 SSE 时只会提浏览器里的 EventSource,但对 OpenAI 风格的接口来说,这个建议经常不够准确。
EventSource 的限制
EventSource 非常适合传统 SSE 场景,但它天然偏向简单的 GET 请求。如果你的接口像 Chat Completions 这样是 POST 请求、需要自定义 headers、请求体里还有 JSON,那它就不那么顺手了。
为什么 OpenAI 风格接口更常见的是 fetch + ReadableStream
对这类接口来说,更常见的消费方式是:
- 用
fetch发起POST; - 拿到响应体的字节流;
- 按 SSE 的事件边界切分;
- 解析每个
data:事件; - 单独处理
[DONE]。
mini-infer 的 examples/openai_client.py 虽然是 Python 实现,但它清楚展示了这套消费逻辑的本质:不断读数据,跳过无关行,碰到 [DONE] 结束,其他行解析为 JSON chunk,再从 delta.content 里取增量文本。
如果以后你要在前端自己写消费逻辑,最稳妥的心智模型是:
- 先按 SSE 事件边界切分;
- 再从每个事件里解析
data:负载; - 再把负载分成
[DONE]和 JSON chunk 两类; - 最后才去取
delta.content。
前端最容易踩的三个坑
1. 把网络 chunk 当成事件边界
错。网络层怎么切块,不由你控制。你必须按 SSE 协议规定的空行来识别事件边界。
2. 把事件边界当成 token 边界
也错。服务端完全可能在一个事件里合并多个 token。
3. 只靠连接关闭判断结束
不够稳。真正可靠的是识别 [DONE],把它当成应用层完成信号。
这套设计为什么在工程上是成立的
把 mini-infer 的 SSE 设计拆开看,会发现它的分层非常清楚。
协议层:server.py
负责把请求转成流式 HTTP 响应,并把内部事件翻译成 OpenAI 风格的 SSE chunk。
异步协调层:async_engine.py
负责请求注册、事件投递、后台 step loop、跨线程通信、断连清理和异常传播。
引擎层:LLMEngine
负责真正的 continuous batching、调度和 token 生成。
这三层分开后,每层都能各司其职:
- 协议层不用知道底层调度细节;
- 异步协调层不用知道 HTTP 是怎么暴露的;
- 引擎层不用关心客户端究竟是浏览器、SDK 还是 benchmark。
这就是好设计最重要的特征:边界清楚。
建议和另外两篇一起看
这篇主要解决“流式路径到底怎么走”。如果你想把上下文补齐,建议再看:
- 从 ASGI 到推理服务:FastAPI、Starlette、Uvicorn 在 mini-infer 里如何协作:看
StreamingResponse为什么能作为服务栈的一部分成立。 - 把推理引擎接成标准接口:结合 mini-infer 讲清 OpenAI-Compatible HTTP API:看 SSE 为什么不是孤立协议,而是 Chat Completions 契约的一部分。
写在最后
如果你只把流式返回理解成“把答案一个字一个字打出来”,那你看到的只是表层现象。真正重要的是,这件事背后连接了三套机制:
- 协议层的
SSE; - 服务层的
StreamingResponse和请求生命周期; - 引擎层的
AsyncEngine + continuous batching。
mini-infer 这套实现之所以值得看,不是因为它“也支持流式输出”,而是因为它把一条真正完整的链路走通了:
- 客户端发
POST /v1/chat/completions; - 服务层返回
text/event-stream; - 后台异步引擎持续生产 token 事件;
- 断连时及时取消请求;
- 正常结束时明确发
[DONE]。
当你把这条链路讲顺,就会明白:SSE 在大模型服务里从来不是装饰项,它是把“模型正在一步步生成”这件事实,可靠地交付给客户端的那层协议桥梁。




