vLLM工作流程
请求类与请求的添加
1 | #vllm自带的Request |
1 | # vllm/vllm/v1/engine/llm_engine.py |
Engine Core:核心引擎
请求添加由 LLMEngine 的 add_request 方法完成
在 LLMEngine.engine_core 中初始化代码:benchmark_my.py 文件中的 LLMEngine 的 EngineCore 调用 SyncMPClient 运行
1 | # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs) |
/vllm/vllm/v1/engine/core_client.py
1 | class EngineCoreClient(ABC): |
multiprocess_mode 和 asyncio_mode 参数区别
multiprocess_mode: bool(多进程模式)- 作用: 决定 计算核心 (
EngineCore) 在哪里运行。 False(默认):EngineCore与调用它的代码在同一个进程中运行。通信是直接的方法调用,开销极小。True:EngineCore在一个或多个独立的后台子进程中运行。通信必须通过进程间通信(IPC,vLLM 使用 ZeroMQ)来完成。这提供了更好的隔离性和健壮性,并且是分布式推理所必需的。
- 作用: 决定 计算核心 (
asyncio_mode: bool(异步模式)- 作用: 决定 客户端应用程序的编程模型。
False(默认): 客户端是同步的。当你的代码调用一个操作(如generate)时,它会阻塞并一直等待,直到操作完成并返回结果。True: 客户端是异步的,基于 Python 的asyncio框架。你的代码使用await关键字进行调用,这允许在等待 I/O 操作(比如从EngineCore进程接收结果)时,事件循环可以去执行其他任务。这对于构建高并发的服务(如 Web 服务器)至关重要。
不同参数组合与生成的 Client
代码段 /mnt/disk1/ljm/qwen_test/vllm/vllm/v1/engine/core_client.py#L81-96 中的逻辑根据这两个 mode 的组合,通过一个工厂模式来创建不同类型的客户端。下面是四种组合的分析:
multiprocess_mode |
asyncio_mode |
生成的 Client | 解释和使用场景 |
|---|---|---|---|
False |
False |
InprocClient |
同步,单进程。这是最简单、最直接的模式。InprocClient 只是一个轻量级包装器,它直接持有 EngineCore 的实例并调用其方法。没有进程间通信的开销。场景: 本地开发、简单的脚本、单 GPU 推理。 |
True |
False |
SyncMPClient |
同步,多进程。客户端是同步阻塞的,但它通过进程间通信(IPC)与运行在后台进程中的 EngineCore 交互。SyncMPClient 封装了所有同步的 ZMQ 通信细节。场景: 分布式推理(例如 tensor_parallel_size > 1)时,使用同步的 LLMEngine。 |
True |
True |
AsyncMPClient (通过 make_async_mp_client 创建) |
异步,多进程。客户端是基于 asyncio 的,它通过非阻塞的 IPC 与后台的 EngineCore 进程通信。这是最高性能、最高并发的模式。场景: AsyncLLMEngine,用于构建高吞吐量的 API 服务,可以同时处理大量并发请求。 |
False |
True |
NotImplementedError |
(不支持)。代码明确禁止了这种组合。这是因为 EngineCore 的核心循环是一个阻塞的 while True 循环,将它直接放在 asyncio 的事件循环中而不隔离到单独的进程或线程中,会阻塞整个事件循环,使 asyncio 失去意义。 |
SyncMPClient:
客户端以同步方式运行
TODO:判断客户端是否需要改成异步
1 | class SyncMPClient(MPClient): |
launch_core_engines:MPClient对象init时调用
1 | # /vllm/vllm/v1/engine/utils.py文件 |
所以local_engine_manager会作为返回值返回给调用者MPClient
local_engine_manager:CoreEngineProcManager对象
核心功能:管理vllm后端引擎(EngineCore)子进程的整个生命周期
初始化:用python的multiprocessing模块创建多个进程,每个进程都准备好target_fn并且赋予唯一名称(如EngineCore_DP0),便于识别与调试
1 | # /vllm/vllm/v1/engine/utils.py文件 |
1 | # vllm/vllm/utils/__init__.py |
这里的 get_context(mp_method) 函数对应 multiprocessing 库中的 context.py 文件的 BaseContext 类的get_context 方法:
1 | # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py |
所以vllm的CoreEngineProcManager中的context.Process(...)实际在创建新的Process对象,并将创建的Process对象添加到self.processes中
而后会对每个self.processes中的Process对象调用start()方法
这几个Process类的init和start函数相同,内容如下:
1 | # vllm/vllm/v1/engine/utils.py 文件 |
所以在
1 | class BaseProcess(object): |
- 对于
ForkContext得到的ForkProcess对象,创建的Popen类的_launch方法会调用ForkProcess的_bootstrap方法,在_bootstrap函数内部会调用process.run()从而启动target_fn- 默认用的是
ForkProcess
- 默认用的是
1 | class ForkProcess(process.BaseProcess): |
对于
SpawnContext得到的SpawnProcess对象,创建的Popen类的_launch方法本身不调用_bootstrap函数,因为它不创建能运行的python子进程,它的工作流程:- 序列化任务
- 启动新解释器,执行类似
python -m multiprocessing.spawn --pipe-handle=...的命令,启动一个全新的干净的python进程 - 传递数据,将打包好字节流发送给这个新的进程,新进程在自己的进程空间里调用
process_obj._bootstrap(),后者接着调用process_obj.run()
对于
forkserver模式:在“孙子”进程中调用这个模式结合
fork的速度和spawn的安全性当主程序调用
p.start()的时候popen_forkserver.py的_launch方法并不会自己创建进程,而是连接到一个正在运行的ForkServer,向他发送“帮我创建请求的申请”ForkServer收到请求后,执行os.fork()创建一个新的工作进程,这个工作进程可以看作主程序的“孙子”进程- 孙子进程将从主进程接收到的任务信息传递给新创建的工作进程,工作进程接收到序列化信息,反序列化出
process_obj对象,然后调用process_obj._bootstrap(),最终执行process.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29class Popen(popen_fork.Popen):
method = 'forkserver'
DupFd = _DupFd
def __init__(self, process_obj):
self._fds = []
super().__init__(process_obj). # init方法会调用_launch函数
def _launch(self, process_obj):
# 用spawn模式启动一个特殊的服务器进程,加载必要的库后进入等待状态
prep_data = spawn.get_preparation_data(process_obj._name)
buf = io.BytesIO()
set_spawning_popen(self)
try:
reduction.dump(prep_data, buf)
reduction.dump(process_obj, buf)
finally:
set_spawning_popen(None)
# 当主程序调用p.start()的时候
# popen_forkserver.py的_launch方法并不会自己创建进程,而是连接到一个正在运行的ForkServer,向他发送“帮我创建请求的申请
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
# Keep a duplicate of the data pipe's write end as a sentinel of the
# parent process used by the child process.
_parent_w = os.dup(w)
self.finalizer = util.Finalize(self, util.close_fds,
(_parent_w, self.sentinel))
with open(w, 'wb', closefd=True) as f:
f.write(buf.getbuffer())
self.pid = forkserver.read_signed(self.sentinel)
process_obj 是 Process 对象
process_obj._bootstrap 内部调用逻辑代码如下
1 | # /multiprocessing/process.py |
因为vLLM在Linux中默认启动的是ForkProcess,所以这里直接会在_bootstrap方法中调用run()函数,启动EngineCoreProc.run_engine_core
1 | # vllm/vllm/v1/engine/core.py > EngineCoreProc |
EngineCore && EngineCoreProc&&DPEngineCore`
在 __run_engine_core__ 函数中会初始化 __DPEngineCore__ ,初始化的过程中实际上先后调用了3个 EngineCore 的 __init__() 函数
1 | class EngineCore: |
在完成 DPEngineCoreProc 实例化之后,__run_engine_core__ 方法会调用 __engine_core.run_busy_loop()__,启动引擎主循环,开始通过ZMQ接收并处理请求
MPClient 类
而后再度回到 MPClient 类中
1 | class MPClient(EngineCoreClient): |
OutputProcessor:负责处理engine core的输出
不参与核心的模型计算,专注于处理engine_core的输出,并且将其转化为用户最终需要的格式