diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc index 8516bd5858..4abd710beb 100644 --- a/custom_ops/gpu_ops/cpp_extensions.cc +++ b/custom_ops/gpu_ops/cpp_extensions.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/extension.h" +#include "pybind11/numpy.h" #include "pybind11/pybind11.h" namespace py = pybind11; @@ -49,9 +50,19 @@ void cuda_host_free(uintptr_t ptr) { check_cuda_error(cudaFreeHost(reinterpret_cast(ptr))); } -paddle::Tensor GetStop(paddle::Tensor& not_need_stop); - -void SetStop(paddle::Tensor& not_need_stop, bool flag); +paddle::Tensor CustomNumpyToTensor(py::array numpy_array, + paddle::Tensor tensor) { + py::buffer_info buf_info = numpy_array.request(); + void* numpy_data = buf_info.ptr; + size_t data_size = buf_info.size * buf_info.itemsize; + auto stream = tensor.stream(); + cudaMemcpyAsync((void*)(tensor.data()), + numpy_data, + data_size, + cudaMemcpyHostToDevice, + stream); + return tensor; +} void FlashAttentionMask(const paddle::Tensor& q_input, const paddle::Tensor& k_input, @@ -1722,7 +1733,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) { m.def("get_attn_mask_q", &get_attn_mask_q, "get_attn_mask_q function"); - m.def("get_stop", &GetStop, "get_stop function"); - - m.def("set_stop", &SetStop, "set_stop function"); + m.def("custom_numpy_to_tensor", + &CustomNumpyToTensor, + "custom_numpy_to_tensor function"); } diff --git a/custom_ops/gpu_ops/set_stop.cu b/custom_ops/gpu_ops/set_stop.cu deleted file mode 100644 index ca214a921a..0000000000 --- a/custom_ops/gpu_ops/set_stop.cu +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "helper.h" - -paddle::Tensor GetStop(paddle::Tensor& not_need_stop) { - bool* not_need_stop_data = const_cast(not_need_stop.data()); - auto not_need_stop_cpu = - GetEmptyTensor({1}, paddle::DataType::BOOL, paddle::CPUPlace()); - bool* not_need_stop_cpu_data = - const_cast(not_need_stop_cpu.data()); - not_need_stop_cpu_data[0] = not_need_stop_data[0]; - return not_need_stop_cpu; -} - -void SetStop(paddle::Tensor& not_need_stop, bool flag) { - bool* not_need_stop_data = const_cast(not_need_stop.data()); - not_need_stop_data[0] = flag; -} diff --git a/custom_ops/gpu_ops/speculate_decoding/speculate_preprocess.cu b/custom_ops/gpu_ops/speculate_decoding/speculate_preprocess.cu index b3c5ec7c14..d81d6ecb41 100644 --- a/custom_ops/gpu_ops/speculate_decoding/speculate_preprocess.cu +++ b/custom_ops/gpu_ops/speculate_decoding/speculate_preprocess.cu @@ -155,15 +155,6 @@ std::vector SpeculatePreProcess( auto cu_seqlens_k = paddle::empty({bsz + 1}, paddle::DataType::INT32, input_ids.place()); - if (token_num_data == 0) { - return {ids_remove_padding, - batch_id_per_token, - cu_seqlens_q, - cu_seqlens_k, - paddle::Tensor(), - paddle::Tensor(), - paddle::Tensor()}; - } #ifdef PADDLE_WITH_COREX int blockSize = std::min((token_num_data + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE, 128); @@ -185,6 +176,16 @@ std::vector SpeculatePreProcess( auto real_output_token_num = paddle::empty({1}, paddle::DataType::INT32, input_ids.place()); + if (token_num_data == 0) { + return {ids_remove_padding, + batch_id_per_token, + cu_seqlens_q, + cu_seqlens_k, + 
cu_seq_lens_q_output, + batch_id_per_token_output, + real_output_token_num}; + } + int64_t *ids_remove_padding_ptr = ids_remove_padding.data(); int *batch_id_per_token_ptr = batch_id_per_token.data(); int *cu_seqlens_q_ptr = cu_seqlens_q.data(); diff --git a/custom_ops/iluvatar_ops/cpp_extensions.cc b/custom_ops/iluvatar_ops/cpp_extensions.cc index c085f2a612..f84192e12e 100644 --- a/custom_ops/iluvatar_ops/cpp_extensions.cc +++ b/custom_ops/iluvatar_ops/cpp_extensions.cc @@ -49,10 +49,6 @@ void cuda_host_free(uintptr_t ptr) { check_cuda_error(cudaFreeHost(reinterpret_cast(ptr))); } -paddle::Tensor GetStop(paddle::Tensor& not_need_stop); - -void SetStop(paddle::Tensor& not_need_stop, bool flag); - PYBIND11_MODULE(fastdeploy_ops, m) { /** * alloc_cache_pinned.cc @@ -67,8 +63,4 @@ PYBIND11_MODULE(fastdeploy_ops, m) { m.def( "cuda_host_free", &cuda_host_free, "Free pinned memory", py::arg("ptr")); py::register_exception(m, "CudaError"); - - m.def("get_stop", &GetStop, "get_stop function"); - - m.def("set_stop", &SetStop, "set_stop function"); } diff --git a/custom_ops/metax_ops/cpp_extensions.cc b/custom_ops/metax_ops/cpp_extensions.cc index 2d468b7cc1..4dfb2704f0 100644 --- a/custom_ops/metax_ops/cpp_extensions.cc +++ b/custom_ops/metax_ops/cpp_extensions.cc @@ -49,10 +49,6 @@ void cuda_host_free(uintptr_t ptr) { check_cuda_error(cudaFreeHost(reinterpret_cast(ptr))); } -paddle::Tensor GetStop(paddle::Tensor& not_need_stop); - -void SetStop(paddle::Tensor& not_need_stop, bool flag); - std::vector SpeculatePreProcess( const int64_t cpu_token_num, const paddle::Tensor& input_ids, @@ -76,10 +72,6 @@ PYBIND11_MODULE(fastdeploy_ops, m) { "cuda_host_free", &cuda_host_free, "Free pinned memory", py::arg("ptr")); py::register_exception(m, "CudaError"); - m.def("get_stop", &GetStop, "get_stop function"); - - m.def("set_stop", &SetStop, "set_stop function"); - m.def("speculate_pre_process", &SpeculatePreProcess, "speculate_pre_process function"); diff --git 
a/custom_ops/setup_ops.py b/custom_ops/setup_ops.py index fa1e6559ba..72dbbaac2e 100644 --- a/custom_ops/setup_ops.py +++ b/custom_ops/setup_ops.py @@ -270,7 +270,6 @@ elif paddle.is_compiled_with_cuda(): "gpu_ops/stop_generation.cu", "gpu_ops/stop_generation_multi_ends.cu", "gpu_ops/set_flags.cu", - "gpu_ops/set_stop.cu", "gpu_ops/update_inputs_v1.cu", "gpu_ops/recover_decode_task.cu", "gpu_ops/step.cu", @@ -562,7 +561,6 @@ elif paddle.is_compiled_with_custom_device("iluvatar_gpu"): "gpu_ops/recover_decode_task.cu", "gpu_ops/update_inputs_v1.cu", "gpu_ops/get_img_boundaries.cc", - "gpu_ops/set_stop.cu", "gpu_ops/fused_neox_rope_embedding.cu", "gpu_ops/get_output_ep.cc", "iluvatar_ops/moe_dispatch.cu", @@ -649,7 +647,6 @@ elif paddle.device.is_compiled_with_custom_device("metax_gpu"): "gpu_ops/unset_data_ipc.cu", "gpu_ops/swap_cache_batch.cu", "gpu_ops/gelu_tanh.cu", - "gpu_ops/set_stop.cu", "metax_ops/moe_dispatch.cu", "metax_ops/moe_ffn.cu", "metax_ops/moe_reduce.cu", diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index f7c34c835c..5c9c08b297 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -101,6 +101,7 @@ else: speculate_step_reschedule, limit_thinking_content_length, speculate_limit_thinking_content_length, + custom_numpy_to_tensor, ) from fastdeploy.model_executor.entropy_utils import ( @@ -114,6 +115,37 @@ from fastdeploy.worker.output import LogprobsTensors, ModelOutputData, SamplerOu DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" +if current_platform.is_cuda(): + + def async_set_value(tgt, src): + if isinstance(src, (int, float, bool)): + src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype) + elif isinstance(src, (list, np.array)): + dtype_str = str(tgt.dtype).split(".")[1] + if isinstance(src, list): + src = np.array(src, dtype=dtype_str if dtype_str != "bfloat16" else "float32") + if str(src.dtype) != 
dtype_str: + srt_tensor = paddle.empty(tgt.shape, dtype=str(src.dtype)) + src = custom_numpy_to_tensor(src, srt_tensor) + else: + return custom_numpy_to_tensor(src, tgt) + elif isinstance(src, paddle.Tensor): + pass + else: + raise ValueError("async_set_value unsupported src type: {}".format(type(src))) + if src.shape != tgt.shape: + src = src.reshape(tgt.shape) + if src.dtype != tgt.dtype: + src = src.cast(tgt.dtype) + if src.place != tgt.place: + src = src.to(tgt.place) + tgt.copy_(src, blocking=False) + +else: + + def async_set_value(*args, **kwargs): + raise RuntimeError("async_set_value is only available on CUDA") + def pre_process( token_num_cpu: int, @@ -870,7 +902,7 @@ def post_process_pooling( ) update_inputs_v1( model_output.stop_flags, - model_output.not_need_stop, + model_output.not_need_stop_device, model_output.seq_lens_this_time, model_output.seq_lens_encoder, model_output.seq_lens_decoder, diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 5e9b7d1692..302f22b57e 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -57,10 +57,8 @@ from fastdeploy.worker.input_batch import InputBatch, reorder_split_prefill_and_ if current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import ( - get_stop, recover_decode_task, set_data_ipc, - set_stop, set_value_by_flags_and_idx, ) @@ -72,9 +70,7 @@ elif current_platform.is_dcu(): share_external_data = None else: from fastdeploy.model_executor.ops.gpu import ( - get_stop, recover_decode_task, - set_stop, set_value_by_flags_and_idx, share_external_data, speculate_schedule_cache, @@ -83,6 +79,7 @@ else: ) from fastdeploy.model_executor.pre_and_post_process import ( + async_set_value, post_process, pre_process, rebuild_padding, @@ -267,14 +264,16 @@ class GPUModelRunner(ModelRunnerBase): ) # for overlap - self.last_model_output_data = None - self.last_sampler_output = None - self.last_post_process_event = None - 
self.last_token_num = -1 - + self._cached_model_output_data = None + self._cached_sampler_output = None + self._cached_post_process_event = None + # Cached token count for next batch prediction in overlap scheduling. + # Used to avoid synchronization overhead when preparing inputs for the next batch. + self._cached_launch_token_num = -1 self.enable_overlap_schedule = fd_config.scheduler_config.enable_overlap_schedule and ( not self.speculative_decoding ) + self.current_launch_token_num = 0 def _async_output_busy_loop(self): """Entrypoint for the thread which handles outputs asynchronously.""" @@ -297,6 +296,46 @@ class GPUModelRunner(ModelRunnerBase): """ return (self.share_inputs["seq_lens_decoder"] > 0).any().cpu().numpy().item() + def _resolve_current_launch_token_num( + self, cached_token_num: int, token_num_event, is_dummy_or_profile_run: bool + ) -> int: + """ + Resolve token count for current batch. + + In overlap mode, uses cached value from previous batch prediction to avoid GPU-CPU sync. + Falls back to fresh computation in certain conditions: + - dummy/profile runs need accurate counts + - non-overlap mode doesn't support caching + - prefill stage changes batch composition + - invalid cached value + """ + if ( + is_dummy_or_profile_run + or (not self.enable_overlap_schedule) + or self.exist_prefill() + or cached_token_num <= 0 + ): + token_num_event.synchronize() + return self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() + return cached_token_num + + def _predict_next_launch_token_num(self) -> int: + """ + Predict token count for next batch. + + In overlap scheduling, while current batch executes model forward, + the scheduler may have prepared decode requests for next batch. + This prediction allows next batch to skip synchronization. + + Returns -1 if prediction is not applicable (non-overlap or prefill exists). 
+ """ + if self.exist_prefill(): + return -1 + return ( + self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() + + self.share_inputs["is_block_step_cpu"].numpy().sum().item() + ) + def only_prefill(self): """ check whether prefill only @@ -711,8 +750,6 @@ class GPUModelRunner(ModelRunnerBase): self.initialize_kv_cache() req_len = len(req_dicts) - has_prefill_task = False - has_decode_task = False batch_pooling_params = [] self.share_inputs["num_running_requests"] = num_running_requests @@ -828,7 +865,6 @@ class GPUModelRunner(ModelRunnerBase): if request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None: self.prompt_logprobs_reqs[request.request_id] = request self.forward_batch_reqs_list[idx] = request - has_prefill_task = True if self.speculative_decoding and self.speculative_method == "suffix" and self.proposer is not None: if isinstance(request.prompt_token_ids, np.ndarray): @@ -859,11 +895,14 @@ class GPUModelRunner(ModelRunnerBase): encoder_block_num = len(request.block_tables) self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num self.share_inputs["block_tables"][idx : idx + 1, :] = -1 - self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( - request.block_tables, dtype="int32" - ) - if self.share_inputs["is_block_step"][idx]: # has tasks to continue to decode - has_decode_task = True + if current_platform.is_cuda(): + async_set_value( + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables + ) + else: + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( + request.block_tables, dtype="int32" + ) self.share_inputs["preempted_idx"][idx : idx + 1, :] = 0 continue else: # preempted task @@ -940,8 +979,6 @@ class GPUModelRunner(ModelRunnerBase): self.sampler.apply_logits_processor(idx, logits_info, prefill_tokens) self._process_mm_features(req_dicts) - if has_prefill_task or has_decode_task: - 
set_stop(self.share_inputs["not_need_stop"], True) self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"][:num_running_requests] if self.speculative_method in ["mtp"]: @@ -1067,7 +1104,7 @@ class GPUModelRunner(ModelRunnerBase): ) self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"] - def _prepare_inputs(self, last_token_num=-1, is_dummy_or_profile_run=False) -> None: + def _prepare_inputs(self, cached_token_num=-1, is_dummy_or_profile_run=False) -> None: """Prepare the model inputs""" if self.enable_mm and self.share_inputs["image_features_list"] is not None: tensor_feats = [t for t in self.share_inputs["image_features_list"] if isinstance(t, paddle.Tensor)] @@ -1115,16 +1152,7 @@ class GPUModelRunner(ModelRunnerBase): self.share_inputs["is_block_step_cpu"].copy_(self.share_inputs["is_block_step"], False) token_num_event = paddle.device.cuda.create_event() token_num_event.record() - if ( - is_dummy_or_profile_run - or (not self.enable_overlap_schedule) - or self.exist_prefill() - or last_token_num <= 0 - ): - token_num_event.synchronize() - token_num = self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() - else: - token_num = last_token_num + token_num = self._resolve_current_launch_token_num(cached_token_num, token_num_event, is_dummy_or_profile_run) ( ids_remove_padding, batch_id_per_token, @@ -1189,7 +1217,7 @@ class GPUModelRunner(ModelRunnerBase): logits_processors=self.share_inputs["logits_processors"], share_inputs=self.share_inputs, ) - return token_num_event + return token_num, token_num_event def _process_reorder(self) -> None: if self.attn_backends and getattr(self.attn_backends[0], "enable_ids_reorder", False): @@ -1968,14 +1996,17 @@ class GPUModelRunner(ModelRunnerBase): model_forward_batch: Optional[List[Request]] = None, num_running_requests: int = None, ) -> None: - model_output, p_done_idxs, token_num_event = self._preprocess_and_execute_model( - 
model_forward_batch, num_running_requests - ) - model_output_data, sampler_output, post_process_event, _ = self._postprocess( - model_output, p_done_idxs, token_num_event, model_forward_batch, num_running_requests + model_output, p_done_idxs, _ = self._preprocess_and_execute_model(model_forward_batch, num_running_requests) + if model_output is None: + return + + model_output_data, sampler_output, post_process_event = self._postprocess( + model_output, p_done_idxs, model_forward_batch, num_running_requests ) if model_output_data is not None and not self.speculative_decoding: - self._save_model_output(model_output_data, sampler_output, post_process_event) + # synchronizes the async DtoH copies of sampled_token_ids. + post_process_event.synchronize() + self._save_model_output(model_output_data, sampler_output) def execute_model_overlap( self, @@ -1984,40 +2015,61 @@ class GPUModelRunner(ModelRunnerBase): ) -> None: # preprocess and execute model (current batch) model_output, p_done_idxs, token_num_event = self._preprocess_and_execute_model( - model_forward_batch, num_running_requests, self.last_token_num + model_forward_batch, num_running_requests, self._cached_launch_token_num ) # save output (last batch) - if self.last_model_output_data is not None and not self.speculative_decoding: + if self._cached_model_output_data is not None: + # synchronizes the async DtoH copies of sampled_token_ids. 
+ self._cached_post_process_event.synchronize() self._save_model_output( - self.last_model_output_data, self.last_sampler_output, self.last_post_process_event + self._cached_model_output_data, + self._cached_sampler_output, ) # postprocess (current batch) - model_output_data, sampler_output, post_process_event, token_num = self._postprocess( - model_output, p_done_idxs, token_num_event, model_forward_batch, num_running_requests - ) - self.last_model_output_data = model_output_data - self.last_sampler_output = sampler_output - self.last_post_process_event = post_process_event - self.last_token_num = token_num + # synchronizes the async DtoH copies of seq_lens_this_time_cpu and is_block_step_cpu, + # ensuring that the token count for the current batch is ready to be computed and reused in the subsequent batch. + token_num_event.synchronize() + next_launch_token_num = self._predict_next_launch_token_num() + if self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() > 0 and model_output is not None: + model_output_data, sampler_output, post_process_event = self._postprocess( + model_output, p_done_idxs, model_forward_batch, num_running_requests + ) + self._cached_model_output_data = model_output_data + self._cached_sampler_output = sampler_output + self._cached_post_process_event = post_process_event + else: + self._cached_model_output_data = None + self._cached_sampler_output = None + self._cached_post_process_event = None + self._cached_launch_token_num = next_launch_token_num def _preprocess_and_execute_model( self, model_forward_batch: Optional[List[Request]] = None, num_running_requests: int = None, - last_token_num: int = -1, + cached_token_num: int = -1, ) -> None: if self.deterministic_logger is not None: self.deterministic_logger.log_batch_start(model_forward_batch) - # 1. Prepare inputs of model and sampler. 
- p_done_idxs = self._get_p_done_idxs_gd(model_forward_batch, num_running_requests) - # Reorder inputs to split prefill and decode tokens self._process_reorder() - token_num_event = self._prepare_inputs(last_token_num) + # 1. Prepare inputs of model and sampler. + current_launch_token_num, token_num_event = self._prepare_inputs(cached_token_num) + self.current_launch_token_num = current_launch_token_num + + # NOTE(sunxin): + # If current_launch_token_num is 0, it means the current worker is in an idle state, + # and no further processing is required in TP mode. + # However, in EP (Expert Parallelism) mode, there is data on other runner, + # the current runner is required to execute part of the model. + if current_launch_token_num == 0 and not self.parallel_config.use_ep: + return None, None, token_num_event + + p_done_idxs = self._get_p_done_idxs_gd(model_forward_batch, num_running_requests) self.sampler.pre_process(p_done_idxs) # 1.1 Update state of logits processor @@ -2039,39 +2091,18 @@ class GPUModelRunner(ModelRunnerBase): ids_remove_padding=self.forward_meta.ids_remove_padding, forward_meta=self.forward_meta, ) + if self.use_cudagraph: + model_output = model_output[: self.real_token_num] return model_output, p_done_idxs, token_num_event def _postprocess( self, model_output: paddle.Tensor, p_done_idxs: List[int], - token_num_event, model_forward_batch: Optional[List[Request]] = None, num_running_requests: int = None, ) -> None: - # NOTE(wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. - # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, - # Then there is data on other runner, the current runner is required to execute part of the model. - # But not need to run the below code. 
- if not self.not_need_stop(): - return None, None, None, -1 - - if self.use_cudagraph: - model_output = model_output[: self.real_token_num] - - # NOTE(sunxin): - # token_num_event synchronizes the async DtoH copies of seq_lens_this_time_cpu and is_block_step_cpu, - # ensuring that the token count for the current batch is ready to be computed and reused in the subsequent batch. - token_num_event.synchronize() - if (not self.enable_overlap_schedule) or self.exist_prefill(): - token_num = -1 - else: - token_num = ( - self.share_inputs["seq_lens_this_time_cpu"].numpy().sum().item() - + self.share_inputs["is_block_step_cpu"].numpy().sum().item() - ) - if self.speculative_decoding: self.output_token_num_event.synchronize() real_num = int(self._real_output_token_num_host) @@ -2125,9 +2156,8 @@ class GPUModelRunner(ModelRunnerBase): async_output_queue=self.async_output_queue, enable_entropy=self.enable_entropy and self.parallel_config.tensor_parallel_rank == 0, ) - self.share_inputs["not_need_stop"].copy_(self.share_inputs["not_need_stop_device"], True) - return None, None, None, -1 + return None, None, None else: hidden_states = rebuild_padding( model_output, @@ -2323,7 +2353,6 @@ class GPUModelRunner(ModelRunnerBase): post_process_event = paddle.device.cuda.create_event() if not self.speculative_decoding: self.share_inputs["sampled_token_ids"].copy_(sampler_output.sampled_token_ids, False) - self.share_inputs["not_need_stop"].copy_(self.share_inputs["not_need_stop_device"], False) post_process_event.record() self.exist_prefill_flag = False @@ -2336,17 +2365,13 @@ class GPUModelRunner(ModelRunnerBase): and self.share_inputs["is_chunk_step"].sum() == 0 ): self.routing_replay_manager.put_table_to_store() - return model_output_data, sampler_output, post_process_event, token_num + return model_output_data, sampler_output, post_process_event def _save_model_output( self, model_output_data, sampler_output, - post_process_event, ): - # NOTE(sunxin): - # post_process_event 
synchronizes the async DtoH copies of not_need_stop and sampled_token_ids. - post_process_event.synchronize() save_output_normal( model_output=model_output_data, sampler_output=sampler_output, @@ -2515,11 +2540,6 @@ class GPUModelRunner(ModelRunnerBase): required_memory = byte_of_dtype * 2 * (self.cache_config.block_size * hidden_dim) * num_layers # k + v return required_memory - # TODO(sunxin): Remove not_need_stop!!! - def not_need_stop(self) -> bool: - """Stop decoding if the tensor meets the termination condition""" - return get_stop(self.share_inputs["not_need_stop"]).item() - def clear_cache(self, profile=False): """Clear cached data from shared inputs and forward metadata""" create_cache_tensor = profile or not ( diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 03d5862f05..e808a5bed7 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -152,7 +152,7 @@ class InputBatch: self.seq_lens_this_time_cpu = paddle.full([max_num_seqs, 1], 0, dtype="int32", device="cpu") self.is_block_step_cpu = paddle.full([max_num_seqs], False, dtype="bool", device="cpu") else: - self.not_need_stop = paddle.full([1], False, dtype="bool").pin_memory() + self.not_need_stop = paddle.full([1], False, dtype="bool").cpu() self.sampled_token_ids = paddle.full([max_num_seqs, 1], -1, dtype="int64").pin_memory() self.seq_lens_this_time_cpu = paddle.full([max_num_seqs, 1], 0, dtype="int32").pin_memory() self.is_block_step_cpu = paddle.full([max_num_seqs], False, dtype="bool").pin_memory() diff --git a/fastdeploy/worker/metax_model_runner.py b/fastdeploy/worker/metax_model_runner.py index 02a43f6ae3..b265b5b525 100644 --- a/fastdeploy/worker/metax_model_runner.py +++ b/fastdeploy/worker/metax_model_runner.py @@ -62,10 +62,8 @@ from fastdeploy.model_executor.model_loader import get_model_loader from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ScatterOp from 
fastdeploy.model_executor.models.interfaces_base import FdModelForPooling from fastdeploy.model_executor.ops.gpu import ( - get_stop, recover_decode_task, set_data_ipc, - set_stop, set_value_by_flags_and_idx, share_external_data, speculate_schedule_cache, @@ -884,7 +882,7 @@ class MetaxModelRunner(ModelRunnerBase): self._process_mm_features(req_dicts) if has_prefill_task or has_decode_task: - set_stop(self.share_inputs["not_need_stop"], True) + self.share_inputs["not_need_stop"][0] = True self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"][:num_running_requests] if self.speculative_method in ["mtp"]: @@ -967,7 +965,7 @@ class MetaxModelRunner(ModelRunnerBase): self.sampler.apply_logits_processor(idx, logits_info, prefill_tokens) - set_stop(self.share_inputs["not_need_stop"], True) + self.share_inputs["not_need_stop"][0] = True self.share_inputs["seq_lens_this_time"] = self.share_inputs["seq_lens_this_time_buffer"][:num_running_requests] @@ -2489,7 +2487,7 @@ class MetaxModelRunner(ModelRunnerBase): def not_need_stop(self) -> bool: """Stop decoding if the tensor meets the termination condition""" - return get_stop(self.share_inputs["not_need_stop"]).item() + return self.share_inputs["not_need_stop"][0] def clear_cache(self, profile=False): """Clear cached data from shared inputs and forward metadata""" diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 8eebcae7d7..72206fce5f 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -171,8 +171,10 @@ class PaddleDisWorkerProc: self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 self.speculative_decoding = fd_config.speculative_config.method is not None - self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule and ( - not self.speculative_decoding + self.enable_overlap_schedule = ( + current_platform.is_cuda() + and self.scheduler_config.enable_overlap_schedule + and 
(not self.speculative_decoding) ) def init_control(self): @@ -560,12 +562,11 @@ class PaddleDisWorkerProc: self.worker.preprocess_new_task(req_dicts, max_occupied_batch_index) if ( - (not self.parallel_config.use_ep) - and (not self.worker.model_runner.not_need_stop()) - and (not self.enable_overlap_schedule) + not self.parallel_config.use_ep + and not current_platform.is_cuda() + and not self.worker.model_runner.not_need_stop() ): self._tp_barrier_wait() if tp_size > 1 else None - time.sleep(0.001) continue @@ -578,6 +579,14 @@ class PaddleDisWorkerProc: self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill() logger.debug(f"execute model cost: {time.time()-start_execute_time:.5f} s") + if ( + not self.parallel_config.use_ep + and current_platform.is_cuda() + and self.worker.model_runner.current_launch_token_num == 0 + ): + self._tp_barrier_wait() if tp_size > 1 else None + time.sleep(0.001) + def initialize_kv_cache(self) -> None: """Profiles the peak memory usage of the model to determine how many KV blocks may be allocated without OOMs. diff --git a/tests/e2e/4cards_cases/test_Qwen3_30b_tp4.py b/tests/e2e/4cards_cases/test_Qwen3_30b_tp4.py new file mode 100644 index 0000000000..b002ecf7ce --- /dev/null +++ b/tests/e2e/4cards_cases/test_Qwen3_30b_tp4.py @@ -0,0 +1,347 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# NOTE(review): end-to-end pytest suite — boots a FastDeploy OpenAI-compatible
+# API server for Qwen3-30B-A3B as a session fixture, then exercises it over HTTP.
+
+import os
+import re
+import shutil
+import signal
+import subprocess
+import sys
+import time
+
+import pytest
+import requests
+
+# Make the repo-level "tests" directory importable so e2e.utils resolves.
+tests_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+sys.path.insert(0, tests_dir)
+
+from e2e.utils.serving_utils import (
+    FD_API_PORT,
+    FD_CACHE_QUEUE_PORT,
+    FD_ENGINE_QUEUE_PORT,
+    FD_METRICS_PORT,
+    clean_ports,
+    is_port_open,
+)
+
+# Must be set before the server subprocess is spawned so it inherits them.
+os.environ["FD_ATTENTION_BACKEND"] = "FLASH_ATTN"
+os.environ["FLAGS_flash_attn_version"] = "3"
+os.environ["FD_SAMPLING_CLASS"] = "rejection"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def setup_and_run_server():
+    """
+    Pytest fixture that runs once per test session:
+    - Cleans ports before tests
+    - Starts the API server as a subprocess
+    - Waits for server port to open (up to 480 seconds)
+    - Tears down server after all tests finish
+    """
+    print("Pre-test port cleanup...")
+    clean_ports()
+    print("log dir clean ")
+    if os.path.exists("log") and os.path.isdir("log"):
+        shutil.rmtree("log")
+
+    # Model is resolved under $MODEL_PATH if set, otherwise the CWD.
+    base_path = os.getenv("MODEL_PATH")
+    if base_path:
+        model_path = os.path.join(base_path, "Qwen3-30B-A3B")
+    else:
+        model_path = "./Qwen3-30B-A3B"
+
+    log_path = "server.log"
+    cmd = [
+        sys.executable,
+        "-m",
+        "fastdeploy.entrypoints.openai.api_server",
+        "--model",
+        model_path,
+        "--port",
+        str(FD_API_PORT),
+        "--tensor-parallel-size",
+        "4",
+        "--engine-worker-queue-port",
+        str(FD_ENGINE_QUEUE_PORT),
+        "--metrics-port",
+        str(FD_METRICS_PORT),
+        "--cache-queue-port",
+        str(FD_CACHE_QUEUE_PORT),
+        "--max-model-len",
+        "32768",
+        "--max-num-seqs",
+        "50",
+        "--enable-overlap-schedule",
+    ]
+
+    # Start subprocess in new process group
+    with open(log_path, "w") as logfile:
+        process = subprocess.Popen(
+            cmd,
+            stdout=logfile,
+            stderr=subprocess.STDOUT,
+            start_new_session=True,  # Enables killing full group via os.killpg
+        )
+
+    # Wait up to 480 seconds (480 x 1s polls) for the API server to be ready.
+    for _ in range(480):
+        if is_port_open("127.0.0.1", FD_API_PORT):
+            print(f"API server is up on port {FD_API_PORT}")
+            break
+        time.sleep(1)
+    else:
+        # for/else: reached only if the loop never hit `break` (server not up).
+        print("API server failed to start in time. Cleaning up...")
+        try:
+            os.killpg(process.pid, signal.SIGTERM)
+        except Exception as e:
+            print(f"Failed to kill process group: {e}")
+        raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
+
+    yield  # Run tests
+
+    print("\n===== Post-test server cleanup... =====")
+    try:
+        # SIGTERM the whole process group (tensor-parallel workers included).
+        os.killpg(process.pid, signal.SIGTERM)
+        print(f"API server (pid={process.pid}) terminated")
+    except Exception as e:
+        print(f"Failed to terminate API server: {e}")
+
+
+@pytest.fixture(scope="session")
+def api_url(request):
+    """
+    Returns the API endpoint URL for chat completions.
+    """
+    return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions"
+
+
+@pytest.fixture(scope="session")
+def metrics_url(request):
+    """
+    Returns the metrics endpoint URL.
+    """
+    return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics"
+
+
+@pytest.fixture
+def headers():
+    """
+    Returns common HTTP request headers.
+    """
+    return {"Content-Type": "application/json"}
+
+
+@pytest.fixture
+def consistent_payload():
+    """
+    Returns a fixed payload for consistency testing,
+    including a fixed random seed and temperature.
+    """
+    return {
+        "messages": [
+            {
+                "role": "user",
+                "content": "用一句话介绍 PaddlePaddle, 30字以内 /no_think",
+            }
+        ],
+        "temperature": 0.8,
+        "top_p": 0,  # fix top_p to reduce randomness
+        "seed": 13,  # fixed random seed
+    }
+
+
+# ==========================
+# Helper function to calculate difference rate between two texts
+# ==========================
+def calculate_diff_rate(text1, text2):
+    """
+    Calculate the difference rate between two strings
+    based on the normalized Levenshtein edit distance.
+    Returns a float in [0,1], where 0 means identical.
+
+    Note: full DP table, O(len1*len2) time and space — fine for short
+    chat completions, not intended for long documents.
+    """
+    if text1 == text2:
+        return 0.0
+
+    len1, len2 = len(text1), len(text2)
+    dp = [[0] * (len2 + 1) for _ in range(len1 + 1)]
+
+    for i in range(len1 + 1):
+        for j in range(len2 + 1):
+            if i == 0 or j == 0:
+                # Base case: distance to an empty prefix is the prefix length.
+                dp[i][j] = i + j
+            elif text1[i - 1] == text2[j - 1]:
+                dp[i][j] = dp[i - 1][j - 1]
+            else:
+                # 1 + min(delete, insert, substitute)
+                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
+
+    edit_distance = dp[len1][len2]
+    max_len = max(len1, len2)
+    return edit_distance / max_len if max_len > 0 else 0.0
+
+
+# ==========================
+# Consistency test for repeated runs with fixed payload
+# ==========================
+def test_consistency_between_runs(api_url, headers, consistent_payload):
+    """
+    Test that two runs with the same fixed input produce similar outputs.
+    """
+    # First request
+    resp1 = requests.post(api_url, headers=headers, json=consistent_payload)
+    assert resp1.status_code == 200
+    result1 = resp1.json()
+    content1 = result1["choices"][0]["message"]["content"]
+
+    # Second request
+    resp2 = requests.post(api_url, headers=headers, json=consistent_payload)
+    assert resp2.status_code == 200
+    result2 = resp2.json()
+    content2 = result2["choices"][0]["message"]["content"]
+
+    # Calculate difference rate
+    diff_rate = calculate_diff_rate(content1, content2)
+
+    # Verify that the difference rate is below the threshold
+    assert diff_rate < 0.05, f"Output difference too large ({diff_rate:.4%})"
+
+
+# ==========================
+# think Prompt Test
+# ==========================
+
+
+def test_thinking_prompt(api_url, headers):
+    """
+    Test case to verify normal 'thinking' behavior (no '/no_think' appended).
+    """
+    messages = [{"role": "user", "content": "北京天安门在哪里"}]
+
+    payload = {
+        "messages": messages,
+        "max_tokens": 100,
+        "temperature": 0.8,
+        "top_p": 0.01,
+    }
+
+    resp = requests.post(api_url, headers=headers, json=payload)
+    assert resp.status_code == 200, f"Unexpected status code: {resp.status_code}"
+
+    try:
+        response_json = resp.json()
+    except Exception as e:
+        assert False, f"Response is not valid JSON: {e}"
+
+    # .get chain with defaults avoids KeyError on a malformed response body.
+    content = response_json.get("choices", [{}])[0].get("message", {}).get("content", "").lower()
+    assert "天安门" in content or "北京" in content, "Expected a location-related response with reasoning"
+
+
+# ==========================
+# no_think Prompt Test
+# ==========================
+
+
+def test_non_thinking_prompt(api_url, headers):
+    """
+    Test case to verify non-thinking behavior (with '/no_think').
+    """
+    messages = [{"role": "user", "content": "北京天安门在哪里 /no_think"}]
+
+    payload = {
+        "messages": messages,
+        "max_tokens": 100,
+        "temperature": 0.8,
+        "top_p": 0.01,
+    }
+
+    resp = requests.post(api_url, headers=headers, json=payload)
+    assert resp.status_code == 200, f"Unexpected status code: {resp.status_code}"
+
+    try:
+        response_json = resp.json()
+    except Exception as e:
+        assert False, f"Response is not valid JSON: {e}"
+
+    content = response_json.get("choices", [{}])[0].get("message", {}).get("content", "").lower()
+    # Heuristic: reasoning markers ("according to", "I think", "speculate",
+    # "possibly") must be absent from a non-thinking reply.
+    assert not any(
+        x in content for x in ["根据", "我认为", "推测", "可能"]
+    ), "Expected no reasoning in non-thinking response"
+
+
+def test_profile_reset_block_num():
+    """Verify the profiled reset_block_num: the value logged by the server
+    must be within 5% of the baseline."""
+    # NOTE(review): depends on ./log/config.log written by the server fixture;
+    # baseline 40000 is hardware/config-specific — confirm when ported to new CI.
+    log_file = "./log/config.log"
+    baseline = 40000
+
+    if not os.path.exists(log_file):
+        pytest.fail(f"Log file not found: {log_file}")
+
+    with open(log_file, "r") as f:
+        log_lines = f.readlines()
+
+    # Take the first "Reset block num" line only.
+    target_line = None
+    for line in log_lines:
+        if "Reset block num" in line:
+            target_line = line.strip()
+            break
+
+    if target_line is None:
+        pytest.fail("日志中没有Reset block num信息")
+
+    match = re.search(r"total_block_num:(\d+)", target_line)
+    if not match:
+        pytest.fail(f"Failed to extract total_block_num from line: {target_line}")
+
+    try:
+        actual_value = int(match.group(1))
+    except ValueError:
+        pytest.fail(f"Invalid number format: {match.group(1)}")
+
+    # Accept a +/-5% band around the baseline.
+    lower_bound = baseline * (1 - 0.05)
+    upper_bound = baseline * (1 + 0.05)
+    print(f"Reset total_block_num: {actual_value}. baseline: {baseline}")
+
+    assert lower_bound <= actual_value <= upper_bound, (
+        f"Reset total_block_num {actual_value} 与 baseline {baseline} diff需要在5%以内"
+        f"Allowed range: [{lower_bound:.1f}, {upper_bound:.1f}]"
+    )
+
+
+def test_thinking_with_stop_token_ids(api_url, headers):
+    """
+    Test case to verify thinking behavior when stop token ids are provided.
+
+    NOTE(review): asserts an exact model completion under a fixed seed — this
+    is brittle across model/tokenizer revisions; confirm the expected string
+    whenever the served checkpoint changes.
+    """
+    messages = [{"role": "user", "content": "北京天安门在哪里"}]
+
+    payload = {
+        "messages": messages,
+        "max_tokens": 100,
+        "temperature": 0.8,
+        "seed": 1,
+        "stop_token_ids": [105930],
+    }
+
+    resp = requests.post(api_url, headers=headers, json=payload)
+    assert resp.status_code == 200, f"Unexpected status code: {resp.status_code}"
+
+    try:
+        response_json = resp.json()
+    except Exception as e:
+        assert False, f"Response is not valid JSON: {e}"
+
+    content = response_json.get("choices", [{}])[0].get("message", {}).get("content", "")
+
+    # Generation should cut off exactly at the stop token under the fixed seed.
+    expected_output = "\n好的,用户问“北京天安门在哪里"
+    assert content == expected_output, (
+        f"Unexpected response content.\n" f"Expected: {expected_output!r}\n" f"Actual: {content!r}"
+    )