[Model Runner] Deprecate not_need_stop (#6356)

* Deprecate not_need_stop
This commit is contained in:
sunxin
2026-03-05 10:55:42 +08:00
committed by GitHub
parent fa4815b93a
commit 0dc7034ce0
12 changed files with 534 additions and 165 deletions
+108 -88
View File
@@ -57,10 +57,8 @@ from fastdeploy.worker.input_batch import InputBatch, reorder_split_prefill_and_
if current_platform.is_iluvatar():
from fastdeploy.model_executor.ops.iluvatar import (
get_stop,
recover_decode_task,
set_data_ipc,
set_stop,
set_value_by_flags_and_idx,
)
@@ -72,9 +70,7 @@ elif current_platform.is_dcu():
share_external_data = None
else:
from fastdeploy.model_executor.ops.gpu import (
get_stop,
recover_decode_task,
set_stop,
set_value_by_flags_and_idx,
share_external_data,
speculate_schedule_cache,
@@ -83,6 +79,7 @@ else:
)
from fastdeploy.model_executor.pre_and_post_process import (
async_set_value,
post_process,
pre_process,
rebuild_padding,
@@ -267,14 +264,16 @@ class GPUModelRunner(ModelRunnerBase):
)
# for overlap
self.last_model_output_data = None
self.last_sampler_output = None
self.last_post_process_event = None
self.last_token_num = -1
self._cached_model_output_data = None
self._cached_sampler_output = None
self._cached_post_process_event = None
# Cached token count for next batch prediction in overlap scheduling.
# Used to avoid synchronization overhead when preparing inputs for the next batch.
self._cached_launch_token_num = -1
self.enable_overlap_schedule = fd_config.scheduler_config.enable_overlap_schedule and (
not self.speculative_decoding
)
self.current_launch_token_num = 0
def _async_output_busy_loop(self):
"""Entrypoint for the thread which handles outputs asynchronously."""
@@ -297,6 +296,46 @@ class GPUModelRunner(ModelRunnerBase):
"""
return (self.share_inputs["seq_lens_decoder"] > 0).any().cpu().numpy().item()
def _resolve_current_launch_token_num(
self, cached_token_num: int, token_num_event, is_dummy_or_profile_run: bool
) -> int:
"""
Resolve token count for current batch.
In overlap mode, uses cached value from previous batch prediction to avoid GPU-CPU sync.
Falls back to fresh computation in certain conditions:
- dummy/profile runs need accurate counts
- non-overlap mode doesn't support caching
- prefill stage changes batch composition
- invalid cached value
"""
if (
is_dummy_or_profile_run
or (not self.enable_overlap_schedule)
or self.exist_prefill()
or cached_token_num <= 0
):
token_num_event.synchronize()
return self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item()
return cached_token_num
def _predict_next_launch_token_num(self) -> int:
"""
Predict token count for next batch.
In overlap scheduling, while current batch executes model forward,
the scheduler may have prepared decode requests for next batch.
This prediction allows next batch to skip synchronization.
Returns -1 if prediction is not applicable (non-overlap or prefill exists).
"""
if self.exist_prefill():
return -1
return (
self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item()
+ self.share_inputs["is_block_step_cpu"].numpy().sum().item()
)
def only_prefill(self):
"""
check whether prefill only
@@ -711,8 +750,6 @@ class GPUModelRunner(ModelRunnerBase):
self.initialize_kv_cache()
req_len = len(req_dicts)
has_prefill_task = False
has_decode_task = False
batch_pooling_params = []
self.share_inputs["num_running_requests"] = num_running_requests
@@ -828,7 +865,6 @@ class GPUModelRunner(ModelRunnerBase):
if request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None:
self.prompt_logprobs_reqs[request.request_id] = request
self.forward_batch_reqs_list[idx] = request
has_prefill_task = True
if self.speculative_decoding and self.speculative_method == "suffix" and self.proposer is not None:
if isinstance(request.prompt_token_ids, np.ndarray):
@@ -859,11 +895,14 @@ class GPUModelRunner(ModelRunnerBase):
encoder_block_num = len(request.block_tables)
self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num
self.share_inputs["block_tables"][idx : idx + 1, :] = -1
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
request.block_tables, dtype="int32"
)
if self.share_inputs["is_block_step"][idx]: # has tasks to continue to decode
has_decode_task = True
if current_platform.is_cuda():
async_set_value(
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables
)
else:
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
request.block_tables, dtype="int32"
)
self.share_inputs["preempted_idx"][idx : idx + 1, :] = 0
continue
else: # preempted task
@@ -940,8 +979,6 @@ class GPUModelRunner(ModelRunnerBase):
self.sampler.apply_logits_processor(idx, logits_info, prefill_tokens)
self._process_mm_features(req_dicts)
if has_prefill_task or has_decode_task:
set_stop(self.share_inputs["not_need_stop"], True)
self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"][:num_running_requests]
if self.speculative_method in ["mtp"]:
@@ -1067,7 +1104,7 @@ class GPUModelRunner(ModelRunnerBase):
)
self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"]
def _prepare_inputs(self, last_token_num=-1, is_dummy_or_profile_run=False) -> None:
def _prepare_inputs(self, cached_token_num=-1, is_dummy_or_profile_run=False) -> None:
"""Prepare the model inputs"""
if self.enable_mm and self.share_inputs["image_features_list"] is not None:
tensor_feats = [t for t in self.share_inputs["image_features_list"] if isinstance(t, paddle.Tensor)]
@@ -1115,16 +1152,7 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["is_block_step_cpu"].copy_(self.share_inputs["is_block_step"], False)
token_num_event = paddle.device.cuda.create_event()
token_num_event.record()
if (
is_dummy_or_profile_run
or (not self.enable_overlap_schedule)
or self.exist_prefill()
or last_token_num <= 0
):
token_num_event.synchronize()
token_num = self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item()
else:
token_num = last_token_num
token_num = self._resolve_current_launch_token_num(cached_token_num, token_num_event, is_dummy_or_profile_run)
(
ids_remove_padding,
batch_id_per_token,
@@ -1189,7 +1217,7 @@ class GPUModelRunner(ModelRunnerBase):
logits_processors=self.share_inputs["logits_processors"],
share_inputs=self.share_inputs,
)
return token_num_event
return token_num, token_num_event
def _process_reorder(self) -> None:
if self.attn_backends and getattr(self.attn_backends[0], "enable_ids_reorder", False):
@@ -1968,14 +1996,17 @@ class GPUModelRunner(ModelRunnerBase):
model_forward_batch: Optional[List[Request]] = None,
num_running_requests: int = None,
) -> None:
model_output, p_done_idxs, token_num_event = self._preprocess_and_execute_model(
model_forward_batch, num_running_requests
)
model_output_data, sampler_output, post_process_event, _ = self._postprocess(
model_output, p_done_idxs, token_num_event, model_forward_batch, num_running_requests
model_output, p_done_idxs, _ = self._preprocess_and_execute_model(model_forward_batch, num_running_requests)
if model_output is None:
return
model_output_data, sampler_output, post_process_event = self._postprocess(
model_output, p_done_idxs, model_forward_batch, num_running_requests
)
if model_output_data is not None and not self.speculative_decoding:
self._save_model_output(model_output_data, sampler_output, post_process_event)
# synchronizes the async DtoH copies of sampled_token_ids.
post_process_event.synchronize()
self._save_model_output(model_output_data, sampler_output)
def execute_model_overlap(
self,
@@ -1984,40 +2015,61 @@ class GPUModelRunner(ModelRunnerBase):
) -> None:
# preprocess and execute model (current batch)
model_output, p_done_idxs, token_num_event = self._preprocess_and_execute_model(
model_forward_batch, num_running_requests, self.last_token_num
model_forward_batch, num_running_requests, self._cached_launch_token_num
)
# save output (last batch)
if self.last_model_output_data is not None and not self.speculative_decoding:
if self._cached_model_output_data is not None:
# synchronizes the async DtoH copies of sampled_token_ids.
self._cached_post_process_event.synchronize()
self._save_model_output(
self.last_model_output_data, self.last_sampler_output, self.last_post_process_event
self._cached_model_output_data,
self._cached_sampler_output,
)
# postprocess (current batch)
model_output_data, sampler_output, post_process_event, token_num = self._postprocess(
model_output, p_done_idxs, token_num_event, model_forward_batch, num_running_requests
)
self.last_model_output_data = model_output_data
self.last_sampler_output = sampler_output
self.last_post_process_event = post_process_event
self.last_token_num = token_num
# synchronizes the async DtoH copies of seq_lens_this_time_cpu and is_block_step_cpu,
# ensuring that the token count for the current batch is ready to be computed and reused in the subsequent batch.
token_num_event.synchronize()
next_launch_token_num = self._predict_next_launch_token_num()
if self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() > 0 and model_output is not None:
model_output_data, sampler_output, post_process_event = self._postprocess(
model_output, p_done_idxs, model_forward_batch, num_running_requests
)
self._cached_model_output_data = model_output_data
self._cached_sampler_output = sampler_output
self._cached_post_process_event = post_process_event
else:
self._cached_model_output_data = None
self._cached_sampler_output = None
self._cached_post_process_event = None
self._cached_launch_token_num = next_launch_token_num
def _preprocess_and_execute_model(
self,
model_forward_batch: Optional[List[Request]] = None,
num_running_requests: int = None,
last_token_num: int = -1,
cached_token_num: int = -1,
) -> None:
if self.deterministic_logger is not None:
self.deterministic_logger.log_batch_start(model_forward_batch)
# 1. Prepare inputs of model and sampler.
p_done_idxs = self._get_p_done_idxs_gd(model_forward_batch, num_running_requests)
# Reorder inputs to split prefill and decode tokens
self._process_reorder()
token_num_event = self._prepare_inputs(last_token_num)
# 1. Prepare inputs of model and sampler.
current_launch_token_num, token_num_event = self._prepare_inputs(cached_token_num)
self.current_launch_token_num = current_launch_token_num
# NOTE(sunxin):
# If current_launch_token_num is 0, it means the current worker is in an idle state,
# and no further processing is required in TP mode.
# However, in EP (Expert Parallelism) mode, there may be data on other runners,
# the current runner is required to execute part of the model.
if current_launch_token_num == 0 and not self.parallel_config.use_ep:
return None, None, token_num_event
p_done_idxs = self._get_p_done_idxs_gd(model_forward_batch, num_running_requests)
self.sampler.pre_process(p_done_idxs)
# 1.1 Update state of logits processor
@@ -2039,39 +2091,18 @@ class GPUModelRunner(ModelRunnerBase):
ids_remove_padding=self.forward_meta.ids_remove_padding,
forward_meta=self.forward_meta,
)
if self.use_cudagraph:
model_output = model_output[: self.real_token_num]
return model_output, p_done_idxs, token_num_event
def _postprocess(
self,
model_output: paddle.Tensor,
p_done_idxs: List[int],
token_num_event,
model_forward_batch: Optional[List[Request]] = None,
num_running_requests: int = None,
) -> None:
# NOTE(wufeisheng): If `not_need_stop` is False, it means the current worker is in an idle state.
# This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
# there may be data on other runners, so the current runner is required to execute part of the model.
# But it does not need to run the code below.
if not self.not_need_stop():
return None, None, None, -1
if self.use_cudagraph:
model_output = model_output[: self.real_token_num]
# NOTE(sunxin):
# token_num_event synchronizes the async DtoH copies of seq_lens_this_time_cpu and is_block_step_cpu,
# ensuring that the token count for the current batch is ready to be computed and reused in the subsequent batch.
token_num_event.synchronize()
if (not self.enable_overlap_schedule) or self.exist_prefill():
token_num = -1
else:
token_num = (
self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item()
+ self.share_inputs["is_block_step_cpu"].numpy().sum().item()
)
if self.speculative_decoding:
self.output_token_num_event.synchronize()
real_num = int(self._real_output_token_num_host)
@@ -2125,9 +2156,8 @@ class GPUModelRunner(ModelRunnerBase):
async_output_queue=self.async_output_queue,
enable_entropy=self.enable_entropy and self.parallel_config.tensor_parallel_rank == 0,
)
self.share_inputs["not_need_stop"].copy_(self.share_inputs["not_need_stop_device"], True)
return None, None, None, -1
return None, None, None
else:
hidden_states = rebuild_padding(
model_output,
@@ -2323,7 +2353,6 @@ class GPUModelRunner(ModelRunnerBase):
post_process_event = paddle.device.cuda.create_event()
if not self.speculative_decoding:
self.share_inputs["sampled_token_ids"].copy_(sampler_output.sampled_token_ids, False)
self.share_inputs["not_need_stop"].copy_(self.share_inputs["not_need_stop_device"], False)
post_process_event.record()
self.exist_prefill_flag = False
@@ -2336,17 +2365,13 @@ class GPUModelRunner(ModelRunnerBase):
and self.share_inputs["is_chunk_step"].sum() == 0
):
self.routing_replay_manager.put_table_to_store()
return model_output_data, sampler_output, post_process_event, token_num
return model_output_data, sampler_output, post_process_event
def _save_model_output(
self,
model_output_data,
sampler_output,
post_process_event,
):
# NOTE(sunxin):
# post_process_event synchronizes the async DtoH copies of not_need_stop and sampled_token_ids.
post_process_event.synchronize()
save_output_normal(
model_output=model_output_data,
sampler_output=sampler_output,
@@ -2515,11 +2540,6 @@ class GPUModelRunner(ModelRunnerBase):
required_memory = byte_of_dtype * 2 * (self.cache_config.block_size * hidden_dim) * num_layers # k + v
return required_memory
# TODO(sunxin): Remove not_need_stop!!!
def not_need_stop(self) -> bool:
"""Stop decoding if the tensor meets the termination condition"""
return get_stop(self.share_inputs["not_need_stop"]).item()
def clear_cache(self, profile=False):
"""Clear cached data from shared inputs and forward metadata"""
create_cache_tensor = profile or not (