vLLM Workflow

The Request Class and Adding Requests

# vLLM's built-in Request class
class Request:
    def __init__(
        self,
        request_id: str,
        prompt_token_ids: Optional[list[int]],  # token ID list of the input prompt
        sampling_params: Optional[SamplingParams],
        pooling_params: Optional[PoolingParams],
        eos_token_id: Optional[int],
        client_index: int = 0,
        arrival_time: Optional[float] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        mm_features: Optional[list[MultiModalFeatureSpec]] = None,
        lora_request: Optional["LoRARequest"] = None,
        structured_output_request: Optional["StructuredOutputRequest"] = None,
        cache_salt: Optional[str] = None,
        priority: int = 0,
        trace_headers: Optional[Mapping[str, str]] = None,
        block_hasher: Optional[Callable[["Request"], list["BlockHash"]]] = None,
        type_info: Optional[str] = "",
    ) -> None:
        self.request_id = request_id  # unique identifier, used to track the request over its whole lifecycle
        self.client_index = client_index
        # Priority; the scheduler uses it to decide ordering
        self.priority = priority
        self.type_info = type_info
        # Parameters controlling the sampling process
        self.sampling_params = sampling_params
        # Parameters for pooling, typically used by models that need fixed-size vector representations
        self.pooling_params = pooling_params
        # Because of LoRA, the eos token id can be different for each request.
        self.eos_token_id = eos_token_id
        self.lora_request = lora_request
        self.structured_output_request = structured_output_request
        # Time the request arrived at the engine, used to compute various performance metrics
        self.arrival_time = arrival_time if arrival_time is not None else time.time()

        self.status = RequestStatus.WAITING
        self.use_structured_output = False
        self.events: list[EngineCoreEvent] = []
        self.stop_reason: Union[int, str, None] = None

        # P/D: Connector-specific KV transfer parameters.
        self.kv_transfer_params: Optional[dict[str, Any]] = None

        if pooling_params is not None:
            # Pooling models.
            self.max_tokens = 1
        elif sampling_params is not None:
            # Generative models.
            assert sampling_params.max_tokens is not None
            self.max_tokens = sampling_params.max_tokens
            if sampling_params.structured_outputs is not None:
                self.status = RequestStatus.WAITING_FOR_FSM
                self.use_structured_output = True

            if sampling_params.extra_args is not None:
                self.kv_transfer_params = sampling_params.extra_args.get(
                    "kv_transfer_params"
                )
        else:
            raise ValueError("sampling_params and pooling_params can't both be unset")

        self.prompt_token_ids = prompt_token_ids
        self.prompt_embeds = prompt_embeds
        self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            prompt_token_ids, prompt_embeds
        )
        self._output_token_ids: list[int] = []
        self._all_token_ids: list[int] = (
            self.prompt_token_ids.copy()
            if self.prompt_token_ids is not None
            else [0] * self.num_prompt_tokens
        )
        self.num_output_placeholders = 0  # Used in async scheduling.
        self.spec_token_ids: list[int] = []
        self.num_computed_tokens = 0
        self.cache_salt: Optional[str] = cache_salt

        # Multi-modal related
        # Used by multimodal models, allowing vLLM to handle multimodal requests
        self.mm_features = mm_features or []
        self.num_encoder_inputs = len(self.mm_features)
        self.has_encoder_inputs = self.num_encoder_inputs > 0

        # Read-only views
        # Prevent directly appending to these lists since
        # they should also be updated simultaneously.
        self.output_token_ids = ConstantList(self._output_token_ids)
        self.all_token_ids = ConstantList(self._all_token_ids)
        # trace_headers
        self.trace_headers = trace_headers
        # State
        # The number of tokens with prefix cache hits.
        self.num_cached_tokens = -1

        # The number of NaNs in logits. A value greater than 0
        # indicates that the output is corrupted
        self.num_nans_in_logits = 0

        # The number of times this request has been preempted by the scheduler
        self.num_preemptions = 0

        self.block_hashes: list[BlockHash] = []
        self.get_hash_new_full_blocks: Optional[Callable[[], list[BlockHash]]] = None
        if block_hasher is not None:
            self.get_hash_new_full_blocks = partial(block_hasher, self)
            self.block_hashes = self.get_hash_new_full_blocks()

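To make the relationships between these fields concrete, here is a small self-contained sketch that mimics a few of them. ToyRequest and ToySamplingParams are hypothetical stand-ins for illustration, not vLLM classes:

import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ToySamplingParams:  # toy stand-in; only the fields used below
    max_tokens: int = 16
    structured_outputs: Optional[object] = None

@dataclass
class ToyRequest:  # mirrors a handful of Request's fields
    request_id: str
    prompt_token_ids: list[int]
    sampling_params: ToySamplingParams
    priority: int = 0
    arrival_time: float = field(default_factory=time.time)  # defaults to "now", like Request
    output_token_ids: list[int] = field(default_factory=list)
    num_computed_tokens: int = 0

req = ToyRequest("req-0", [1, 2, 3], ToySamplingParams(max_tokens=8))
print(req.request_id, req.arrival_time, len(req.prompt_token_ids))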

class RequestOutput:
    """The output data of a completion request to the LLM.
    Pay particular attention to the metrics field.
    Args:
        request_id: The unique ID of the request.
        prompt: The prompt string of the request.
                For encoder/decoder models, this is the
                decoder input prompt.
        prompt_token_ids: The token IDs of the prompt.
                          For encoder/decoder models, this is the
                          decoder input prompt token ids.
        prompt_logprobs: The log probabilities to return per prompt token.
        outputs: The output sequences of the request.
        finished: Whether the whole request is finished.
        metrics: Metrics associated with the request.
                 [Union[RequestMetrics, RequestStateStats]]
        lora_request: The LoRA request that was used to generate the output.
        encoder_prompt: The encoder prompt string of the request.
                        None if decoder-only.
        encoder_prompt_token_ids: The token IDs of the encoder prompt.
                                  None if decoder-only.
        num_cached_tokens: The number of tokens with prefix cache hit.
        kv_transfer_params: The params for remote K/V transfer.
    """

@dataclass
class RequestMetrics:
    """Metrics associated with a request.
    Records detailed timestamps across a request's lifetime, for performance
    analysis and monitoring.
    Attributes:
        arrival_time: The time when the request arrived.
        first_scheduled_time: The time when the request was first scheduled.
        first_token_time: The time when the first token was generated.
        time_in_queue: The time the request spent in the queue.
        finished_time: The time when the request was finished.
        scheduler_time: The time spent in the scheduler when this request was
                        being considered by the scheduler.
        model_forward_time: The time spent in the model forward pass when this
                            request was in the batch.
        model_execute_time: The time spent in the model execute function. This
                            will include model forward, block/sync across
                            workers, cpu-gpu sync time and sampling time.
    """

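These timestamps compose directly into the serving-latency numbers usually reported for LLM inference. A minimal sketch using a simplified stand-in dataclass (ToyRequestMetrics is hypothetical) with the fields documented above:

from dataclasses import dataclass
from typing import Optional

@dataclass
class ToyRequestMetrics:  # simplified stand-in with the documented fields
    arrival_time: float
    first_scheduled_time: Optional[float] = None
    first_token_time: Optional[float] = None
    finished_time: Optional[float] = None

m = ToyRequestMetrics(arrival_time=100.0, first_scheduled_time=100.2,
                      first_token_time=100.5, finished_time=102.0)
time_in_queue = m.first_scheduled_time - m.arrival_time   # 0.2s waiting in queue
ttft = m.first_token_time - m.arrival_time                # 0.5s time-to-first-token
e2e_latency = m.finished_time - m.arrival_time            # 2.0s end-to-end latency
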
class RequestStatus(enum.IntEnum):
    """Status of a request."""
# vllm/vllm/v1/engine/llm_engine.py
def add_request(
    self,
    request_id: str,  # ID of the request
    prompt: Union[EngineCoreRequest, PromptType],  # the prompt, or an already-processed EngineCoreRequest
    params: Union[SamplingParams, PoolingParams],  # SamplingParams
    arrival_time: Optional[float] = None,  # arrival time; mainly used to compute how long the request waits in the queue (could be extended later if needed)
    lora_request: Optional[LoRARequest] = None,
    tokenization_kwargs: Optional[dict[str, Any]] = None,
    trace_headers: Optional[Mapping[str, str]] = None,  # headers for distributed tracing; when vLLM is part of a larger system, these track a request's call chain across services (worth noting)
    priority: int = 0,
    # The raw prompt text, mainly used for logging and for handling output in the output_processor
    prompt_text: Optional[str] = None,
    type_info: Optional[str] = "",
) -> None:
    # Core logic: turn the request into an EngineCoreRequest object and send it to the EngineCore
    if isinstance(prompt, EngineCoreRequest):
        request = prompt
    else:
        request = self.processor.process_inputs(
            request_id,
            prompt,
            params,
            arrival_time,
            lora_request,
            tokenization_kwargs,
            trace_headers,
            priority,
            type_info,
        )
    self.output_processor.add_request(request, prompt_text, None, 0)
    # Add the request to the EngineCore.
    self.engine_core.add_request(request)

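For context, this is roughly how add_request is driven from a synchronous caller. The add_request/step pairing follows the standard LLMEngine pattern, but exact import paths and factory methods vary across vLLM versions, so treat this as a hedged sketch rather than version-exact code:

from vllm import EngineArgs, SamplingParams
from vllm.v1.engine.llm_engine import LLMEngine  # v1 path referenced in this article

# Assumed construction path; check your vLLM version's factory method.
engine = LLMEngine.from_engine_args(EngineArgs(model="facebook/opt-125m"))

engine.add_request(
    request_id="req-0",
    prompt="Hello, my name is",
    params=SamplingParams(max_tokens=16),
)
# step() drives one scheduling + execution iteration and returns request outputs.
while engine.has_unfinished_requests():
    for output in engine.step():
        if output.finished:
            print(output.outputs[0].text)
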
Engine Core: The Core Engine

Adding a request is completed by LLMEngine's add_request method.

Initialization of LLMEngine.engine_core: the LLMEngine created in the benchmark_my.py file runs its EngineCore via a SyncMPClient.

# EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
self.engine_core = EngineCoreClient.make_client(
    multiprocess_mode=multiprocess_mode,  # not enabled here
    asyncio_mode=False,
    vllm_config=vllm_config,
    executor_class=executor_class,
    log_stats=self.log_stats,
)

/vllm/vllm/v1/engine/core_client.py

class EngineCoreClient(ABC):
    """
    EngineCoreClient: subclasses handle different methods for pushing
    and pulling from the EngineCore for asyncio / multiprocessing.

    Subclasses:
    * InprocClient: In process EngineCore (for V0-style LLMEngine use)
    * SyncMPClient: ZMQ + background proc EngineCore (for LLM)
    * AsyncMPClient: ZMQ + background proc EngineCore w/ asyncio (for AsyncLLM)
    """

    @staticmethod
    def make_client(
        multiprocess_mode: bool,  # False
        asyncio_mode: bool,  # False
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
    ) -> "EngineCoreClient":
        # TODO: support this for debugging purposes.
        if asyncio_mode and not multiprocess_mode:
            raise NotImplementedError(
                "Running EngineCore in asyncio without multiprocessing "
                "is not currently supported."
            )

        if multiprocess_mode and asyncio_mode:
            return EngineCoreClient.make_async_mp_client(
                vllm_config, executor_class, log_stats
            )

        if multiprocess_mode and not asyncio_mode:
            return SyncMPClient(vllm_config, executor_class, log_stats)

        return InprocClient(vllm_config, executor_class, log_stats)

Difference between the multiprocess_mode and asyncio_mode parameters

  1. multiprocess_mode: bool (multiprocess mode)

    • Role: determines where the compute core (EngineCore) runs.
    • False (default): EngineCore runs in the same process as the calling code; communication is a direct method call with minimal overhead.
    • True: EngineCore runs in one or more separate background child processes; communication must go through inter-process communication (IPC; vLLM uses ZeroMQ). This provides better isolation and robustness, and is required for distributed inference.
  2. asyncio_mode: bool (async mode)

    • Role: determines the programming model of the client application.
    • False (default): the client is synchronous. When your code invokes an operation (such as generate), it blocks and waits until the operation completes and returns its result.
    • True: the client is asynchronous, built on Python's asyncio framework. Your code calls with the await keyword, which lets the event loop run other tasks while waiting on I/O (for example, receiving results from the EngineCore process). This is essential for building highly concurrent services such as web servers.

Parameter combinations and the resulting client

The logic in /mnt/disk1/ljm/qwen_test/vllm/vllm/v1/engine/core_client.py#L81-96 uses a factory pattern to create a different client type for each combination of the two modes. The four combinations (see the sketch after this list):

  • multiprocess_mode=False, asyncio_mode=False → InprocClient. Synchronous, single-process. The simplest, most direct mode: InprocClient is a lightweight wrapper that holds the EngineCore instance directly and calls its methods, with no inter-process communication overhead. Scenario: local development, simple scripts, single-GPU inference.
  • multiprocess_mode=True, asyncio_mode=False → SyncMPClient. Synchronous, multi-process. The client blocks synchronously but interacts over IPC with an EngineCore running in a background process; SyncMPClient encapsulates all of the synchronous ZMQ communication details. Scenario: distributed inference (e.g. tensor_parallel_size > 1) with the synchronous LLMEngine.
  • multiprocess_mode=True, asyncio_mode=True → AsyncMPClient (created via make_async_mp_client). Asynchronous, multi-process. The client is asyncio-based and talks to the background EngineCore process over non-blocking IPC. This is the highest-performance, highest-concurrency mode. Scenario: AsyncLLMEngine, for building high-throughput API services that handle many concurrent requests.
  • multiprocess_mode=False, asyncio_mode=True → NotImplementedError (unsupported). The code explicitly forbids this combination: EngineCore's core loop is a blocking while True loop, and running it directly on an asyncio event loop without isolating it in a separate process or thread would block the entire event loop and defeat the purpose of asyncio.

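The dispatch itself is just a two-flag truth table. A condensed re-statement of the same decision logic (a toy function returning class names, not the real factory):

def pick_client(multiprocess_mode: bool, asyncio_mode: bool) -> str:
    if asyncio_mode and not multiprocess_mode:
        raise NotImplementedError("asyncio without multiprocessing is unsupported")
    if multiprocess_mode and asyncio_mode:
        return "AsyncMPClient"
    if multiprocess_mode:
        return "SyncMPClient"
    return "InprocClient"

assert pick_client(False, False) == "InprocClient"
assert pick_client(True, False) == "SyncMPClient"
assert pick_client(True, True) == "AsyncMPClient"
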
SyncMPClient:

The client runs in synchronous mode.

TODO: decide whether the client should be switched to asynchronous.

class SyncMPClient(MPClient):
    """Synchronous client for multi-proc EngineCore.
    Initializes a synchronous client for a multi-process environment.
    """

    def __init__(
        self, vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool
    ):
        # Call the parent class initializer with asyncio mode disabled
        super().__init__(
            # The client runs in synchronous mode: when a method is called, the
            # program blocks and waits for the result (todo)
            asyncio_mode=False,
            vllm_config=vllm_config,  # the complete vLLM engine configuration
            executor_class=executor_class,  # the executor type used to run model inference
            log_stats=log_stats,  # boolean flag controlling whether performance stats are logged; set to True here
        )


# Parent class MPClient
class MPClient(EngineCoreClient):
    """
    MPClient: base client for multi-proc EngineCore.
    EngineCore runs in a background process busy loop, getting
    new EngineCoreRequests and returning EngineCoreOutputs

    * pushes EngineCoreRequests via input_socket
    * pulls EngineCoreOutputs via output_socket

    * AsyncMPClient subclass for AsyncLLM usage
    * SyncMPClient subclass for LLM usage
    """

    def __init__(
        self,
        asyncio_mode: bool,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        client_addresses: Optional[dict[str, str]] = None,
    ):
        # Msgpack encoder, used to encode EngineCoreRequests into msgpack format
        self.encoder = MsgpackEncoder()
        # Msgpack decoder, used to decode msgpack bytes back into EngineCoreOutputs objects
        self.decoder = MsgpackDecoder(EngineCoreOutputs)
        # ZMQ setup. Create a ZMQ context; ZMQ is a high-performance asynchronous
        # messaging library that vLLM uses for fast inter-process communication
        sync_ctx = zmq.Context(io_threads=2)
        self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx

        ### Core logic: launch_core_engines starts the EngineCore child processes
        if client_addresses:
            # Engines are managed externally to this client.
            input_address = client_addresses["input_address"]
            output_address = client_addresses["output_address"]
            self.stats_update_address = client_addresses.get("stats_update_address")
        else:
            # Engines are managed by this client.
            # Key context manager: the EngineCore processes are actually created
            # and started here. launch_core_engines starts one or more EngineCore
            # child processes according to vllm_config
            with launch_core_engines(vllm_config, executor_class, log_stats) as (
                engine_manager,
                coordinator,
                addresses,
            ):
                # addresses carries the child processes' communication addresses;
                # with them, MPClient can talk to the engines
                self.resources.coordinator = coordinator
                # MPClient keeps a reference to engine_manager, used to manage and
                # monitor the EngineCore child processes
                self.resources.engine_manager = engine_manager

            (input_address,) = addresses.inputs
            (output_address,) = addresses.outputs
            self.stats_update_address = addresses.frontend_stats_publish_address
            if coordinator is not None:
                assert self.stats_update_address == (
                    coordinator.get_stats_publish_address()
                )

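Underneath, the transport is ordinary ZeroMQ with msgpack framing. Below is a self-contained mini-demo of the same push/pull idea using pyzmq and msgpack directly; the socket types, address, and message shape here are illustrative assumptions, not vLLM's actual wire protocol:

import threading

import msgpack
import zmq

ADDR = "inproc://toy-engine-input"  # vLLM derives real IPC/TCP addresses from its handshake

ctx = zmq.Context()
client = ctx.socket(zmq.PUSH)  # client side pushes encoded requests
client.bind(ADDR)

def engine_side():
    sock = ctx.socket(zmq.PULL)  # engine side pulls and decodes them
    sock.connect(ADDR)
    request = msgpack.unpackb(sock.recv())
    print("engine received:", request)
    sock.close()

t = threading.Thread(target=engine_side)
t.start()
client.send(msgpack.packb({"request_id": "req-0", "prompt_token_ids": [1, 2, 3]}))
t.join()
client.close()
ctx.term()
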
launch_core_engines is called during MPClient's __init__.

# /vllm/vllm/v1/engine/utils.py
@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    num_api_servers: int = 1,
) -> Iterator[
    tuple[
        Optional[Union[CoreEngineProcManager, CoreEngineActorManager]],
        Optional[DPCoordinator],
        EngineZmqAddresses,
    ]
]:
    """Launch engine and DP coordinator processes as needed."""
    # Core logic:
    # - read and parse vllm_config to determine the deployment mode
    # - prepare the communication addresses (ZMQ addresses)
    # - start the data-parallel coordinator (DPCoordinator)
    # - choose the backend: Ray or standard multiprocessing
    # - start the local EngineCore processes (the key code below)

    # If this process needs to start EngineCore child processes
    if local_engine_count:
        # This is where child processes are actually created: the CoreEngineProcManager
        # constructor loops local_engine_count times, creating multiprocessing.Process objects.
        # Each child process's target is set to EngineCoreProc.run_engine_core,
        # which is EngineCore's main-loop entry function.
        # All necessary configuration is passed to EngineCoreProc.run_engine_core.
        local_engine_manager = CoreEngineProcManager(
            EngineCoreProc.run_engine_core,  # target_fn
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=log_stats,
            handshake_address=handshake_address,
            client_handshake_address=client_handshake_address,
            local_client=True,
            local_engine_count=local_engine_count,
            start_index=dp_rank,
            local_start_index=local_start_index or 0,
        )
    else:
        local_engine_manager = None
    # Hand control back to the caller
    yield local_engine_manager, coordinator, addresses
    # After the with-block ends, wait for the engines to start up and then clean up

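The context-manager shape matters here: the engine processes come up before the yield, the caller runs inside the with-block while they are alive, and cleanup happens on exit. Below is a stripped-down sketch of the same pattern using only the standard library (launch_toy_engines is hypothetical):

import contextlib
import multiprocessing

def _worker():
    print("engine process running")

@contextlib.contextmanager
def launch_toy_engines(count: int = 1):
    procs = [multiprocessing.Process(target=_worker) for _ in range(count)]
    for p in procs:
        p.start()               # engines come up before the yield
    try:
        yield procs             # the caller (the client) runs while engines are alive
    finally:
        for p in procs:         # cleanup after the with-block exits
            p.join()

if __name__ == "__main__":
    with launch_toy_engines(2) as procs:
        print([p.name for p in procs])
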
So local_engine_manager is returned to the caller, MPClient.

local_engine_manager is a CoreEngineProcManager object.

Core responsibility: manage the entire lifecycle of the vLLM backend engine (EngineCore) child processes.

Initialization: use Python's multiprocessing module to create multiple processes, each prepared with target_fn and given a unique name (such as EngineCore_DP0) for easy identification and debugging.

# /vllm/vllm/v1/engine/utils.py
class CoreEngineProcManager:
    """
    Utility class to handle creation, readiness, and shutdown
    of background processes used by the AsyncLLM and LLMEngine.
    """

    def __init__(
        self,
        target_fn: Callable,
        local_engine_count: int,
        start_index: int,
        local_start_index: int,
        vllm_config: VllmConfig,
        local_client: bool,
        handshake_address: str,
        executor_class: type[Executor],
        log_stats: bool,
        client_handshake_address: Optional[str] = None,
    ):
        context = get_mp_context()  # defined in vllm/vllm/utils/__init__.py
        for index in range(local_engine_count):
            local_index = local_start_index + index
            global_index = start_index + index

            # Start EngineCore in background process.
            local_dp_ranks.append(local_index)
            self.processes.append(
                context.Process(
                    target=target_fn,
                    name=f"EngineCore_DP{global_index}",
                    kwargs=common_kwargs
                    | {
                        "dp_rank": global_index,
                        "local_dp_rank": local_index,
                    },
                )
            )
#  vllm/vllm/utils/__init__.py
def _maybe_force_spawn():
    """Check if we need to force the use of the `spawn` multiprocessing start
    method.
    """
    if os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") == "spawn":
        return

    reasons = []
    if is_in_ray_actor():
        # even if we choose to spawn, we need to pass the ray address
        # to the subprocess so that it knows how to connect to the ray cluster.
        # env vars are inherited by subprocesses, even if we use spawn.
        import ray

        os.environ["RAY_ADDRESS"] = ray.get_runtime_context().gcs_address
        reasons.append("In a Ray actor and can only be spawned")

    if cuda_is_initialized():
        reasons.append("CUDA is initialized")
    elif xpu_is_initialized():
        reasons.append("XPU is initialized")

    if reasons:
        logger.warning(
            "We must use the `spawn` multiprocessing start method. "
            "Overriding VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. "
            "See https://docs.vllm.ai/en/latest/usage/"
            "troubleshooting.html#python-multiprocessing "
            "for more information. Reasons: %s",
            "; ".join(reasons),
        )
        os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"


def get_mp_context():
    """Get a multiprocessing context with a particular method (spawn or fork).
    By default we follow the value of the VLLM_WORKER_MULTIPROC_METHOD to
    determine the multiprocessing method (default is fork). However, under
    certain conditions, we may enforce spawn and override the value of
    VLLM_WORKER_MULTIPROC_METHOD.
    """
    _maybe_force_spawn()
    mp_method = envs.VLLM_WORKER_MULTIPROC_METHOD
    return multiprocessing.get_context(mp_method)

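So get_mp_context ultimately just resolves a start-method name to a context object whose Process attribute differs. The plain-stdlib equivalent:

import multiprocessing

def _hello(tag: str):
    print("child started via", tag)

if __name__ == "__main__":
    # "fork" is POSIX-only; "forkserver" is also available on Linux.
    for method in ("fork", "spawn"):
        ctx = multiprocessing.get_context(method)
        p = ctx.Process(target=_hello, args=(method,), name=f"Toy_{method}")
        p.start()
        p.join()
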
The get_context(mp_method) call here corresponds to the get_context method of the BaseContext class in the multiprocessing library's context.py:

# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
#
class BaseContext(object):

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def get_context(self, method=None):
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method) from None
        ctx._check_available()
        return ctx

# Returns the Context object corresponding to each method value
_concrete_contexts = {
    'fork': ForkContext(),
    'spawn': SpawnContext(),
    'forkserver': ForkServerContext(),
}

# The different Process classes on each Context object:
class ForkContext(BaseContext):
    _name = 'fork'
    Process = ForkProcess

class SpawnContext(BaseContext):
    _name = 'spawn'
    Process = SpawnProcess

class ForkServerContext(BaseContext):
    _name = 'forkserver'
    Process = ForkServerProcess

    def _check_available(self):
        if not reduction.HAVE_SEND_HANDLE:
            raise ValueError('forkserver start method not available')

So context.Process(...) in vLLM's CoreEngineProcManager is actually creating new Process objects and appending them to self.processes.

Each Process object in self.processes then has its start() method called.

These Process classes share the same __init__ and start functions, shown below:

# vllm/vllm/v1/engine/utils.py
class CoreEngineProcManager:
    def __init__(self, ...):
        ...
        common_kwargs = {
            "vllm_config": vllm_config,
            "local_client": local_client,
            "handshake_address": handshake_address,
            "executor_class": executor_class,
            "log_stats": log_stats,
        }

        ...

        context.Process(
            target=target_fn,  # EngineCoreProc.run_engine_core
            name=f"EngineCore_DP{global_index}",
            kwargs=common_kwargs
            | {
                "dp_rank": global_index,
                "local_dp_rank": local_index,
            },
        )

        ...

        for proc, local_dp_rank in zip(self.processes, local_dp_ranks):
            with (
                set_device_control_env_var(vllm_config, local_dp_rank)
                if (data_parallel)
                else contextlib.nullcontext()
            ):
                proc.start()

...

# multiprocessing/process.py > BaseProcess
def start(self):
    '''
    Start child process
    '''
    # Check whether this Process object has already been closed
    self._check_closed()
    # Make sure this Process object has not been started before
    assert self._popen is None, 'cannot start a process twice'
    # Make sure only the parent process that created this object can start it
    assert self._parent_pid == os.getpid(), \
           'can only start a process object created by current process'
    # Make sure the current process is not a daemon process
    assert not _current_process._config.get('daemon'), \
           'daemonic processes are not allowed to have children'
    # Internal helper that reaps zombie processes that have exited but whose
    # resources have not yet been collected
    _cleanup()
    # Call an internal, platform-specific _Popen class that actually creates the
    # OS-level child process
    self._popen = self._Popen(self)
    # sentinel is an integer file descriptor or handle; the parent learns whether
    # the child has exited by watching this sentinel.
    # It is the key to implementing process.join() and process.is_alive()
    self._sentinel = self._popen.sentinel
    # Avoid a refcycle if the target function holds an indirect
    # reference to the process object (see bpo-30775)
    # Once the child is created, the parent no longer needs references to the
    # target function and its arguments; deleting them avoids potential cycles
    del self._target, self._args, self._kwargs
    # Add this Process object to the global _children set, maintained by the
    # parent to track all still-running child processes it has started
    _children.add(self)

So, looking at BaseProcess in multiprocessing/process.py:

class BaseProcess(object):
    '''
    multiprocessing/process.py
    Process objects represent activity that is run in a separate process
    The class is analogous to `threading.Thread`
    '''

    def _Popen(self):
        raise NotImplementedError

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                 *, daemon=None):
        assert group is None, 'group argument must be None for now'
        count = next(_process_counter)
        self._identity = _current_process._identity + (count,)
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._parent_name = _current_process.name
        self._popen = None
        self._closed = False
        # target tells the Process object which function the child process should
        # execute once started. Here target_fn is EngineCore's entry function,
        # i.e. the child process's main task
        self._target = target  # target_fn = EngineCoreProc.run_engine_core
        self._args = tuple(args)
        # A dict containing vllm_config, local_client, handshake_address,
        # executor_class, log_stats, dp_rank and local_dp_rank
        self._kwargs = dict(kwargs)
        # Here name = f"EngineCore_DP{global_index}"
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)
        if daemon is not None:
            self.daemon = daemon
        _dangling.add(self)
  • For the ForkProcess object obtained from ForkContext, the _launch method of the Popen class it creates calls ForkProcess's _bootstrap method; inside _bootstrap, process.run() is called, which starts target_fn.
    • ForkProcess is the default.
class ForkProcess(process.BaseProcess):
    _start_method = 'fork'

    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

class Popen(object):
    method = 'fork'

    def __init__(self, process_obj):
        util._flush_std_streams()
        self.returncode = None
        self.finalizer = None
        self._launch(process_obj)  # call the _launch method

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        child_r, parent_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                atexit._clear()
                atexit.register(util._exit_function)
                os.close(parent_r)
                os.close(parent_w)
                # The Process's _bootstrap method calls process.run() in the child,
                # starting the child's work
                code = process_obj._bootstrap(parent_sentinel=child_r)
            finally:
                atexit._run_exitfuncs()
                os._exit(code)
        else:
            os.close(child_w)
            os.close(child_r)
            self.finalizer = util.Finalize(self, util.close_fds,
                                           (parent_r, parent_w,))
            self.sentinel = parent_r

  • For the SpawnProcess object obtained from SpawnContext, the _launch method of its Popen class does not itself call _bootstrap, because it does not directly fork a runnable Python child. Its workflow:

    • Serialize the task.
    • Start a new interpreter, executing something like python -m multiprocessing.spawn --pipe-handle=..., which launches a brand-new, clean Python process.
    • Pass the data: the packed byte stream is sent to the new process, which then calls process_obj._bootstrap() in its own address space, and that in turn calls process_obj.run().
  • For the forkserver mode, _bootstrap is invoked in a "grandchild" process, as the code after this list shows.

    This mode combines fork's speed with spawn's safety.

    When the main program calls p.start():

    • The _launch method in popen_forkserver.py does not create the process itself; instead it connects to an already-running ForkServer and sends it a "please create a process for me" request.
    • Upon receiving the request, the ForkServer executes os.fork() to create a new worker process, which can be viewed as a "grandchild" of the main program.
    • The serialized task information received from the main process is handed to the newly created worker process; the worker deserializes the process_obj object, then calls process_obj._bootstrap(), which finally executes process.run().
    class Popen(popen_fork.Popen):
        method = 'forkserver'
        DupFd = _DupFd

        def __init__(self, process_obj):
            self._fds = []
            super().__init__(process_obj)  # __init__ calls the _launch function

        def _launch(self, process_obj):
            # The fork server itself is started via spawn; it loads the necessary
            # libraries and then waits for requests
            prep_data = spawn.get_preparation_data(process_obj._name)
            buf = io.BytesIO()
            set_spawning_popen(self)
            try:
                reduction.dump(prep_data, buf)
                reduction.dump(process_obj, buf)
            finally:
                set_spawning_popen(None)
            # When the main program calls p.start(), _launch in popen_forkserver.py
            # does not create the process itself; it connects to a running
            # ForkServer and asks it to create the process
            self.sentinel, w = forkserver.connect_to_new_process(self._fds)
            # Keep a duplicate of the data pipe's write end as a sentinel of the
            # parent process used by the child process.
            _parent_w = os.dup(w)
            self.finalizer = util.Finalize(self, util.close_fds,
                                           (_parent_w, self.sentinel))
            with open(w, 'wb', closefd=True) as f:
                f.write(buf.getbuffer())
            self.pid = forkserver.read_signed(self.sentinel)

process_obj is the Process object.

The internal call logic of process_obj._bootstrap is shown below:

# /multiprocessing/process.py
class BaseProcess(object):
    def _bootstrap(self, parent_sentinel=None):
        from . import util, context
        global _current_process, _parent_process, _process_counter, _children

        try:
            if self._start_method is not None:
                context._force_start_method(self._start_method)
            _process_counter = itertools.count(1)
            _children = set()
            util._close_stdin()
            old_process = _current_process
            _current_process = self
            _parent_process = _ParentProcess(
                self._parent_name, self._parent_pid, parent_sentinel)
            if threading._HAVE_THREAD_NATIVE_ID:
                threading.main_thread()._set_native_id()
            try:
                self._after_fork()
            finally:
                # delay finalization of the old process object until after
                # _run_after_forkers() is executed
                del old_process
            util.info('child process calling self.run()')
            self.run()  # call run(), which starts target_fn with the prepared arguments
            exitcode = 0
        except SystemExit as e:
            if e.code is None:
                exitcode = 0
            elif isinstance(e.code, int):
                exitcode = e.code
            else:
                sys.stderr.write(str(e.code) + '\n')
                exitcode = 1
        except:
            exitcode = 1
            import traceback
            sys.stderr.write('Process %s:\n' % self.name)
            traceback.print_exc()
        finally:
            threading._shutdown()
            util.info('process exiting with exitcode %d' % exitcode)
            util._flush_std_streams()

        return exitcode

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)  # EngineCoreProc.run_engine_core

Because vLLM uses ForkProcess by default on Linux, _bootstrap here directly calls run(), which launches EngineCoreProc.run_engine_core.

# vllm/vllm/v1/engine/core.py > EngineCoreProc
class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""

    ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"

    @staticmethod  # static method; calling it does not require a class instance
    def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
        """
        Launch EngineCore busy loop in background process.
        Main job: in the new child-process environment, create, initialize and
        run an EngineCoreProc instance.
        """
        # Signal handler used for graceful termination.
        # SystemExit exception is only raised once to allow this and worker
        # processes to terminate without error
        shutdown_requested = False

        # Ensure we can serialize transformer config after spawning
        maybe_register_config_serialize_by_value()

        def signal_handler(signum, frame):
            nonlocal shutdown_requested
            if not shutdown_requested:
                shutdown_requested = True
                raise SystemExit()

        # Either SIGTERM or SIGINT will terminate the engine_core
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        engine_core: Optional[EngineCoreProc] = None
        try:
            parallel_config: ParallelConfig = kwargs["vllm_config"].parallel_config
            if parallel_config.data_parallel_size > 1 or dp_rank > 0:
                set_process_title("EngineCore", f"DP{dp_rank}")
                decorate_logs()
                # Set data parallel rank for this engine process.
                parallel_config.data_parallel_rank = dp_rank
                parallel_config.data_parallel_rank_local = local_dp_rank
                # Key point: DP is enabled here, so a DPEngineCoreProc is created
                engine_core = DPEngineCoreProc(*args, **kwargs)
            else:
                set_process_title("EngineCore")
                decorate_logs()
                engine_core = EngineCoreProc(*args, **kwargs)

            engine_core.run_busy_loop()  # enter DPEngineCoreProc's main loop

        except SystemExit: ...
        except Exception as e: ...
        finally: ...

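The SIGTERM/SIGINT handling above is a standard graceful-shutdown idiom: flip a flag once and raise SystemExit to unwind the busy loop. A condensed standalone version of the same pattern:

import signal
import time

shutdown_requested = False

def _signal_handler(signum, frame):
    global shutdown_requested
    if not shutdown_requested:
        shutdown_requested = True
        raise SystemExit()        # unwind the busy loop exactly once

signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)

try:
    while True:                   # stand-in for run_busy_loop()
        time.sleep(0.1)           # send SIGTERM or press Ctrl-C to trigger shutdown
except SystemExit:
    print("shutting down cleanly")
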
EngineCore && EngineCoreProc && DPEngineCoreProc

The run_engine_core function initializes DPEngineCoreProc, and that initialization in fact invokes the __init__() methods of three classes in sequence: DPEngineCoreProc, then EngineCoreProc, then EngineCore.

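That three-level super().__init__() chain is easy to see in miniature; the toy classes below (hypothetical names) mirror only the call order of the real ones:

class EngineCoreToy:
    def __init__(self):
        print("3. EngineCore.__init__: load model, create scheduler and KV cache")

class EngineCoreProcToy(EngineCoreToy):
    def __init__(self):
        print("2. EngineCoreProc.__init__: ZMQ handshake and sockets")
        super().__init__()

class DPEngineCoreProcToy(EngineCoreProcToy):
    def __init__(self):
        print("1. DPEngineCoreProc.__init__: DP counters and dp_rank")
        super().__init__()

DPEngineCoreProcToy()  # prints 1, 2, 3: each __init__ runs before delegating down
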
class EngineCore:
    """
    Inner loop of vLLM's Engine.
    The last link in the initialization chain and the most central part:
    responsible for model loading and for creating the scheduler and KV cache.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        executor_fail_callback: Optional[Callable] = None,
    ):
        ...

        # Instantiate the model executor: loads model weights onto the designated
        # GPUs, initializes CUDA kernels and sets up the distributed environment
        self.model_executor = executor_class(vllm_config)
        if executor_fail_callback is not None:
            self.model_executor.register_failure_callback(executor_fail_callback)

        self.available_gpu_memory_for_kv_cache = -1

        # Initialize the KV cache: a profiling run measures the model's peak memory usage
        num_gpu_blocks, num_cpu_blocks, kv_cache_config = self._initialize_kv_caches(
            vllm_config
        )

        vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
        vllm_config.cache_config.num_cpu_blocks = num_cpu_blocks
        self.collective_rpc("initialize_cache", args=(num_gpu_blocks, num_cpu_blocks))

        self.structured_output_manager = StructuredOutputManager(vllm_config)

        # Instantiate the scheduler
        if isinstance(vllm_config.scheduler_config.scheduler_cls, str):
            Scheduler = resolve_obj_by_qualname(
                vllm_config.scheduler_config.scheduler_cls
            )
        else:
            Scheduler = vllm_config.scheduler_config.scheduler_cls
        ...


class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""
    # This class sets up inter-process communication (using ZMQ)

    ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_client: bool,
        handshake_address: str,
        executor_class: type[Executor],
        log_stats: bool,
        client_handshake_address: Optional[str] = None,
        engine_index: int = 0,
    ):
        ...

        # Process identity
        self.engine_index = engine_index
        # Convert the numeric index into bytes, used as the ZMQ identity so that
        # messages are routed correctly
        identity = self.engine_index.to_bytes(length=2, byteorder="little")
        self.engines_running = False
        # Perform the ZMQ handshake
        with self._perform_handshakes(
            handshake_address,
            identity,
            local_client,
            vllm_config,
            client_handshake_address,
        ) as addresses:
            self.client_count = len(addresses.outputs)

            # Set up data parallel environment.
            self.has_coordinator = addresses.coordinator_output is not None
            self.frontend_stats_publish_address = (
                addresses.frontend_stats_publish_address
            )
            ...
            self._init_data_parallel(vllm_config)
            # Call the parent EngineCore's __init__ method
            super().__init__(
                vllm_config, executor_class, log_stats, executor_fail_callback
            )

        ...


class DPEngineCoreProc(EngineCoreProc):
    """ZMQ-wrapper for running EngineCore in background process
    in a data parallel context."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_client: bool,
        handshake_address: str,
        executor_class: type[Executor],
        log_stats: bool,
        client_handshake_address: Optional[str] = None,
    ):
        # Step counter: in data-parallel mode the replicas need to synchronize
        # state periodically; the counter triggers a sync every N forward passes
        self.step_counter = 0
        # Request wave counter: in DP mode requests can be processed in waves,
        # which helps coordinate request progress across replicas
        self.current_wave = 0
        self.last_counts = (0, 0)

        # Get the data parallel rank
        dp_rank = vllm_config.parallel_config.data_parallel_rank
        # Call the parent __init__ method
        super().__init__(
            vllm_config,
            local_client,
            handshake_address,
            executor_class,
            log_stats,
            client_handshake_address,
            dp_rank,
        )

After the DPEngineCoreProc instance is created, the run_engine_core method calls engine_core.run_busy_loop(), starting the engine's main loop, which begins receiving and processing requests over ZMQ.

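Conceptually, run_busy_loop is a poll-process-publish loop. The queue-based caricature below shows only that shape; it is not vLLM's actual implementation, which polls ZMQ sockets and steps the scheduler:

import queue

def run_busy_loop(inbox: "queue.Queue[dict]", outbox: "queue.Queue[dict]") -> None:
    waiting: list[dict] = []
    while True:
        # 1) Drain any newly arrived requests off the input channel (non-blocking).
        try:
            while True:
                waiting.append(inbox.get_nowait())
        except queue.Empty:
            pass
        if not waiting:
            waiting.append(inbox.get())   # idle: block until a request arrives
        # 2) Run one "scheduling + execution" step and publish the output.
        req = waiting.pop(0)
        outbox.put({"request_id": req["request_id"], "token_ids": [42]})
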
MPClient

We now return to the MPClient class:

52
class MPClient(EngineCoreClient):
    """
    MPClient: base client for multi-proc EngineCore.
    EngineCore runs in a background process busy loop, getting
    new EngineCoreRequests and returning EngineCoreOutputs

    * pushes EngineCoreRequests via input_socket
    * pulls EngineCoreOutputs via output_socket

    * AsyncMPClient subclass for AsyncLLM usage
    * SyncMPClient subclass for LLM usage
    """

    def start_engine_core_monitor(self):
        """Start a monitor thread for engine core processes."""
        engine_manager = self.resources.engine_manager
        if (
            engine_manager is None
            or not hasattr(engine_manager, "processes")
            or not engine_manager.processes
        ):
            # No engine processes to monitor
            return

        # The processes to monitor
        engine_processes = engine_manager.processes
        self_ref = weakref.ref(self)

        # Core function that the monitor thread executes
        def monitor_engine_cores():
            # Collect the sentinels of all EngineCore processes; when a process
            # terminates, its sentinel becomes ready
            sentinels = [proc.sentinel for proc in engine_processes]
            # wait() blocks the current thread until any sentinel in the list becomes ready
            died = multiprocessing.connection.wait(sentinels)
            _self = self_ref()
            if not _self or _self.resources.engine_dead:
                return
            _self.resources.engine_dead = True
            proc_name = next(
                proc.name for proc in engine_processes if proc.sentinel == died[0]
            )
            logger.error(
                "Engine core proc %s died unexpectedly, shutting down client.",
                proc_name,
            )
            _self.shutdown()
            # Note: For MPClient, we don't have a failure callback mechanism
            # like MultiprocExecutor, but we set engine_dead flag which will
            # cause subsequent operations to raise EngineDeadError

        # Create a daemon thread running monitor_engine_cores, starting the
        # background monitoring task
        Thread(
            target=monitor_engine_cores, daemon=True, name="MPClientEngineMonitor"
        ).start()

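The sentinel-based monitoring generalizes to any multiprocessing setup. A runnable miniature of the same pattern, with short-lived child processes standing in for engines:

import multiprocessing
import multiprocessing.connection
import threading
import time

def _short_lived():
    time.sleep(0.5)   # simulated engine that exits after 0.5s

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=_short_lived, name=f"Engine_{i}")
             for i in range(2)]
    for p in procs:
        p.start()

    def monitor():
        sentinels = [p.sentinel for p in procs]
        died = multiprocessing.connection.wait(sentinels)  # blocks until one exits
        name = next(p.name for p in procs if p.sentinel == died[0])
        print(f"{name} exited; tearing down client")

    threading.Thread(target=monitor, daemon=True).start()
    for p in procs:
        p.join()
    time.sleep(0.1)   # give the monitor thread a moment to print
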
OutputProcessor: handles the EngineCore's output

It does not take part in the core model computation; it focuses on processing the engine_core's output and converting it into the final format users need.
