vLLM EngineCore

1. DPEngineCoreProc/EngineCoreProc/EngineCore

Execution flow (1) walked through the main steps of the DPEngineCoreProc object's init method. After run_engine_core() has successfully initialized the DPEngineCoreProc object, it calls its run_busy_loop() method, starting the engine's main loop, which receives and processes inference requests over ZMQ.

Below is the code of DPEngineCoreProc.run_busy_loop():

# vllm/vllm/v1/engine/core.py > DPEngineCoreProc
class DPEngineCoreProc(EngineCoreProc):
"""ZMQ-wrapper for running EngineCore in background process
in a data parallel context."""

def run_busy_loop(self):
"""Core busy loop of the EngineCore for data parallel case."""

# Loop until process is sent a SIGINT or SIGTERM
while True:
# 1) 处理输入队列:从输入队列中获取新的任务
self._process_input_queue()

# 2) 执行引擎单步
# 引擎的核心工作负载,负责调度和执行一个批次的推理
# executed为布尔值,为True表示这一步中实际处理了至少一个token
executed = self._process_engine_step()
self._maybe_publish_request_counts()

# 首先检查当前worker是否还有未完成的请求
local_unfinished_reqs = self.scheduler.has_unfinished_requests()
# 处理跑空的情况,如果这一步没有执行任何实际的计算:
if not executed:
# 如果本地没有未完成的请求并且全局状态也为false(所有引擎都空闲)
# 直接进入下一个循环等待新任务
if not local_unfinished_reqs and not self.engines_running:
# All engines are idle.
continue

# We are in a running state and so must execute a dummy pass
# if the model didn't execute any ready requests.
self.execute_dummy_batch()

# 3) 全局状态同步,在数据并行的所有worker之间同步状态,判断整个波次的请求是否已经全部完成
# 该操作的返回值会更新self.engines_running
# true表示整个数据并行组仍然繁忙,false表示所有worker都已完成当前波次的任务
self.engines_running = self._has_global_unfinished_reqs(
local_unfinished_reqs
)

# 4)波次完成处理
if not self.engines_running:
if self.dp_rank == 0 or not self.has_coordinator:
# Notify client that we are pausing the loop.
logger.debug(
"Wave %d finished, pausing engine loop.", self.current_wave
)
# In the coordinator case, dp rank 0 sends updates to the
# coordinator. Otherwise (offline spmd case), each rank
# sends the update to its colocated front-end process.
client_index = -1 if self.has_coordinator else 0
# 通知客户端可以安全地发送下一个波次的请求了
self.output_queue.put_nowait(
(
client_index,
EngineCoreOutputs(wave_complete=self.current_wave),
)
)
# 更新计数器
self.current_wave += 1
self.step_counter = 0
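
To make the idle-handling branch above concrete, here is a tiny self-contained sketch (plain booleans stand in for the scheduler and DP state; this is not vLLM code) of the decision it implements: when nothing was executed locally but some DP peer is still busy, a dummy batch keeps the collective ops in lockstep.

# Hypothetical helper mirroring the run_busy_loop branch above.
def idle_action(executed: bool, local_unfinished: bool, engines_running: bool) -> str:
    if executed:
        return "normal step"                     # real work was done this step
    if not local_unfinished and not engines_running:
        return "skip (all engines idle)"         # corresponds to `continue`
    return "execute dummy batch"                 # keep DP collectives in sync

print(idle_action(False, False, True))           # a peer rank is busy -> dummy batch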

The core function invoked here is _process_engine_step() in the __EngineCoreProc__ class; it drives the step_fn that is set up in __EngineCore__:

# /vllm/vllm/v1/engine/core.py > EngineCore
class EngineCore:
"""Inner loop of vLLM's Engine."""

def __init__(
self,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
executor_fail_callback: Optional[Callable] = None,
):
# multiprocexecutor
self.model_executor = executor_class(vllm_config)
if executor_fail_callback is not None:
self.model_executor.register_failure_callback(executor_fail_callback)
# step_fn被初始化为 self.step 或者 self.step_with_batch_queue
self.step_fn = (
self.step if self.batch_queue is None else self.step_with_batch_queue
)

def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
"""Schedule, execute, and make output.

Returns tuple of outputs and a flag indicating whether the model
was executed.
"""

# 调用调度器,生成一个批次的待处理请求
if not self.scheduler.has_requests():
return {}, False
scheduler_output = self.scheduler.schedule()
# 将这个批次送入GPU执行模型的前向传播,这一步是阻塞的,函数会一直等待模型执行完成
model_output = self.execute_model_with_error_logging(
self.model_executor.execute_model, # type: ignore
scheduler_output,
)
# 模型执行完毕后,用返回的结果更新调度器中各个请求的状态
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output
) # type: ignore

return (engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0)

# 异步流水线模式
def step_with_batch_queue(
self,
) -> tuple[Optional[dict[int, EngineCoreOutputs]], bool]:
"""Schedule and execute batches with the batch queue.
Note that if nothing to output in this step, None is returned.

The execution flow is as follows:
1. Try to schedule a new batch if the batch queue is not full.
If a new batch is scheduled, directly return an empty engine core
output. In other words, fulfilling the batch queue has a higher priority
than getting model outputs.
2. If there is no new scheduled batch, meaning that the batch queue
is full or no other requests can be scheduled, we block until the first
batch in the job queue is finished.
3. Update the scheduler from the output.
"""
batch_queue = self.batch_queue
assert batch_queue is not None

# Try to schedule a new batch if the batch queue is not full, but
# the scheduler may return an empty batch if all requests are scheduled.
# Note that this is not blocking.
assert len(batch_queue) < self.batch_queue_size

model_executed = False

if self.scheduler.has_requests():
# 生成一个新的批次
scheduler_output = self.scheduler.schedule()
# non_block=True,模型执行并立即返回一个 future 对象,而不会等待计算完成
# self.model_executor 是 Multiprocexecutor 类对象
future = self.model_executor.execute_model(scheduler_output, non_block=True)
# 将 future 对象和相关的调度信息放入 batch_queue 中
batch_queue.appendleft((future, scheduler_output)) # type: ignore[arg-type]

model_executed = scheduler_output.total_num_scheduled_tokens > 0
if (
model_executed
and len(batch_queue) < self.batch_queue_size
and not batch_queue[-1][0].done()
):
# Don't block on next worker response unless the queue is full
# or there are no more requests to schedule.
# 返回empty core output, 添加新的请求比得到输出有更高的优先级
return None, True

elif not batch_queue:
# Queue is empty. We should not reach here since this method should
# only be called when the scheduler contains requests or the queue
# is non-empty.
return None, False

# 如果队列已满或者没有新的请求可以调度,函数从队列的另一端取出最先提交的那个批次的future
future, scheduler_output = batch_queue.pop()
model_output = self.execute_model_with_error_logging(
lambda _: future.result(), scheduler_output
)
# 获取输出,并且更新状态
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output
)

return engine_core_outputs, model_executed

class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process."""
ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"

def _process_input_queue(self):
"""Exits when an engine step needs to be performed."""

waited = False
while (
not self.engines_running
and not self.scheduler.has_requests()
and not self.batch_queue
):
if logger.isEnabledFor(DEBUG) and self.input_queue.empty():
logger.debug("EngineCore waiting for work.")
waited = True
req = self.input_queue.get()
self._handle_client_request(*req)

if waited:
logger.debug("EngineCore loop active.")

# Handle any more client requests.
while not self.input_queue.empty():
req = self.input_queue.get_nowait()
self._handle_client_request(*req)

def _process_engine_step(self) -> bool:
"""
Called only when there are unfinished local requests.
对step_fn的核心封装,将step和step_with_batch_queue两种模式的共同逻辑抽象出来
"""
# 核心调用. outputs 输出结果以及 model_executed 标志
# step_fn在 EngineCore 中被初始化
outputs, model_executed = self.step_fn()
# 将 step_fn 返回的,需要发送给前端的推理结果放入一个输出队列
for output in outputs.items() if outputs else ():
self.output_queue.put_nowait(output)
# 调用一个后处理钩子,负责更新模型状态
self.post_step(model_executed)
# 返回一个模型executed标志,在run_busy_loop中用来判断是否需要执行dummy_batch以保持分布式同步
return model_executed
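
To illustrate the pipelining idea behind step_with_batch_queue, here is a runnable toy sketch (the thread pool and fake_execute stand in for the executor and the model; none of these names are vLLM's): scheduling a new batch has priority, and we only block on the oldest in-flight batch when the queue is full or nothing new can be scheduled.

import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
BATCH_QUEUE_SIZE = 2
batch_queue: deque = deque()          # newest on the left, oldest on the right

def fake_execute(batch_id: int) -> str:
    time.sleep(0.05)                  # stand-in for a model forward pass
    return f"output-{batch_id}"

def pipelined_step(pending: list) -> None:
    """One engine step: either schedule a new batch or collect the oldest one."""
    if pending and len(batch_queue) < BATCH_QUEUE_SIZE:
        batch_id = pending.pop(0)
        # non-blocking submit, analogous to execute_model(..., non_block=True)
        batch_queue.appendleft((pool.submit(fake_execute, batch_id), batch_id))
        return                        # scheduling has priority over collecting
    if batch_queue:
        future, batch_id = batch_queue.pop()
        print(batch_id, "->", future.result())   # blocks, like future.result()

pending_batches = list(range(5))
while pending_batches or batch_queue:
    pipelined_step(pending_batches)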

2. MultiprocExecutor

The self.model_executor inside the EngineCore object is initialized as a __MultiprocExecutor__.

(The decision logic for the executor class lives in Executor.get_class() in /vllm/vllm/v1/executor/abstract.py.)

# /vllm/vllm/v1/executor/multiproc_executor.py
class MultiprocExecutor(Executor):
supports_pp: bool = True

# _init_executor()由父类Executor的父类ExecutorBase的__init__()函数调用
# 父类的__init__()函数是空壳,不同的executor主要通过_init_executor函数初始化
def _init_executor(self) -> None:
# 创建广播消息队列,executor之后会把所有指令(rpc调用)都放入这个队列中
self.rpc_broadcast_mq = MessageQueue(
self.world_size, self.world_size, max_chunk_bytes=max_chunk_bytes
) # n_local_reader = self.world_size,全为本地reader

# 创建并同步worker子进程
context = get_mp_context()
shared_worker_lock = context.Lock()
unready_workers: list[UnreadyWorkerProcHandle] = []
success = False
try:
# 为每个rank创建一个对应的worker进程
for rank in range(self.world_size):
# 每创建一个进程,就把管理这个进程的句柄UnreadyWorkerProcHandle存入unready_workers列表
unready_workers.append(
WorkerProc.make_worker_process(
vllm_config=self.vllm_config,
local_rank=rank,
rank=rank,
distributed_init_method=distributed_init_method,
# 通过scheduler_output_handle连接到主进程队列,可以接收主进程的broadcast
input_shm_handle=scheduler_output_handle,
shared_worker_lock=shared_worker_lock,
)
)

# 第一个关键同步点,主进程在这里暂停,逐个等待unready_workers列表中所有子进程完成各自初始化
# 例如加载到GPU,初始化torch.distributed环境等
# self.workers存储的是完全准备就绪的worker句柄列表
self.workers = WorkerProc.wait_for_ready(unready_workers)

# 第二个关键同步点,等待消息队列就绪
self.rpc_broadcast_mq.wait_until_ready()
for w in self.workers:
w.worker_response_mq.wait_until_ready()

# 创建io线程池
if self.max_concurrent_batches > 1:
self.io_thread_pool = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="mp_exec_io"
)

def execute_model(
self,
scheduler_output: SchedulerOutput, # 包含调度器决定在当前推理步骤中需要运行的所有序列
# 一个布尔标志,决定此调用阻塞还是非阻塞,非阻塞模式用于实现CPU调度与GPU计算的流水线,可以提高效率
non_block: bool = False,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
# 如果没有流水线并行,通常是张量并行
# 所有worker(GPU)会协同完成整个模型的单次推理
# 虽然每个worker都在计算,但最终只有一个worker(通常是rank 0)需要负责整合并返回最终的输出结果
if not self.has_connector:
# get output only from a single worker (output_rank)
# 向所有worker进程广播指令,要求他们执行内部的execute_model方法
(output,) = self.collective_rpc(
"execute_model",
args=(scheduler_output,),
unique_reply_rank=self.output_rank, # 通常是rank 0
non_block=non_block,
timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
)
return output # 直接返回从主 worker 获取的 ModelRunnerOutput 对象

# 启用了流水线并行,每个worker作为流水线阶段,只执行模型的一部分
# 每个worker会产生中间结果,主进程需要收集所有worker的输出,然后聚合成一个有意义的最终结果
outputs = self.collective_rpc( # 没有unique_reply_rank参数
"execute_model",
args=(scheduler_output,),
non_block=non_block,
timeout=envs.VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS,
)

# kv输出聚合器处理来自不同流水线的outputs列表,整合成一个单一的、完整的ModelRunnerOutput
if non_block:
return self.kv_output_aggregator.async_aggregate(outputs, self.output_rank)
return self.kv_output_aggregator.aggregate(outputs, self.output_rank)


def collective_rpc(
self,
method: Union[str, Callable], # 要在worker进程上执行的方法,可以是字符串或可调用对象
timeout: Optional[float] = None, # 超时时间
args: tuple = (),
kwargs: Optional[dict] = None,
non_block: bool = False, # 关键参数:如果false则函数会等待知道收到结果才返回
# 如果true则立即返回一个future对象
unique_reply_rank: Optional[int] = None, # 如果提供一个值,则只返回特定worker的结果
) -> list[Any]:
# 通信核心枢纽,主要工作是将一个指令广播给所有或部分worker子进程,然后收集他们执行的结果
# 把方法广播到所有worker,每个worker都会收到并执行这个method
# rpc_broadcast_mq是MessageQueue对象,在_init_executor中被初始化
self.rpc_broadcast_mq.enqueue(
(send_method, args, kwargs, unique_reply_rank)
)
# 确定要从哪些workers中收集结果
workers = (
(self.workers[unique_reply_rank],)
if unique_reply_rank is not None
else self.workers
)
responses = []
# 封装了从单个worker获取相应的逻辑
# 从指定worker的私有响应队列(w.worker_response_mq)中dequeue一个结果,同时检查返回状态
def get_response(
w: WorkerProcHandle,
dequeue_timeout: Optional[float] = None,
cancel_event: Optional[threading.Event] = None,
):
status, result = w.worker_response_mq.dequeue(
timeout=dequeue_timeout, cancel=cancel_event
)
...
return result
# 核心逻辑
# 遍历目标worker
for w in workers:
dequeue_timeout = (
None if deadline is None else (deadline - time.monotonic())
)
# 如果启动io线程池以支持异步操作
if self.io_thread_pool is not None:
# 将get_response任务提交给线程池执行,该调用非阻塞,立即返回一个future对象
# 这个future对象代表一个未来会完成的计算
result = self.io_thread_pool.submit( # type: ignore
get_response, w, dequeue_timeout, self.shutdown_event
)
# 阻塞模式:
# 立即调用Future.result()
# result()阻塞当前线程直到Future完成并返回其计算结果
if not non_block:
result = result.result()

# 没有io线程池+阻塞模式:
# 直接调用get_response,这个调用会阻塞,直到worker的队列中获取到结果
elif not non_block:
result = get_response(w, dequeue_timeout, self.shutdown_event)
else:
raise RuntimeError(
"non_block can only be used when max_concurrent_batches > 1"
)
# 将获取到的结果添加到responses列表中
responses.append(result)

return responses
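
The blocking vs. non-blocking collection logic can be reduced to the following runnable sketch (queue and function names here are illustrative, not vLLM's): an IO thread turns "dequeue from the worker's response queue" into a Future, and blocking mode simply calls .result() right away.

import queue
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional

io_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="mp_exec_io")
worker_response_q: "queue.Queue[str]" = queue.Queue()

def get_response(q: "queue.Queue[str]", timeout: Optional[float] = None) -> str:
    return q.get(timeout=timeout)     # stand-in for worker_response_mq.dequeue

# A background thread stands in for a worker process publishing its result.
threading.Thread(target=lambda: worker_response_q.put("model-output"), daemon=True).start()

# non_block=True: submit returns a Future immediately.
fut: Future = io_pool.submit(get_response, worker_response_q, 5.0)
# non_block=False would just do `result = fut.result()` on the spot.
print(fut.result())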

3. The MessageQueue class

# vllm/vllm/distributed/device_communicators/shm_broadcast.py
class MessageQueue:
def __init__(
self,
n_reader, # number of all readers
n_local_reader, # number of local readers through shared memory
local_reader_ranks: Optional[list[int]] = None,
max_chunk_bytes: int = 1024 * 1024 * 10,
max_chunks: int = 10,
connect_ip: Optional[str] = None,
):
local_reader_ranks = list(range(n_local_reader))
self.n_local_reader = n_local_reader
if n_local_reader > 0:
self.buffer = ShmRingBuffer(n_local_reader, max_chunk_bytes, max_chunks)
self.local_socket = context.socket(XPUB)
self.local_socket.setsockopt(XPUB_VERBOSE, True)
local_subscribe_addr = get_open_zmq_ipc_path()
logger.debug("Binding to %s", local_subscribe_addr)
self.local_socket.bind(local_subscribe_addr)
self.current_idx = 0
# handle用于让子进程接收父进程的广播
self.handle = Handle(
local_reader_ranks=local_reader_ranks, # list(range(n_local_reader))
buffer_handle=self.buffer.handle() if self.buffer is not None else None,
local_subscribe_addr=local_subscribe_addr,
remote_subscribe_addr=remote_subscribe_addr,
remote_addr_ipv6=remote_addr_ipv6,
)

def export_handle(self) -> Handle:
return self.handle

# 作用:共享内存广播队列的写入端接口,用于把一条消息广播给所有的监听者(本地和远端)
# 所有workers都会接收到主线程的Broadcast
def enqueue(self, obj, timeout: Optional[float] = None):
# 只允许写端调用
assert self._is_writer, "Only writers can enqueue"
# 序列化对象,将任意python对象转化为字节流
serialized_obj = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
# 如果存在本地读者(同一台机子,共享内存最快)
if self.n_local_reader > 0: # n_local_reader = world_size全为本地reader
# 溢出:无法内联写入共享内存
if len(serialized_obj) >= self.buffer.max_chunk_bytes:
with self.acquire_write(timeout) as buf:
buf[0] = 1 # overflow
# 改用本地套接字传输完整内容
self.local_socket.send(serialized_obj)
# 未溢出:将消息内联写入共享内存
else:
with self.acquire_write(timeout) as buf:
buf[0] = 0 # not overflow
buf[1 : len(serialized_obj) + 1] = serialized_obj
# 远端读者路径,直接通过远端套接字发送整条信息
if self.n_remote_reader > 0:
self.remote_socket.send(serialized_obj)
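
A minimal self-contained sketch of enqueue's inline-vs-overflow decision, assuming a tiny chunk size (the byte buffer and "side channel" list are toy stand-ins for ShmRingBuffer and the ZMQ socket):

import pickle

MAX_CHUNK_BYTES = 64                       # deliberately tiny

def toy_enqueue(obj, shm_chunk: bytearray, side_channel: list) -> None:
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    if len(payload) >= MAX_CHUNK_BYTES:
        shm_chunk[0] = 1                   # overflow flag
        side_channel.append(payload)       # full message goes over the socket
    else:
        shm_chunk[0] = 0                   # inline flag
        shm_chunk[1:len(payload) + 1] = payload

chunk = bytearray(MAX_CHUNK_BYTES + 1)
socket_msgs: list = []
toy_enqueue({"cmd": "execute_model"}, chunk, socket_msgs)
print(chunk[0], len(socket_msgs))          # small object -> inlined, no socket send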

4. The WorkerProc class

In the MultiprocExecutor._init_executor() loop, each call to make_worker_process() creates a new WorkerProc.

# vllm/vllm/v1/executor/multiproc_executor.py
class WorkerProc:
"""创建并启动一个全新的、独立的worker子进程"""
READY_STR = "READY"

def __init__(
self,
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
input_shm_handle: Handle,
shared_worker_lock: LockType,
):
self.rank = rank
# 创建WorkerWrapperBase
wrapper = WorkerWrapperBase(vllm_config=vllm_config, rpc_rank=rank)
# 按照wrapper.init_worker构建真实worker,完成分布式相关准备工作
all_kwargs: list[dict] = [
{} for _ in range(vllm_config.parallel_config.world_size)
]
is_driver_worker = rank % vllm_config.parallel_config.tensor_parallel_size == 0
all_kwargs[rank] = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"is_driver_worker": is_driver_worker,
}
# 构建真实的worker,完成分布式相关准备工作
wrapper.init_worker(all_kwargs)
self.worker = wrapper

# 用父进程传来的handle建立接收调度输出的广播队列
self.rpc_broadcast_mq = MessageQueue.create_from_handle(
input_shm_handle, self.worker.rank
)
# 为子到父的结果返回构建单读者消息队列
self.worker_response_mq = MessageQueue(1, 1)

# 这个函数处理了所有与multiprocessing库相关的底层细节,为主进程提供了一个干净、简单的接口来生产worker
@staticmethod
def make_worker_process(
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
input_shm_handle, # Receive SchedulerOutput
shared_worker_lock: LockType,
) -> UnreadyWorkerProcHandle: # 返回一个UnreadyWorkerProcHandle对象
# 返回BaseContext对象,封装具体进程启动方式,并提供与该方式一致的API
context = get_mp_context()
# 创建一对管道端点,子进程可以向父进程发送就绪信号,父进程等待信号
reader, writer = context.Pipe(duplex=False)
# 创建死亡管道,父进程可以通知子进程自己退出
death_reader, death_writer = context.Pipe(duplex=False)

process_kwargs = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank, # rank信息
"distributed_init_method": distributed_init_method, # 分布式初始化地址
"input_shm_handle": input_shm_handle, # 输入共享内存句柄
"ready_pipe": (reader, writer), # 就绪管道*2
"death_pipe": death_reader,
"shared_worker_lock": shared_worker_lock, # 跨进程共享锁 context.lock()
}
# Run EngineCore busy loop in background process.
# 创建子进程对象
proc = context.Process(
target=WorkerProc.worker_main, # 目标函数
kwargs=process_kwargs,
name=f"VllmWorker-{rank}",
daemon=True, # 父进程退出时子进程终止
)

proc.start() # 启动子进程
writer.close()

# 返回未就绪的句柄,供上一层等待所有worker完成初始化
return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)

# 用于Multiprocexecutor等待所有worker初始化完成
@staticmethod
def wait_for_ready(
unready_proc_handles: list[UnreadyWorkerProcHandle],
) -> list[WorkerProcHandle]:
# ...
# Extract the message queue handle.
worker_response_mq = MessageQueue.create_from_handle(
response["handle"], 0
)
ready_proc_handles[unready_proc_handle.rank] = (
WorkerProcHandle.from_unready_handle(
unready_proc_handle, worker_response_mq
)
)
# ...
# 返回列表按rank索引填充,保证后续代码按固定顺序访问
return cast(list[WorkerProcHandle], ready_proc_handles)

In make_worker_process(...), the child process is created with proc = context.Process(target=WorkerProc.worker_main, kwargs=process_kwargs, ...). Once the child process starts, it executes worker_main, which calls WorkerProc.__init__() to finish the child-side initialization.

# /vllm/vllm/v1/executor/multiproc_executor.py > WorkerProc
class WorkerProc:
def __init__(
...
):
self.rank = rank
# 创建WorkerWrapperBase
wrapper = WorkerWrapperBase(vllm_config=vllm_config, rpc_rank=rank)
# 按照wrapper.init_worker构建真实worker,完成分布式相关准备工作
all_kwargs: list[dict] = [
{} for _ in range(vllm_config.parallel_config.world_size)
]
is_driver_worker = rank % vllm_config.parallel_config.tensor_parallel_size == 0
all_kwargs[rank] = {
"vllm_config": vllm_config,
"local_rank": local_rank,
"rank": rank,
"distributed_init_method": distributed_init_method,
"is_driver_worker": is_driver_worker,
}
# 构建真实的worker,完成分布式相关准备工作
wrapper.init_worker(all_kwargs)
self.worker = wrapper

# Initialize device
self.worker.init_device()
# Load model
self.worker.load_model()

@staticmethod
def worker_main(*args, **kwargs):
"""Worker initialization and execution loops.
This runs a background process"""

shutdown_requested = False
# ...
# 安装信号处理器:捕获SIGTERM/SIGINT
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

worker = None
# 解析并准备管道与退出事件
reader, ready_writer = kwargs.pop("ready_pipe")
death_pipe = kwargs.pop("death_pipe", None)
shutdown_event = threading.Event()
# ...
try:
reader.close() # 关闭reader

# 初始化worker对象
worker = WorkerProc(*args, **kwargs)
# 当模型与环境就绪后,通知父进程
ready_writer.send(
{
"status": WorkerProc.READY_STR,
"handle": worker.worker_response_mq.export_handle(),
}
)
# 等待消息队列就绪
worker.rpc_broadcast_mq.wait_until_ready()
worker.worker_response_mq.wait_until_ready()
ready_writer.close()
ready_writer = None
# 进入忙循环:持续从rpc_broadcast_mq读取,执行对应方法,并根据output_rank决定是否发送输出到worker_response_mq
worker.worker_busy_loop(cancel=shutdown_event)
# 异常处理并退出
except Exception:
# ...
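
A runnable toy sketch of the ready-pipe handshake that make_worker_process and worker_main rely on (the READY payload and function names here are illustrative, not vLLM's):

import multiprocessing as mp

def child_main(ready_writer) -> None:
    # ... device init / model loading would happen here ...
    ready_writer.send({"status": "READY"})
    ready_writer.close()

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    reader, writer = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=child_main, args=(writer,), daemon=True)
    proc.start()
    writer.close()                 # parent keeps only the read end
    print(reader.recv())           # blocks until the worker reports READY
    proc.join()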

The WorkerWrapperBase class

In WorkerProc's static method worker_main, WorkerProc.__init__ is called to initialize the WorkerProc object; there, the line wrapper = WorkerWrapperBase(vllm_config=vllm_config, rpc_rank=rank) instantiates the __wrapper__ object.

It then calls wrapper.init_worker(all_kwargs) to construct the real worker.

# /vllm/vllm/v1/worker/worker_base.py
class WorkerWrapperBase:
"""
这个类代表了executor中的一个process,记录了worker module以及class name
并最终在init_worker中完成worker创建
"""
def __init__(
self,
vllm_config: VllmConfig,
rpc_rank: int = 0,
) -> None:

self.rpc_rank = rpc_rank
self.worker: WorkerBase | None = None # 在init_worker中填充
self.vllm_config: VllmConfig | None = None # 在init_worker中填充

if vllm_config.model_config is not None:
trust_remote_code = vllm_config.model_config.trust_remote_code
if trust_remote_code:
# 延迟导入,避免在分布式/设备未完全初始化前提前加载重资源库,减少初始化副作用
from vllm.utils import init_cached_hf_modules
# 对可能的远程代码模块进行初始化/缓存准备
init_cached_hf_modules()

# 获取配置参数并用当前配置的参数实例化最终worker
def init_worker(self, all_kwargs: list[dict[str, Any]]) -> None:
kwargs = all_kwargs[self.rpc_rank]
self.vllm_config = kwargs.get("vllm_config") # 放入vllm_config参数
assert self.vllm_config is not None, (
"vllm_config is required to initialize the worker"
)
# 开启函数调用追踪(重点关注!)
enable_trace_function_call_for_thread(self.vllm_config)

from vllm.plugins import load_general_plugins
# 加载通用插件
load_general_plugins()
# 要求worker_cls类为字符串
# resolve_obj_by_qualname将其解析为类对象
if isinstance(self.vllm_config.parallel_config.worker_cls, str):
worker_class = resolve_obj_by_qualname(
self.vllm_config.parallel_config.worker_cls
)
# ...
# 注入扩展类
if self.vllm_config.parallel_config.worker_extension_cls:
worker_extension_cls = resolve_obj_by_qualname(
self.vllm_config.parallel_config.worker_extension_cls
)
# ...
# 设置配置上下文并实例化
with set_current_vllm_config(self.vllm_config):
# 实例化真实Worker
# worker_class的获取通过 resolve_obj_by_qualname
self.worker = worker_class(**kwargs)
assert self.worker is not None
# 将所有未知属性转发到worker中,所以wrapper.load_model实际上执行的是worker.load_model
def __getattr__(self, attr):
return getattr(self.worker, attr)
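
The __getattr__ forwarding is easy to see in isolation; a minimal sketch with made-up classes:

class _Wrapper:
    def __init__(self, worker):
        self.worker = worker

    def __getattr__(self, attr):
        # Only invoked when normal lookup fails, so the wrapper's own
        # attributes (e.g. self.worker) still take precedence.
        return getattr(self.worker, attr)

class _FakeWorker:
    def load_model(self) -> str:
        return "loaded"

w = _Wrapper(_FakeWorker())
print(w.load_model())              # forwarded to _FakeWorker.load_model -> "loaded"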

worker_class is obtained by the resolve_obj_by_qualname function, via worker_class = resolve_obj_by_qualname(self.vllm_config.parallel_config.worker_cls); the self.vllm_config.parallel_config.worker_cls passed in is of type str.

# /vllm/vllm/utils/__init__.py
def resolve_obj_by_qualname(qualname: str) -> Any:
"""
qualname为字符串路径,最后一个"."之前的部分为module name,之后的部分为object name
"""
module_name, obj_name = qualname.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, obj_name)
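
A self-contained demo of the same splitting rule (the helper name is made up; any importable dotted path behaves the same way):

import importlib

def resolve_by_qualname_demo(qualname: str):
    module_name, obj_name = qualname.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), obj_name)

# "json.dumps" -> import module "json", then fetch its "dumps" attribute.
assert resolve_by_qualname_demo("json.dumps") is importlib.import_module("json").dumps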

Here self.vllm_config.parallel_config.worker_cls comes from vllm_config, which LLMEngine loads in its from_engine_args function: the VllmConfig is constructed by EngineArgs.create_engine_config(), then passed all the way down to WorkerProc and injected into WorkerWrapperBase.init_worker().

  • On CUDA/ROCm, worker_cls = vllm.v1.worker.gpu_worker.Worker (/vllm/vllm/platforms/cuda.py)

So the worker_cls obtained here is gpu_worker.Worker.

# in vllm/vllm/platforms/cuda.py
if parallel_config.worker_cls == "auto":
parallel_config.worker_cls = "vllm.v1.worker.gpu_worker.Worker"

Worker (the gpu_worker version)

# in vllm/vllm/v1/worker/gpu_worker.py
class Worker(WorkerBase):
# GPU后端的工作进程Worker的构造函数,完成本进程级的轻量初始化,复用WorkerBase初始化
def __init__(
self,
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
is_driver_worker: bool = False, # 是否为driver_worker rank0
):
# 复用WorkerBase初始化,代码见下方
super().__init__(
vllm_config=vllm_config,
local_rank=local_rank,
rank=rank,
distributed_init_method=distributed_init_method,
is_driver_worker=is_driver_worker,
)
# 远程代码预热
if self.model_config.trust_remote_code:
# 延迟导入
from vllm.utils import init_cached_hf_modules

init_cached_hf_modules()

# 睡眠缓冲区,在进入睡眠模式前保留必要的张量缓冲,便于唤醒时恢复
self._sleep_saved_buffers: dict[str, torch.Tensor] = {}

# 环境变量决定是否开启性能分析器
if envs.VLLM_TORCH_PROFILER_DIR:
torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
# ...
self.profiler = torch.profiler.profile(
activities=[
torch.profiler.ProfilerActivity.CPU,
torch.profiler.ProfilerActivity.CUDA,
],
record_shapes=envs.VLLM_TORCH_PROFILER_RECORD_SHAPES,
profile_memory=envs.VLLM_TORCH_PROFILER_WITH_PROFILE_MEMORY,
with_stack=envs.VLLM_TORCH_PROFILER_WITH_STACK,
with_flops=envs.VLLM_TORCH_PROFILER_WITH_FLOPS,
on_trace_ready=torch.profiler.tensorboard_trace_handler(
torch_profiler_trace_dir, use_gzip=True
),
)
else:
self.profiler = None

def init_device(self):
if self.device_config.device.type == "cuda":
# ...
init_worker_distributed_environment(
self.vllm_config,
self.rank,
self.distributed_init_method,
self.local_rank,
current_platform.dist_backend,
)
self.model_runner: GPUModelRunner = GPUModelRunner(
self.vllm_config, self.device
)
if self.rank == 0:
# If usage stat is enabled, collect relevant info.
report_usage_stats(self.vllm_config)

# 由WorkerProc.__init__()调用
def load_model(self) -> None:
# 环境变量开关
eep_scale_up = os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1"
# 内存池上下文
with self._maybe_get_memory_pool_context(tag="weights"):
# 加载模型
self.model_runner.load_model(eep_scale_up=eep_scale_up)

GPUModelRunner

# vllm/vllm/v1/worker/gpu_model_runner.py
class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
):
# ...

def load_model(self, eep_scale_up: bool = False) -> None:
"""
Args:
eep_scale_up: the model loading is for elastic EP scale up.
"""
logger.info("Starting to load model %s...", self.model_config.model)
if eep_scale_up:
# ep情况
# ...

with DeviceMemoryProfiler() as m:
time_before_load = time.perf_counter()
# 根据模型类型选择正确的加载器
model_loader = get_model_loader(self.load_config)
logger.info("Loading model from scratch...")
# 把模型权重加载到目标设备
self.model = model_loader.load_model(
vllm_config=self.vllm_config, model_config=self.model_config
)
# 注入lora权重
if self.lora_config:
self.model = self.load_lora_model(
self.model, self.vllm_config, self.device
)
if hasattr(self, "drafter"):
logger.info("Loading drafter model...")
self.drafter.load_model(self.model)
# EAGLE3辅助层
if self.use_aux_hidden_state_outputs:
# ...
aux_layers = self._get_eagle3_aux_layers_from_config()
if aux_layers:
logger.info(
"Using auxiliary layers from speculative config: %s",
aux_layers,
)
else:
aux_layers = self.model.get_eagle3_aux_hidden_state_layers()

self.model.set_aux_hidden_state_layers(aux_layers)
time_after_load = time.perf_counter()
# 记录加载期显存消耗,日志打印GiB和秒数
self.model_memory_usage = m.consumed_memory
logger.info(
"Model loading took %.4f GiB and %.6f seconds",
self.model_memory_usage / GiB_bytes,
time_after_load - time_before_load,
)
# 准备分布式通信缓冲
prepare_communication_buffer_for_model(self.model)
# 多模态剪枝开关
self.is_multimodal_pruning_enabled = (
supports_multimodal_pruning(self.model)
and self.model_config.multimodal_config.is_multimodal_pruning_enabled()
)
# 如果模型是mixture of expert且启用EPLB
if is_mixture_of_experts(self.model) and self.parallel_config.enable_eplb:
logger.info("EPLB is enabled for model %s.", self.model_config.model)
self.eplb_state = EplbState.build(
self.model,
self.device,
self.parallel_config,
global_expert_load,
old_global_expert_indices,
rank_mapping,
)
# ...

The get_model_loader used to load the model is shown below:

# vllm/vllm/model_executor/model_loader/__init__.py
def get_model_loader(load_config: LoadConfig) -> BaseModelLoader:
"""Get a model loader based on the load format."""
load_format = load_config.load_format
if load_format not in _LOAD_FORMAT_TO_MODEL_LOADER:
raise ValueError(f"Load format `{load_format}` is not supported")
return _LOAD_FORMAT_TO_MODEL_LOADER[load_format](load_config)
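
The mapping is a plain registry dict; a toy version of the same dispatch pattern (names here are made up, not vLLM's actual registry):

class _DummyLoader:
    def __init__(self, load_config):
        self.load_config = load_config

_FORMAT_TO_LOADER = {"auto": _DummyLoader}

def get_loader(load_format: str, load_config=None):
    if load_format not in _FORMAT_TO_LOADER:
        raise ValueError(f"Load format `{load_format}` is not supported")
    return _FORMAT_TO_LOADER[load_format](load_config)

print(type(get_loader("auto")).__name__)   # -> _DummyLoader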

The load_model function behind self.model = model_loader.load_model(vllm_config=self.vllm_config, model_config=self.model_config) is shown next. It loads the model and returns an nn.Module object.

# vllm/vllm/model_executor/model_loader/base_loader.py > BaseModelLoader
def load_model(
self, vllm_config: VllmConfig, model_config: ModelConfig
) -> nn.Module:
"""Load a model with the given configurations."""
# 优选读取load_config.device,获取target_device
device_config = vllm_config.device_config
load_config = vllm_config.load_config
load_device = (
device_config.device if load_config.device is None else load_config.device
)
target_device = torch.device(load_device)
with set_default_torch_dtype(model_config.dtype):
# 在target_device上下文中调用initialize_model构建nn.Module
with target_device:
model = initialize_model(
vllm_config=vllm_config, model_config=model_config
)

logger.debug("Loading weights on %s ...", load_device)
# 加载权重
self.load_weights(model, model_config)
# 权重后处理
process_weights_after_loading(model, model_config, target_device)
# 将模块设置为评估模式并返回,关闭dropout等训练态行为
return model.eval()

GPUModelRunner.load_model then goes on to choose the compilation/wrapping strategy:

# vllm/vllm/v1/worker/gpu_model_runner.py
class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
):
def load_model(self, eep_scale_up: bool = False) -> None:
"""
Args:
eep_scale_up: the model loading is for elastic EP scale up.
"""

# ...

# DYNAMO_AS_IS参数表示“按原样”用Pytorch Dynamo对模型做全图编译,不额外进行CUDA graph包装
# support_dynamo用于检查环境是否支持dynamo编译
if ( # Dynamo编译并返回,把模型交给Pytorch Dynamo做全图编译,后端通常由配置决定,然后直接return
self.vllm_config.compilation_config.level == CompilationLevel.DYNAMO_AS_IS
and supports_dynamo()
):
# init_backend,return VLLMBackend(vllm_config)
backend = self.vllm_config.compilation_config.init_backend(self.vllm_config)
compilation_counter.dynamo_as_is_count += 1
self.model.compile(fullgraph=True, backend=backend)
return
if (
self.compilation_config.cudagraph_mode.has_full_cudagraphs()
and not self.parallel_config.enable_dbo
): # 全量cuda graph包装(未启用DBO)
self.model = CUDAGraphWrapper(
self.model, self.vllm_config, runtime_mode=CUDAGraphMode.FULL
)
elif self.parallel_config.enable_dbo: # 启用DBO
# 启用DBO的UBatch包装
if self.compilation_config.cudagraph_mode.has_full_cudagraphs():
self.model = UBatchWrapper(
self.model, self.vllm_config, CUDAGraphMode.FULL, self.device
)
else:
self.model = UBatchWrapper(
self.model, self.vllm_config, CUDAGraphMode.NONE, self.device
)
  • Dynamo compilation: enables operator fusion and graph-level optimization and suits fairly stable execution graphs; the first compilation has a cost, and the benefit may be limited for highly dynamic shapes.
  • FULL CUDA Graph: greatly reduces per-step CPU overhead and jitter, but requires relatively stable inputs/memory addresses; vLLM's own scheduling tries to satisfy this.
  • UBatch + optional CUDA Graph: keeps good controllability and performance in more dynamic concurrency/batching scenarios, suited to DBO-enabled setups (see the sketch after this list).
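
A minimal sketch of what the DYNAMO_AS_IS path boils down to, assuming a toy model and PyTorch's built-in "eager" backend (vLLM chooses the real backend via init_backend()):

import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(16, 16), nn.ReLU())
# Whole-module Dynamo compilation, analogous to self.model.compile(fullgraph=True, backend=...)
compiled = torch.compile(model, fullgraph=True, backend="eager")
print(compiled(torch.randn(2, 16)).shape)   # torch.Size([2, 16])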

For backend = self.vllm_config.compilation_config.init_backend(self.vllm_config), the backend is initialized as follows:

# vllm/vllm/config/compilation.py > CompilationConfig		
def init_backend(self, vllm_config: "VllmConfig") -> Union[str, Callable]:
if self.level == CompilationLevel.NO_COMPILATION:
raise ValueError("No compilation level is set.")

from torch._dynamo.backends.registry import list_backends

torch_backends = list_backends(exclude_tags=tuple())
if self.level in [CompilationLevel.DYNAMO_AS_IS, CompilationLevel.DYNAMO_ONCE]:
if self.backend == "":
return "eager"
if self.backend in torch_backends:
return self.backend
return resolve_obj_by_qualname(self.backend)

# TODO: pass user-specified backend to piecewise compilation
# merge with the config use_inductor
assert self.level == CompilationLevel.PIECEWISE

from vllm.compilation.backends import VllmBackend

return VllmBackend(vllm_config)

Qwen uses __Dynamo compilation__, so this step is executed and a backend object is initialized.

VllmBackend

class VllmBackend:
"""
vllmbackend是vllm自定义的torch.compile后端,仅在 CompilationLevel.PIECEWISE 下使用
它接收 Dynamo 生成的 fx.GraphModule
按配置切分成若干子图进行编译,并返回一个可调用的拼接图(或带输入复制包装的可调用函数)
"""

vllm_config: VllmConfig
compilation_config: CompilationConfig
_called: bool = False
# the graph we compiled
graph: fx.GraphModule
# the stiching graph module for all the piecewise graphs
split_gm: fx.GraphModule
piecewise_graphs: list[SplitItem]
returned_callable: Callable
# Inductor passes to run on the graph pre-defunctionalization
post_grad_passes: Sequence[Callable]
sym_tensor_indices: list[int]
input_buffers: list[torch.Tensor]
compiler_manager: CompilerManager

def __init__(
self,
vllm_config: VllmConfig,
prefix: str = "",
):
self.prefix = prefix or model_tag # 设置前缀,用于区分不同的子模型

# 初始化后处理pass管理器
self.post_grad_pass_manager = PostGradPassManager()
# 准备运行期需要的成员,用于 cudagraph 模式下“输入复制”的索引与静态缓冲。
self.sym_tensor_indices = []
self.input_buffers = []
# 保存配置对象
self.vllm_config = vllm_config
self.compilation_config = vllm_config.compilation_config
# 构建编译器管理器
self.compiler_manager: CompilerManager = CompilerManager(
self.compilation_config
)
def enable_trace_function_call_for_thread(vllm_config: VllmConfig) -> None:
"""Set up function tracing for the current thread,
if enabled via the VLLM_TRACE_FUNCTION environment variable
"""

if envs.VLLM_TRACE_FUNCTION:
tmp_dir = tempfile.gettempdir()
# add username to tmp_dir to avoid permission issues
tmp_dir = os.path.join(tmp_dir, getpass.getuser())
filename = (
f"VLLM_TRACE_FUNCTION_for_process_{os.getpid()}"
f"_thread_{threading.get_ident()}_"
f"at_{datetime.datetime.now()}.log"
).replace(" ", "_")
log_path = os.path.join(
tmp_dir, "vllm", f"vllm-instance-{vllm_config.instance_id}", filename
)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
enable_trace_function_call(log_path)
  • Pre-inference (warmup/capture) is initiated from gpu_worker.py and completed mainly along two paths:
    1. memory and compilation warmup (profile_run → _dummy_run);
    2. CUDA Graph capture (capture_model → _capture_cudagraphs → _dummy_run triggers the capture).

Where first_run_finished comes into play

  • How the variable of the same name is actually used in PiecewiseBackend:
    • Initialization: self.first_run_finished = False (vllm/vllm/compilation/piecewise_backend.py:59).
    • First call: the first execution of __call__ sets it to True and calls check_for_ending_compilation(), then returns the compiled runnable for the general shape (compiled_graph_for_general_shape) (piecewise_backend.py:89-97).
    • Subsequent calls: select or trigger static-shape compilation according to runtime_shape, and call check_for_ending_compilation() again once all required shapes have been compiled (piecewise_backend.py:101-120).
  • Summary: the "first run" tracked by first_run_finished happens on the first execution of PiecewiseBackend.__call__; the flag is currently unused in CUDAGraphWrapper.

Overall pre-inference/warmup flow (starting from gpu_worker.py)

  • Device and model loading

    • Worker.init_device sets up the local GPU device (see gpu_worker.py).
    • Worker.load_model → GPUModelRunner.load_model loads the model and wraps it according to the configuration:
      • At CompilationLevel.DYNAMO_AS_IS, it directly calls self.model.compile(fullgraph=True, backend=...) (gpu_model_runner.py:2866-2876).
      • Other compilation levels: depending on cudagraph_mode and whether DBO is enabled, the model is wrapped as CUDAGraphWrapper (FULL) or UBatchWrapper (gpu_model_runner.py:2897-2916).
  • Memory profiling and first warmup

    • Worker.determine_available_memory calls GPUModelRunner.profile_run() to run one full dummy forward pass and measure peak GPU memory (gpu_worker.py:236-310).

      # vllm/vllm/v1/worker/gpu_worker.py > Worker		
      @torch.inference_mode()
      def determine_available_memory(self) -> int:
      """Profiles the peak memory usage of the model to determine how much
      memory can be used for KV cache without OOMs.

      The engine will first conduct a profiling of the existing memory usage.
      Then, it calculates the free memory that can be used for KV cache in
      bytes.

      Tip:
      You may limit the usage of GPU memory
      by adjusting the `gpu_memory_utilization` parameter.
      """
      self.model_runner.profile_run()
      msg = (
      f"Initial free memory {GiB(self.init_snapshot.free_memory):.2f} "
      f"GiB, reserved {GiB(kv_cache_memory_bytes):.2f} GiB memory for ...")
    • GPUModelRunner.profile_run internally runs _dummy_run(self.max_num_tokens, is_profile=True)

      # vllm/vllm/v1/worker/gpu_model_runner.py > GPUModelRunner
      def profile_run(self) -> None:
      # Profile with multimodal encoder & encoder cache.
      # Add `is_profile` here to pre-allocate communication buffers
      hidden_states, last_hidden_states = self._dummy_run(
      self.max_num_tokens, is_profile=True
      )
      • The context is injected via set_forward_context(..., cudagraph_runtime_mode=CUDAGraphMode.NONE, ...), ensuring no CUDA Graph capture is triggered (gpu_model_runner.py:3664-3668, forward_context.py:268-306).
      @torch.inference_mode()
      def _dummy_run(
      self,
      num_tokens: int,
      cudagraph_runtime_mode: Optional[CUDAGraphMode] = None,
      force_attention: bool = False,
      uniform_decode: bool = False,
      allow_microbatching: bool = True,
      skip_eplb: bool = False,
      is_profile: bool = False,
      create_mixed_batch: bool = False,
      remove_lora: bool = True,
      ) -> tuple[torch.Tensor, torch.Tensor]:
      """
      Run a dummy forward pass to warm up/profile run or capture the
      CUDA graph for the model.
      """
      # ...
      with (
      self.maybe_randomize_inputs(input_ids),
      set_forward_context(
      attn_metadata,
      self.vllm_config,
      num_tokens=num_tokens_after_padding,
      num_tokens_across_dp=num_tokens_across_dp,
      cudagraph_runtime_mode=cudagraph_runtime_mode,
      batch_descriptor=batch_descriptor,
      ubatch_slices=ubatch_slices,
      ),
      ):
      outputs = self.model(
      input_ids=input_ids,
      positions=positions,
      intermediate_tensors=intermediate_tensors,
      inputs_embeds=inputs_embeds,
      **model_kwargs,
      )
      # ...

      • On the last PP rank, the pooler or sampler is then warmed up (gpu_model_runner.py:3669-3687).
  • KV cache allocation

    • Worker.initialize_cache allocates the KV cache based on the profiling results (gpu_worker.py:162-180).

      def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
      self.cache_config.num_gpu_blocks = num_gpu_blocks
      self.cache_config.num_cpu_blocks = num_cpu_blocks
  • Compilation and CUDA Graph warmup/capture

    • Worker.compile_or_warm_up_model handles all warmup and capture before real inference begins (gpu_worker.py:338-460):

      def compile_or_warm_up_model(self) -> None:
      # warm up sizes that are not in cudagraph capture sizes,
      # but users still want to compile for better performance,
      # e.g. for the max-num-batched token size in chunked prefill.
      • Warm up compile sizes: for every size in compile_sizes that does not take part in CUDA Graph capture, call self.model_runner._dummy_run(size, skip_eplb=True, remove_lora=False) one by one to trigger compilation and operator warmup (gpu_worker.py:351-357).
      warmup_sizes = self.vllm_config.compilation_config.compile_sizes.copy()
      if not self.model_config.enforce_eager:
      warmup_sizes = [
      x
      for x in warmup_sizes
      if x not in self.vllm_config.compilation_config.cudagraph_capture_sizes
      ]
      # We skip EPLB here since we don't want to record dummy metrics
      for size in sorted(warmup_sizes, reverse=True):
      logger.info("Compile and warming up model for size %d", size)
      self.model_runner._dummy_run(size, skip_eplb=True, remove_lora=False)
      self.model_runner.maybe_remove_all_loras(self.model_runner.lora_config)
      • Kernel warmup: kernel_warmup(self) (gpu_worker.py:360).

        # vllm/vllm/model_executor/warmup/kernel_warmup.py
        def kernel_warmup(worker: "Worker"):
        # Deep GEMM warmup
        do_deep_gemm_warmup = (
        envs.VLLM_USE_DEEP_GEMM
        and is_deep_gemm_supported()
        and not envs.VLLM_SKIP_DEEP_GEMM_WARMUP
        )
        if do_deep_gemm_warmup:
        model = worker.get_model()
        max_tokens = worker.scheduler_config.max_num_batched_tokens
        deep_gemm_warmup(model, max_tokens)

        # FlashInfer autotune for Hopper (SM 9.0) and Blackwell (SM 10.0) GPUs
        if has_flashinfer() and current_platform.has_device_capability(90):
        flashinfer_autotune(worker.model_runner)

        # FlashInfer attention warmup
        # Only warmup if the model has FlashInfer attention groups
        # and is not a pooling model
        def _is_flashinfer_backend(backend):
        try:
        return backend.get_name() == "FLASHINFER"
        except NotImplementedError:
        return False

        if not worker.model_runner.is_pooling_model and all(
        _is_flashinfer_backend(group.backend)
        for groups in worker.model_runner.attn_groups
        for group in groups
        ):
        logger.info("Warming up FlashInfer attention.")
        # Warmup with mixed batch containing both prefill and decode tokens
        # This is to warm up both prefill and decode attention kernels
        worker.model_runner._dummy_run(
        num_tokens=16,
        skip_eplb=True,
        is_profile=True,
        force_attention=True,
        create_mixed_batch=True,
        )
      • CUDA Graph capture: if eager mode is not enforced, call self.model_runner.capture_model() (gpu_worker.py:363).

        cuda_graph_memory_bytes = 0
        if not self.model_config.enforce_eager:
        cuda_graph_memory_bytes = self.model_runner.capture_model()
        • GPUModelRunner.capture_model then:

          • Initializes and resolves the available CUDA Graph modes and keys (initialize_cudagraph_capture, gpu_model_runner.py:3928-3995).
          # vllm/vllm/v1/worker/gpu_model_runner.py
          def capture_model(self) -> int:
          # 初始化并解析可用的CUDA Graph模式与keys
          if self.compilation_config.cudagraph_mode == CUDAGraphMode.NONE:
          logger.warning(
          "Skipping CUDA graph capture. To turn on CUDA graph capture, "
          "ensure `cudagraph_mode` was not manually set to `NONE`"
          )
          return 0
          else:
          self.initialize_cudagraph_capture()
          • Opens the capture context and runs _capture_cudagraphs(...) over the configured batch sizes (gpu_model_runner.py:3700-3775).
          set_cudagraph_capturing_enabled(True)  # 开启捕获上下文
          with freeze_gc(), graph_capture(device=self.device):
          start_free_gpu_memory = torch.cuda.mem_get_info()[0]
          cudagraph_mode = self.compilation_config.cudagraph_mode
          assert cudagraph_mode is not None
          if cudagraph_mode.mixed_mode() != CUDAGraphMode.NONE:
          cudagraph_runtime_mode = cudagraph_mode.mixed_mode()

          compilation_cases=list(reversed(self.cudagraph_batch_sizes))
          self._capture_cudagraphs( # 运行
          compilation_cases,
          cudagraph_runtime_mode=cudagraph_runtime_mode,
          uniform_decode=False,
          )
        • For each num_tokens, _capture_cudagraphs:

          • First performs several non-capturing warmups: _dummy_run(..., cudagraph_runtime_mode=NONE), with force_attention set when the target mode is FULL (gpu_model_runner.py:3799-3813).
          # vllm/vllm/v1/worker/gpu_model_runner.py
          def _capture_cudagraphs(
          self,
          compilation_cases: list[int],
          cudagraph_runtime_mode: CUDAGraphMode,
          uniform_decode: bool,
          ):
          # Only rank 0 should print progress bar during capture
          if is_global_first_rank():
          compilation_cases = tqdm(
          compilation_cases,
          disable=not self.load_config.use_tqdm_on_load,
          desc="Capturing CUDA graphs ({}, {})".format(
          "decode" if uniform_decode else "mixed prefill-decode",
          cudagraph_runtime_mode.name,
          ),
          )
          # We skip EPLB here since we don't want to record dummy metrics
          for num_tokens in compilation_cases:
          for _ in range(self.compilation_config.cudagraph_num_of_warmups):
          # Use CUDAGraphRuntimeStyle.NONE (default) for warmup.
          # But be careful, warm up with `NONE`is orthogonal to
          # if we want to warm up attention or not. This is
          # different from the case where `FULL` implies capture
          # attention while `PIECEWISE` implies no attention.
          force_attention = cudagraph_runtime_mode==CUDAGraphMode.FULL
          self._dummy_run(
          num_tokens,
          cudagraph_runtime_mode=CUDAGraphMode.NONE,
          force_attention=force_attention,
          uniform_decode=uniform_decode,
          allow_microbatching=allow_microbatching,
          skip_eplb=True,
          remove_lora=False,
          )
          # 捕获运行
          self._dummy_run(
          num_tokens,
          cudagraph_runtime_mode=cudagraph_runtime_mode, # PIECEWISE
          uniform_decode=uniform_decode,
          allow_microbatching=allow_microbatching,
          skip_eplb=True,
          remove_lora=False,
          )
          self.maybe_remove_all_loras(self.lora_config)
          • Then performs one "capture run": _dummy_run(..., cudagraph_runtime_mode=FULL/PIECEWISE, ...) (gpu_model_runner.py:3814-3823).
        • During that capture run:

          • _dummy_run sets the forward_context's cudagraph_runtime_mode and batch_descriptor (from CudagraphDispatcher.dispatch) (gpu_model_runner.py:3476-3497, forward_context.py:268-306, cudagraph_dispatcher.py:89-133).
        • self.model(...) actually enters CUDAGraphWrapper.__call__ (or UBatchWrapper). When entry.cudagraph is None, the real capture is triggered: a torch.cuda.CUDAGraph is created, one model forward pass is run inside torch.cuda.graph(...) with weak references to the outputs saved, and the graph is attached to the cache entry for that BatchDescriptor (cuda_graph.py:112-166).

      • Sampler/pooler warmup: on the last PP rank, one more _dummy_run(..., skip_eplb=True) is performed, followed by _dummy_sampler_run / _dummy_pooler_run to pre-allocate logits and sampling buffers and reduce fragmentation (gpu_worker.py:420-452).

      • The random seed is reset so that warmup does not affect real inference (gpu_worker.py:457-460).

The _dummy_run function

_dummy_run itself never calls torch.cuda.graph directly. It builds the ForwardContext from the shape and mode and then calls self.model(...); the actual CUDA Graph creation and replay happen in the model wrapper (CUDAGraphWrapper or UBatchWrapper). The first capture only occurs when cudagraph_runtime_mode is FULL/PIECEWISE and that shape has not been captured yet; afterwards the same shape goes through replay().

Capture and replay trigger points

  • The real torch.cuda.CUDAGraph() capture/replay does not happen inside _dummy_run but in the wrapper around self.model(...):

    • Without DBO, self.model is usually replaced by CUDAGraphWrapper(runtime_mode=FULL or PIECEWISE).
    • In the DBO micro-batch scenario, self.model may be a UBatchWrapper (which can capture a combined graph over the two ubatches).
  • When the cudagraph_runtime_mode set in set_forward_context is FULL or PIECEWISE, and batch_descriptor is a capturable shape:

    • First forward pass: the wrapper detects that the shape has not been captured yet, enters the torch.cuda.graph(...) scope, runs one forward pass, and caches the graph.
    • Subsequent passes with the same shape: the wrapper simply calls replay().
  • Capture is not triggered when:

    • cudagraph_runtime_mode=NONE (e.g. is_profile=True or forced warmup).
    • The wrapper mode does not match the runtime mode.
    • The dispatcher returns NONE (the shape is not in the key set, or the config does not support it).
    • attn_metadata indicates that KV scales etc. must be computed, forcing NONE (the regular path falls back this way).
      Common variants and their effects
  • uniform_decode=True: tends to capture the pure-decode optimized path; it also affects the BatchDescriptor and graph-key matching.

  • create_mixed_batch=True: used to capture the mixed prefill+decode path; the prefill sequences are kept short to speed things up.

  • allow_microbatching=True with the threshold met: may go through UBatchWrapper, capturing a micro-batched combined graph for num_tokens.

  • force_attention=True: attention metadata is built to warm up the backend even when the runtime mode is not FULL (a graph is not necessarily captured).
    This is the complete _dummy_run flow, from argument parsing and batch construction, through context injection, to the model wrapper performing CUDA Graph capture or replay; a runnable sketch of the capture/replay pattern follows below.
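
A minimal illustration of the capture-then-replay pattern that CUDAGraphWrapper builds on, using plain PyTorch API (requires a CUDA GPU; this is not vLLM's wrapper, just the underlying torch.cuda.CUDAGraph usage):

import torch

model = torch.nn.Linear(16, 16, device="cuda")
static_in = torch.zeros(8, 16, device="cuda")

# Warm up on a side stream before capture, as recommended by PyTorch.
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):
    for _ in range(3):
        static_out = model(static_in)
torch.cuda.current_stream().wait_stream(s)

g = torch.cuda.CUDAGraph()
with torch.cuda.graph(g):              # the "capture run" for this shape
    static_out = model(static_in)

# Later steps with the same shape: refill the static input buffer and replay.
static_in.copy_(torch.randn(8, 16, device="cuda"))
g.replay()
print(static_out.shape)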

# vllm/vllm/v1/worker/gpu_model_runner.py > GPUModelRunner
@torch.inference_mode()
def _dummy_run(
self,
num_tokens: int, # 本次假前向总token数
# 运行期模式
# None则表示不使用图(纯预热/剖析),PIECEWISE表示分片图,FULL表示完整图
cudagraph_runtime_mode: Optional[CUDAGraphMode] = None,
# 即使不是FULL模式,也强制构建注意力元数据(用于预热注意力后端)
force_attention: bool = False,
# 构造“所有请求query_len相同“的解码批;通常为纯解码或推测解码
uniform_decode: bool = False,
# 允许把大批次分成微批,用于DBO
allow_microbatching: bool = True,
# 跳过eplb状态更新(默认不跳过)
skip_eplb: bool = False,
# 标记为剖析运行,强制禁用图
is_profile: bool = False,
# 构造混合批(前半是解码,后半是一个prefill)
create_mixed_batch: bool = False,
# 假前向后是否清理临时lora
remove_lora: bool = True,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
执行一次“假前向”(没有真实请求)以完成预热、性能剖析或触发CUDA Graph捕获。
它不直接调用 torch.cuda.graph ,而是把捕获所需的上下文参数注入到一次模型前向中,由模型外层包装器在那次前向里完成捕获或重放。
* 当 cudagraph_mode.decode_mode() == FULL 且 separate_routine() 为真时,系统会为两类批次分别维护/捕获不同的 CUDA Graph:
1. prefill+decode批次: prefill(多个tokens)+decode(1个token)
2. decode only批次:所有请求的 query 长度完全一致(通常是纯解码或推测解码)。
统一解码批次”有两种常见形态:
- 纯解码:每个请求本步只处理 1 个 token( max_query_len == 1 )。
- 推测解码:每个请求本步处理 1 + num_spec_decode_tokens 个 token。
separate_routine()表示“uniform-decode”和“混合 prefill+decode”两类批次在 FULL 图下用不同的执行例程/图
* 将 max_query_len 设为 1 时,会切换并捕获 FA2 的纯解码优化例程(FlashDecode + 针对 GQA/MQA 的优化),这条路径比通用注意力更快,适合纯解码场景。

"""
assert ( # 检验cudagraph_runtime_mode是否合法
cudagraph_runtime_mode is None
or cudagraph_runtime_mode.valid_runtime_modes()
)


# When setting max_query_len = 1, we switch to and capture the optimized
# routine of FA2 for pure decode, i.e., Flashdecode + an optimization
# for GQA/MQA.

# 设定max_query_len
max_query_len = self.uniform_decode_query_len if uniform_decode else num_tokens


# 根据num_tokens和max_num_seqs计算请求数num_reqs
# 并构造每个请求的num_scheduled_tokens_list,保证总和为num_tokens
assert num_tokens <= self.scheduler_config.max_num_batched_tokens
max_num_reqs = self.scheduler_config.max_num_seqs
# 前半部分每个请求1 token(解码),后半部分一个请求承载剩余token(prefill)
if create_mixed_batch:
assert not uniform_decode
num_decode_tokens = min(max_num_reqs - 1, num_tokens // 2)
num_prefill_tokens = num_tokens - num_decode_tokens
num_reqs = num_decode_tokens + 1
# Create decode requests (1 token each) followed by prefill request
num_scheduled_tokens_list = [1] * num_decode_tokens + [num_prefill_tokens]
# Note: Overriding max_query_len to be the prefill tokens
max_query_len = num_prefill_tokens
# 每个请求的token数均为max_query_len,最后一个请求可能为余数
elif uniform_decode:
assert not create_mixed_batch
num_reqs = min(max_num_reqs, cdiv(num_tokens, max_query_len))
num_scheduled_tokens_list = [max_query_len] * num_reqs
if num_tokens % max_query_len != 0:
num_scheduled_tokens_list[-1] = num_tokens % max_query_len
# 否则,尽量平均分配token到各请求,最后一个请求承载余数
else:
num_reqs = min(num_tokens, max_num_reqs)
min_tokens_per_req = num_tokens // num_reqs
num_scheduled_tokens_list = [min_tokens_per_req] * num_reqs
num_scheduled_tokens_list[-1] += num_tokens % num_reqs

assert sum(num_scheduled_tokens_list) == num_tokens
assert len(num_scheduled_tokens_list) == num_reqs
num_scheduled_tokens = np.array(num_scheduled_tokens_list, dtype=np.int32)
total_num_scheduled_tokens = int(num_scheduled_tokens.sum())

# 调用coordinate_batch_across_dp
# 返回ubatch_slices(若需要微批次)和 num_tokens_across_dp(DP 下每卡的 padding 后 token 数)
ubatch_slices, num_tokens_across_dp = coordinate_batch_across_dp(
num_scheduled_tokens,
total_num_scheduled_tokens,
total_num_scheduled_tokens,
self.vllm_config.parallel_config,
allow_microbatching,
uniform_decode,
)
num_tokens_after_padding = num_tokens
# 若有 num_tokens_across_dp ,将 num_tokens_after_padding 设为本卡的 token 数。
# 所以这个函数实际上在每张卡上执行
if num_tokens_across_dp is not None:
num_tokens_after_padding = int(num_tokens_across_dp[0])

# 构建注意力元数据(按需)
attn_metadata: Optional[PerLayerAttnMetadata] = None

# attn_metadata 仅在 force_attention=True 或 cudagraph_runtime_mode==FULL 时构建:
if force_attention or cudagraph_runtime_mode == CUDAGraphMode.FULL:
attn_metadata = {}
# 为无微批构造一个“每层名到元数据”的字典,有微批则构造“每个微批一个字典”的列表。
if ubatch_slices is not None:
attn_metadata = [dict() for _ in range(len(ubatch_slices))]

if create_mixed_batch:
# In the mixed batch mode (used for FI warmup), we use
# shorter sequence lengths to run faster.
seq_lens = [1] * num_decode_tokens + [num_prefill_tokens + 1]
else:
seq_lens = max_query_len
self.seq_lens.np[:num_reqs] = seq_lens
self.seq_lens.np[num_reqs:] = 0
self.seq_lens.copy_to_gpu()

cum_num_tokens, _ = self._get_cumsum_and_arange(num_scheduled_tokens)
self.query_start_loc.np[1 : num_reqs + 1] = cum_num_tokens
self.query_start_loc.copy_to_gpu()
# 对每个KV缓存组装CommonAttentionMetadata
for kv_cache_group_id, kv_cache_group_spec in enumerate(
self.kv_cache_config.kv_cache_groups
):
common_attn_metadata = CommonAttentionMetadata(
query_start_loc=self.query_start_loc.gpu[: num_reqs + 1],
query_start_loc_cpu=self.query_start_loc.cpu[: num_reqs + 1],
seq_lens=self.seq_lens.gpu[:num_reqs],
seq_lens_cpu=self.seq_lens.cpu[:num_reqs],
num_computed_tokens_cpu=self.input_batch.num_computed_tokens_cpu_tensor[
:num_reqs
],
num_reqs=num_reqs,
num_actual_tokens=num_tokens,
max_query_len=max_query_len,
max_seq_len=self.max_model_len,
block_table_tensor=self.input_batch.block_table[
kv_cache_group_id
].get_device_tensor(num_reqs),
slot_mapping=self.input_batch.block_table[
kv_cache_group_id
].slot_mapping.gpu[:num_tokens],
causal=True,
dcp_local_seq_lens=self.dcp_local_seq_lens.gpu[:num_reqs]
if self.dcp_world_size > 1
else None,
)
# 对于每个注意力组用其metadata_builder生成用于CUDA Graph捕获的元数据,并按层名写入
for attn_group in self.attn_groups[kv_cache_group_id]:
if ubatch_slices is not None:
common_attn_metadata_list = split_attn_metadata(
ubatch_slices, common_attn_metadata
)
for ubid, common_attn_metadata in enumerate(
common_attn_metadata_list
):
assert common_attn_metadata.max_query_len == 1
attn_metadata_i = attn_group.get_metadata_builder(
ubatch_id=ubid
).build_for_cudagraph_capture(common_attn_metadata)
for layer_name in attn_group.layer_names:
assert type(attn_metadata) is list
attn_metadata[ubid][layer_name] = attn_metadata_i
else:
assert type(attn_metadata) is dict
metadata_builder = attn_group.get_metadata_builder()
attn_metadata_i = metadata_builder.build_for_cudagraph_capture(
common_attn_metadata
)
for layer_name in attn_group.layer_names:
attn_metadata[layer_name] = attn_metadata_i
# lora假前向上下文:
# 进入maybe_dummy_run_with_lora(...)上下文,允许为假批次暂时挂载lora
# 若remove_lora=True,结束后清理
with self.maybe_dummy_run_with_lora(
self.lora_config, num_scheduled_tokens, remove_lora
):
# Make sure padding doesn't exceed max_num_tokens
assert num_tokens_after_padding <= self.max_num_tokens
# 构造model_kwargs 基础参数(例如最大长度用到的缓存等)
model_kwargs = self._init_model_kwargs(num_tokens_after_padding)
# 选择输入:
if self.supports_mm_inputs and not self.model_config.is_encoder_decoder:
input_ids = None # 多模态且非编码-解码
inputs_embeds = self.inputs_embeds.gpu[:num_tokens_after_padding]
model_kwargs = {
**model_kwargs,
**self._dummy_mm_kwargs(num_reqs),
}
# enable_prompt_embeds=true,同样用imput_embeds
elif self.enable_prompt_embeds:
input_ids = None
inputs_embeds = self.inputs_embeds.gpu[:num_tokens_after_padding]
model_kwargs = self._init_model_kwargs(num_tokens_after_padding)
# 否则,使用input_ids
else:
input_ids = self.input_ids.gpu[:num_tokens_after_padding]
inputs_embeds = None
# 选择位置张量
if self.uses_mrope:
positions = self.mrope_positions.gpu[:, :num_tokens_after_padding]
else:
positions = self.positions.gpu[:num_tokens_after_padding]

# 准备中间张量:
if get_pp_group().is_first_rank:
intermediate_tensors = None
else: # 若不是pp首rank,确保intermediate_tensors 已创建并按本次 token 数做切分
if self.intermediate_tensors is None:
self.intermediate_tensors = (
self.model.make_empty_intermediate_tensors(
batch_size=self.max_num_tokens,
dtype=self.model_config.dtype,
device=self.device,
)
)

intermediate_tensors = self.sync_and_slice_intermediate_tensors(
num_tokens_after_padding, None, False
)

# filter out the valid batch descriptor
# 如果不是剖析运行用 cudagraph_dispatcher.dispatch 返回 _cg_mode, batch_descriptor
# 若传入了cudagraph_runtime_mode参数,允许强制 NONE(用于只预热不捕获)
# 否则必须与调度器返回的 _cg_mode 一致(断言校验)。
_cg_mode, batch_descriptor = (
self.cudagraph_dispatcher.dispatch(
BatchDescriptor(
num_tokens=num_tokens_after_padding,
uniform_decode=uniform_decode,
)
)
if not is_profile
else (CUDAGraphMode.NONE, None)
)
if cudagraph_runtime_mode is not None:
# we allow forcing NONE when the dispatcher disagrees to support
# warm ups for cudagraph capture
assert (
cudagraph_runtime_mode == CUDAGraphMode.NONE
or cudagraph_runtime_mode == _cg_mode
), (
f"Cudagraph runtime mode mismatch at dummy_run. "
f"Expected {_cg_mode}, but got {cudagraph_runtime_mode}."
)
# 若未传入:以 _cg_mode 为准
else:
cudagraph_runtime_mode = _cg_mode
# 微批修正
# 若存在 ubatch_slices :将 num_tokens_after_padding 改为首个微批的 token 数,并更新 num_tokens_across_dp
if ubatch_slices is not None:
num_tokens_after_padding = ubatch_slices[0].num_tokens
if num_tokens_across_dp is not None:
num_tokens_across_dp[:] = num_tokens_after_padding

# 注入前向上下文,进入set_forward_context(...)
# 注入attn_metadata 、 num_tokens 、 num_tokens_across_dp 、 cudagraph_runtime_mode 、 batch_descriptor 、 ubatch_slices
with (
self.maybe_randomize_inputs(input_ids),
set_forward_context(
attn_metadata,
self.vllm_config,
num_tokens=num_tokens_after_padding,
num_tokens_across_dp=num_tokens_across_dp,
cudagraph_runtime_mode=cudagraph_runtime_mode,
batch_descriptor=batch_descriptor,
ubatch_slices=ubatch_slices,
),
):
# 随后调用self.model(...)执行一次前向(这一步在包装器内决定是否捕获或重放)
outputs = self.model(
input_ids=input_ids,
positions=positions,
intermediate_tensors=intermediate_tensors,
inputs_embeds=inputs_embeds,
**model_kwargs,
)

# 处理输出与辅助路径
# use_aux_hidden_state_outputs=True,取前者
if self.use_aux_hidden_state_outputs:
hidden_states, _ = outputs
else:
hidden_states = outputs
# 若开启 Eagle 推测解码:
# 调用 drafter.dummy_run(...) ,是否用 cudagraphs 取决于 cudagraph_runtime_mode==PIECEWISE
if self.speculative_config and self.speculative_config.use_eagle():
assert isinstance(self.drafter, EagleProposer)
use_cudagraphs = cudagraph_runtime_mode == CUDAGraphMode.PIECEWISE
self.drafter.dummy_run(num_tokens, use_cudagraphs=use_cudagraphs)

# 如果不跳过EPLB同步
# 调用 self.eplb_step(is_dummy=True, is_profile=is_profile) ,避免 DP 阻塞,使各 rank 在假批次也能做必要的重排同步
if not skip_eplb:
self.eplb_step(is_dummy=True, is_profile=is_profile)
# 返回值
logit_indices = np.cumsum(num_scheduled_tokens) - 1
return hidden_states, hidden_states[logit_indices]

CUDA Graph capture trigger points

  • CUDAGraphWrapper.__call__ only performs the capture flow when _dummy_run enters the model with cudagraph_runtime_mode != NONE and the cache entry for the corresponding BatchDescriptor has not been captured yet (cuda_graph.py:96-167).
  • The switch between warmup (no capture) and capture is controlled explicitly by the cudagraph_runtime_mode argument along capture_model → _capture_cudagraphs → _dummy_run.

Additional notes

  • To confirm when capture happens, set the log level to DEBUG; CUDAGraphWrapper prints "Capturing a cudagraph on (…, batch_descriptor=…)" on the first capture (cuda_graph.py:104-112).
  • CudagraphDispatcher only provides valid batch_descriptors for the batch sizes allowed by the configuration; other sizes are dispatched as CUDAGraphMode.NONE and never trigger capture (cudagraph_dispatcher.py:89-133).

To pinpoint which batch size the first "capture run" uses, or to see which sizes have already been captured, you can print self.model_runner.cudagraph_batch_sizes and the keys of CUDAGraphWrapper.concrete_cudagraph_entries in the running process and check them against the configured cudagraph_capture_sizes.
