[OP][Feature] 统一 limit_thinking_content_length CUDA 算子,支持回复长度限制与注入序列 (#6493)

* Initial plan

* Migrate PRs #6311, #6129, #6305 to develop and merge unit tests

Co-authored-by: yuanlehome <23653004+yuanlehome@users.noreply.github.com>

* fix

* update

* fix

* fix ci

* fix ci

* Initial plan

* test: add test_chat_with_response_max_tokens to test_EB_VL_Lite_serving.py

Co-authored-by: yuanlehome <23653004+yuanlehome@users.noreply.github.com>

* test: add disable-thinking case to test_chat_with_response_max_tokens

Co-authored-by: yuanlehome <23653004+yuanlehome@users.noreply.github.com>

* test: add both reasoning_max_tokens and response_max_tokens case

Co-authored-by: yuanlehome <23653004+yuanlehome@users.noreply.github.com>

* fix ci

* fix ci

* fix ci

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: yuanlehome <23653004+yuanlehome@users.noreply.github.com>
This commit is contained in:
Yuanle Liu
2026-02-25 21:36:50 +08:00
committed by GitHub
parent e18397134a
commit 6d3fede240
38 changed files with 771 additions and 1690 deletions
+31 -113
View File
@@ -32,8 +32,7 @@ from fastdeploy.worker.input_batch import (
if current_platform.is_iluvatar():
from fastdeploy.model_executor.ops.iluvatar import (
get_padding_offset,
limit_thinking_content_length_v1,
limit_thinking_content_length_v2,
limit_thinking_content_length,
save_output,
set_stop_value_multi_ends,
step_paddle,
@@ -58,14 +57,12 @@ elif current_platform.is_dcu():
elif current_platform.is_maca():
from fastdeploy.model_executor.ops.gpu import (
get_padding_offset,
limit_thinking_content_length_v1,
limit_thinking_content_length_v2,
limit_thinking_content_length,
save_output,
save_output_topk,
set_stop_value_multi_ends,
speculate_get_seq_lens_output,
speculate_limit_thinking_content_length_v1,
speculate_limit_thinking_content_length_v2,
speculate_limit_thinking_content_length,
speculate_save_output,
speculate_save_output_topk,
speculate_set_stop_value_multi_seqs,
@@ -102,10 +99,8 @@ else:
step_reschedule,
update_inputs_v1,
speculate_step_reschedule,
limit_thinking_content_length_v1,
limit_thinking_content_length_v2,
speculate_limit_thinking_content_length_v1,
speculate_limit_thinking_content_length_v2,
limit_thinking_content_length,
speculate_limit_thinking_content_length,
)
from fastdeploy.model_executor.entropy_utils import (
@@ -120,85 +115,6 @@ from fastdeploy.worker.output import LogprobsTensors, ModelOutputData, SamplerOu
DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1"
def limit_thinking_content_length(
    limit_strategy: str,
    sampled_token_ids: paddle.Tensor,
    max_think_lens: paddle.Tensor,
    step_idx: paddle.Tensor,
    limit_think_status: paddle.Tensor,
    stop_flags: paddle.Tensor,
    eos_token_ids: paddle.Tensor,
    think_end_id: int,
    line_break_id: int = None,
):
    """Dispatch to the CUDA op that caps a request's thinking-content length.

    The op variant is selected by ``limit_strategy`` (the literal string the
    kernel injects to terminate the thinking section). All tensor arguments
    are forwarded to the kernel unchanged and may be mutated in place by it.

    Raises:
        NotImplementedError: if ``limit_strategy`` is not a supported truncate string.
    """
    known_strategies = ("</think>", "\n</think>\n\n")
    if limit_strategy not in known_strategies:
        raise NotImplementedError(f"Not support {limit_strategy=} for limit thinking content length.")
    if limit_strategy == known_strategies[0]:
        # ernie-45-vl variant: terminate thinking with a bare </think> token.
        # eos_token_ids is passed so the kernel can also handle the case where
        # the model emits EOS mid-reasoning due to model performance problems.
        limit_thinking_content_length_v1(
            sampled_token_ids,
            max_think_lens,
            step_idx,
            limit_think_status,
            stop_flags,
            eos_token_ids,
            think_end_id,
        )
    else:
        # ernie-x1 variant: terminate thinking with "\n</think>\n\n", which
        # additionally needs a valid line-break token id.
        # NOTE(review): validation via assert is stripped under -O; kept as-is
        # to preserve the original exception behavior.
        assert line_break_id > 0
        limit_thinking_content_length_v2(
            sampled_token_ids,
            max_think_lens,
            step_idx,
            limit_think_status,
            stop_flags,
            think_end_id,
            line_break_id,
        )
def speculate_limit_thinking_content_length(
    limit_strategy: str,
    accept_tokens: paddle.Tensor,
    max_think_lens: paddle.Tensor,
    step_idx: paddle.Tensor,
    limit_think_status: paddle.Tensor,
    accept_num: paddle.Tensor,
    stop_flags: paddle.Tensor,
    eos_token_ids: paddle.Tensor,
    think_end_id: int,
    line_break_id: int = None,
):
    """Speculative-decoding counterpart of ``limit_thinking_content_length``.

    Selects the CUDA op by ``limit_strategy`` and forwards the accepted-token
    buffers (``accept_tokens`` / ``accept_num``) so the kernel can truncate
    thinking content across the batch of accepted draft tokens in place.

    Raises:
        NotImplementedError: if ``limit_strategy`` is not a supported truncate string.
    """
    if limit_strategy == "</think>":
        # ernie-45-vl variant. eos_token_ids lets the kernel also cover the
        # case where the model emits EOS mid-reasoning due to model
        # performance problems.
        speculate_limit_thinking_content_length_v1(
            accept_tokens,
            max_think_lens,
            step_idx,
            limit_think_status,
            accept_num,
            stop_flags,
            eos_token_ids,
            think_end_id,
        )
        return
    if limit_strategy == "\n</think>\n\n":
        # ernie-x1 variant: requires a valid line-break token id.
        # NOTE(review): assert-based validation is stripped under -O; kept
        # unchanged to preserve the original exception behavior.
        assert line_break_id > 0
        speculate_limit_thinking_content_length_v2(
            accept_tokens,
            max_think_lens,
            step_idx,
            limit_think_status,
            accept_num,
            stop_flags,
            think_end_id,
            line_break_id,
        )
        return
    raise NotImplementedError(f"Not support {limit_strategy=} for limit thinking content length.")
def pre_process(
token_num_cpu: int,
input_ids: paddle.Tensor,
@@ -321,21 +237,22 @@ def post_process_normal(
sampling_metadata: SamplingMetadata,
block_size: int = 64,
think_end_id: int = -1,
line_break_id: int = -1,
splitwise_role_is_decode: bool = False,
enable_entropy: bool = False,
):
"""Post-processing steps after completing a single token generation."""
if think_end_id > 0:
limit_thinking_content_length(
limit_strategy=envs.FD_LIMIT_THINKING_CONTENT_TRUNCATE_STR,
sampled_token_ids=sampler_output.sampled_token_ids,
max_think_lens=share_inputs["max_think_lens"],
step_idx=share_inputs["step_idx"],
limit_think_status=share_inputs["limit_think_status"],
stop_flags=share_inputs["stop_flags"],
eos_token_ids=share_inputs["eos_token_id"],
think_end_id=think_end_id,
line_break_id=line_break_id,
sampler_output.sampled_token_ids,
share_inputs["max_think_lens"],
share_inputs["max_reply_lens"],
share_inputs["step_idx"],
share_inputs["limit_think_status"],
share_inputs["stop_flags"],
share_inputs["eos_token_id"],
share_inputs["inject_token_ids"],
think_end_id,
splitwise_role_is_decode,
)
# 1. Set stop value
paddle.assign(
@@ -480,21 +397,22 @@ def post_process_specualate(
save_each_rank: bool = False,
skip_save_output: bool = False,
think_end_id: int = -1,
line_break_id: int = -1,
splitwise_role_is_decode: bool = False,
enable_entropy: bool = False,
):
if think_end_id > 0:
speculate_limit_thinking_content_length(
limit_strategy=envs.FD_LIMIT_THINKING_CONTENT_TRUNCATE_STR,
accept_tokens=share_inputs["accept_tokens"],
max_think_lens=share_inputs["max_think_lens"],
step_idx=share_inputs["step_idx"],
limit_think_status=share_inputs["limit_think_status"],
accept_num=share_inputs["accept_num"],
stop_flags=share_inputs["stop_flags"],
eos_token_ids=share_inputs["eos_token_id"],
think_end_id=think_end_id,
line_break_id=line_break_id,
share_inputs["accept_tokens"],
share_inputs["max_think_lens"],
share_inputs["max_reply_lens"],
share_inputs["step_idx"],
share_inputs["limit_think_status"],
share_inputs["accept_num"],
share_inputs["stop_flags"],
share_inputs["eos_token_id"],
share_inputs["inject_token_ids"],
think_end_id,
splitwise_role_is_decode,
)
speculate_set_stop_value_multi_seqs(
model_output.accept_tokens,
@@ -602,7 +520,7 @@ def post_process(
skip_save_output: bool = False,
async_output_queue: queue.Queue = None,
think_end_id: int = -1,
line_break_id: int = -1,
splitwise_role_is_decode: bool = False,
enable_entropy: bool = False,
) -> None:
"""Post-processing steps after completing a single token generation."""
@@ -627,7 +545,7 @@ def post_process(
save_each_rank,
skip_save_output,
think_end_id,
line_break_id,
splitwise_role_is_decode,
enable_entropy,
)
else:
@@ -638,7 +556,7 @@ def post_process(
sampling_metadata,
block_size,
think_end_id,
line_break_id,
splitwise_role_is_decode,
enable_entropy,
)
share_inputs["last_preempted_idx"].copy_(share_inputs["preempted_idx"])