From a6351dea0b30fee8e9c4e663b14e445c29020eb4 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 16 Mar 2026 21:32:43 +0800 Subject: [PATCH] [BugFix][Optimization] Replace silent failures with catchable exceptions and informative error messages (#6533) * init * init * fix format * add * add files * add ut * fix some * add ut * add more * add * fix pre-commit * fix pre-commit * fix cover * skip long seq * add * add * fix * remove not need * fix set attr * fix comments * fix comments * fix failed tests --------- Co-authored-by: gongweibao --- custom_ops/gpu_ops/cuda_multiprocess.h | 20 +- .../gpu_ops/custom_all_reduce/all_reduce.cu | 8 +- .../gpu_ops/custom_all_reduce/all_reduce.cuh | 20 +- custom_ops/gpu_ops/get_data_ptr_ipc.cu | 7 +- .../ipc_sent_key_value_cache_by_remote_ptr.cu | 11 +- custom_ops/gpu_ops/mla_attn/kernel_traits.cuh | 7 +- custom_ops/gpu_ops/read_data_ipc.cu | 8 +- custom_ops/gpu_ops/set_data_ipc.cu | 8 +- custom_ops/gpu_ops/share_external_data.cu | 8 +- custom_ops/gpu_ops/swap_cache.cu | 25 +-- custom_ops/gpu_ops/swap_cache_batch.cu | 21 +- custom_ops/gpu_ops/tune_cublaslt_gemm.cu | 14 +- fastdeploy/cache_manager/ops.py | 4 +- fastdeploy/distributed/communication.py | 28 ++- .../custom_all_reduce/custom_all_reduce.py | 8 +- fastdeploy/engine/async_llm.py | 5 +- fastdeploy/engine/expert_service.py | 9 +- fastdeploy/engine/resource_manager.py | 2 +- .../ernie_45_vl_thinking_tool_parser.py | 3 +- .../tool_parsers/ernie_x1_tool_parser.py | 3 +- fastdeploy/envs.py | 50 +++-- fastdeploy/input/preprocess.py | 4 +- fastdeploy/input/tokenzier_client.py | 10 +- .../inter_communicator/engine_worker_queue.py | 4 +- .../layers/attention/flash_attn_backend.py | 6 +- .../layers/attention/mla_attention_backend.py | 4 +- .../attention/moba_attention_backend.py | 4 +- .../batch_invariant_ops.py | 7 +- .../layers/quantization/__init__.py | 5 +- .../quantization/ops/cutlass_scaled_mm.py | 4 +- .../model_executor/models/model_base.py | 4 +- .../ops/triton_ops/triton_utils_v2.py | 4 +- fastdeploy/model_executor/utils.py | 4 +- fastdeploy/multimodal/utils.py | 4 +- fastdeploy/router/router.py | 6 +- fastdeploy/router/utils.py | 4 +- fastdeploy/scheduler/config.py | 2 +- fastdeploy/spec_decode/mtp.py | 4 +- fastdeploy/trace/trace_logger.py | 4 +- fastdeploy/worker/gcu_model_runner.py | 4 +- fastdeploy/worker/gpu_model_runner.py | 4 +- fastdeploy/worker/gpu_worker.py | 3 +- fastdeploy/worker/hpu_model_runner.py | 4 +- fastdeploy/worker/iluvatar_model_runner.py | 4 +- fastdeploy/worker/metax_model_runner.py | 4 +- fastdeploy/worker/xpu_model_runner.py | 4 +- fastdeploy/worker/xpu_worker.py | 3 +- .../test_communication_fallback.py | 182 +++++++++++++++++ .../test_custom_allreduce_guard.py | 77 ++++++++ tests/engine/test_expert_service.py | 49 +++++ .../tool_parsers/test_ernie_x1_tool_parser.py | 55 ++++-- tests/input/test_preprocess.py | 87 +++++++++ tests/input/test_tokenizer_client.py | 60 ++++++ .../ops/triton_ops/test_triton_utils_v2.py | 39 ++++ .../test_model_executor_utils.py | 184 ++++++++++++++++++ tests/quantization/test_quantization_init.py | 60 ++++++ tests/router/test_router.py | 148 ++++++++++++++ tests/router/test_router_utils.py | 102 ++++++++++ tests/scheduler/test_scheduler_config.py | 152 +++++++++++++-- tests/spec_decode/test_mtp_proposer.py | 20 ++ tests/test_envs.py | 168 ++++++++++++++++ 61 files changed, 1595 insertions(+), 171 deletions(-) create mode 100644 tests/distributed/test_communication_fallback.py create mode 100644 tests/distributed/test_custom_allreduce_guard.py create mode 100644 tests/input/test_preprocess.py create mode 100644 tests/model_executor/test_model_executor_utils.py create mode 100644 tests/quantization/test_quantization_init.py create mode 100644 tests/router/test_router.py create mode 100644 tests/router/test_router_utils.py create mode 100644 tests/test_envs.py diff --git a/custom_ops/gpu_ops/cuda_multiprocess.h b/custom_ops/gpu_ops/cuda_multiprocess.h index c8a138e133..a001b601f4 100644 --- a/custom_ops/gpu_ops/cuda_multiprocess.h +++ b/custom_ops/gpu_ops/cuda_multiprocess.h @@ -41,6 +41,8 @@ #include #include #endif +#include +#include #include #ifdef PADDLE_WITH_HIP @@ -52,16 +54,14 @@ namespace cub = hipcub; #define GPU(str) cuda##str #endif -#define checkCudaErrors(call) \ - do { \ - GPU(Error_t) err = call; \ - if (err != GPU(Success)) { \ - printf("CUDA error at %s %d: %s\n", \ - __FILE__, \ - __LINE__, \ - GPU(GetErrorString)(err)); \ - exit(EXIT_FAILURE); \ - } \ +#define checkCudaErrors(call) \ + do { \ + GPU(Error_t) err = call; \ + if (err != GPU(Success)) { \ + throw std::runtime_error(std::string("CUDA error at ") + __FILE__ + \ + ":" + std::to_string(__LINE__) + " '" + \ + GPU(GetErrorString)(err) + "'"); \ + } \ } while (0) typedef struct shmStruct_st { diff --git a/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cu b/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cu index ca659c0074..f3143c423a 100644 --- a/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cu +++ b/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cu @@ -63,8 +63,8 @@ void decode_alltoall_transpose(paddle::Tensor& inp, auto hidden_size = inp.shape()[1]; auto reg_buffer = reinterpret_cast(_reg_buffer); if (reg_buffer) { - cudaMemcpyAsync( - reg_buffer, inp.data(), input_size, cudaMemcpyDeviceToDevice, stream); + CUDACHECK(cudaMemcpyAsync( + reg_buffer, inp.data(), input_size, cudaMemcpyDeviceToDevice, stream)); } else { reg_buffer = inp.data(); } @@ -124,8 +124,8 @@ void all_reduce(paddle::Tensor& inp, auto input_size = inp.numel() * phi::SizeOf(inp.dtype()); auto reg_buffer = reinterpret_cast(_reg_buffer); if (reg_buffer) { - cudaMemcpyAsync( - reg_buffer, inp.data(), input_size, cudaMemcpyDeviceToDevice, stream); + CUDACHECK(cudaMemcpyAsync( + reg_buffer, inp.data(), input_size, cudaMemcpyDeviceToDevice, stream)); } else { reg_buffer = inp.data(); } diff --git a/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cuh b/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cuh index 744d0576f5..cb4c25bcf4 100644 --- a/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cuh +++ b/custom_ops/gpu_ops/custom_all_reduce/all_reduce.cuh @@ -22,19 +22,19 @@ #include #include #include +#include +#include #include #include -#define CUDACHECK(cmd) \ - do { \ - cudaError_t e = cmd; \ - if (e != cudaSuccess) { \ - printf("Failed: Cuda error %s:%d '%s'\n", \ - __FILE__, \ - __LINE__, \ - cudaGetErrorString(e)); \ - exit(EXIT_FAILURE); \ - } \ +#define CUDACHECK(cmd) \ + do { \ + cudaError_t e = cmd; \ + if (e != cudaSuccess) { \ + throw std::runtime_error(std::string("CUDA error at ") + __FILE__ + \ + ":" + std::to_string(__LINE__) + " '" + \ + cudaGetErrorString(e) + "'"); \ + } \ } while (0) namespace paddle { diff --git a/custom_ops/gpu_ops/get_data_ptr_ipc.cu b/custom_ops/gpu_ops/get_data_ptr_ipc.cu index 1b1eddc5ae..b866785429 100644 --- a/custom_ops/gpu_ops/get_data_ptr_ipc.cu +++ b/custom_ops/gpu_ops/get_data_ptr_ipc.cu @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include "cuda_multiprocess.h" #include "helper.h" @@ -40,9 +41,9 @@ std::vector GetDataPtrIpc(const paddle::Tensor &tmp_input, volatile shmStruct *shm = NULL; sharedMemoryInfo info; if (sharedMemoryOpen2(shm_name.c_str(), sizeof(shmStruct), &info) != 0) { - printf("Failed to create shared memory slab\n"); - printf("Func GetDataPtrIpc. Shm_name: %s\n", shm_name.c_str()); - exit(EXIT_FAILURE); + throw std::runtime_error( + "Failed to open shared memory slab in GetDataPtrIpc, shm_name: " + + shm_name + ", errno: " + std::string(strerror(errno))); } shm = (volatile shmStruct *)info.addr; void *ptr = nullptr; diff --git a/custom_ops/gpu_ops/ipc_sent_key_value_cache_by_remote_ptr.cu b/custom_ops/gpu_ops/ipc_sent_key_value_cache_by_remote_ptr.cu index 1ee35b21ca..f56982747d 100644 --- a/custom_ops/gpu_ops/ipc_sent_key_value_cache_by_remote_ptr.cu +++ b/custom_ops/gpu_ops/ipc_sent_key_value_cache_by_remote_ptr.cu @@ -14,6 +14,8 @@ // limitations under the License. #include #include +#include +#include #include "fstream" #include "helper.h" #include "iomanip" @@ -136,7 +138,9 @@ void sent_key_value_by_remote_ptr( #endif cudaError_t err = cudaGetLastError(); if (err != cudaSuccess) { - printf("CUDA Error: %s\n", cudaGetErrorString(err)); + throw std::runtime_error( + std::string("CUDA Error in IPC KV cache transfer: ") + + cudaGetErrorString(err)); } #ifdef DEBUG_IPC_SENT_SYNC_AND_PRINT cudaDeviceSynchronize(); @@ -325,8 +329,11 @@ void SentKeyValueByRemotePtr(const paddle::Tensor& local_key_tensor, reinterpret_cast((void*)remote_value_ptr), cuda_stream); } + default: { + PD_THROW("Unsupported dtype for IPC KV cache transfer: ", + local_key_tensor.type()); + } } - // using dataT=std::remove_pointer; } void SentKeyValueByRemotePtrBlockSync(const paddle::Tensor& local_key_tensor, diff --git a/custom_ops/gpu_ops/mla_attn/kernel_traits.cuh b/custom_ops/gpu_ops/mla_attn/kernel_traits.cuh index dcbdcc7474..cef2ddbe0a 100644 --- a/custom_ops/gpu_ops/mla_attn/kernel_traits.cuh +++ b/custom_ops/gpu_ops/mla_attn/kernel_traits.cuh @@ -90,13 +90,14 @@ struct AttentionKernelTraits { static constexpr bool USE_TMA_LOAD_KV = USE_TMA_LOAD_KV_; static constexpr int GROUP_SIZE = GROUP_SIZE_; static constexpr int BLOCK_SHAPE_Q = BLOCK_SHAPE_Q_; - static_assert(BLOCK_SHAPE_Q % 64 == 0); + static_assert(BLOCK_SHAPE_Q % 64 == 0, + "BLOCK_SHAPE_Q must be a multiple of 64"); static constexpr int BLOCK_SHAPE_KV = BLOCK_SHAPE_KV_; static constexpr int HEAD_DIM_QK = HEAD_DIM_QK_; static constexpr int HEAD_DIM_VO = HEAD_DIM_VO_; static constexpr int NUM_PER_STAGE = BLOCK_SHAPE_KV * HEAD_DIM_QK; - static_assert(HEAD_DIM_QK % 32 == 0); - static_assert(HEAD_DIM_VO % 32 == 0); + static_assert(HEAD_DIM_QK % 32 == 0, "HEAD_DIM_QK must be a multiple of 32"); + static_assert(HEAD_DIM_VO % 32 == 0, "HEAD_DIM_VO must be a multiple of 32"); static constexpr int NUM_WARPS = 12; static constexpr int NUM_THREADS = 384; diff --git a/custom_ops/gpu_ops/read_data_ipc.cu b/custom_ops/gpu_ops/read_data_ipc.cu index fffe29c5d1..4cec9a5c74 100644 --- a/custom_ops/gpu_ops/read_data_ipc.cu +++ b/custom_ops/gpu_ops/read_data_ipc.cu @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "cuda_multiprocess.h" #include "paddle/extension.h" @@ -53,9 +55,9 @@ void ReadDataIpc(const paddle::Tensor &tmp_input, volatile shmStruct *shm = NULL; sharedMemoryInfo info; if (sharedMemoryOpen(shm_name.c_str(), sizeof(shmStruct), &info) != 0) { - printf("Failed to create shared memory slab\n"); - printf("Func ReadDataIpc. Shm_name: %s\n", shm_name.c_str()); - exit(EXIT_FAILURE); + throw std::runtime_error( + "Failed to open shared memory slab in ReadDataIpc, shm_name: " + + shm_name + ", errno: " + std::string(strerror(errno))); } shm = (volatile shmStruct *)info.addr; void *ptr = nullptr; diff --git a/custom_ops/gpu_ops/set_data_ipc.cu b/custom_ops/gpu_ops/set_data_ipc.cu index b8deb0e5d8..22f7e6ee36 100644 --- a/custom_ops/gpu_ops/set_data_ipc.cu +++ b/custom_ops/gpu_ops/set_data_ipc.cu @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include "cuda_multiprocess.h" #include "helper.h" @@ -85,9 +86,10 @@ void set_data_ipc(const paddle::Tensor& tmp_input, sharedMemoryInfo info; volatile shmStruct* shm = NULL; if (sharedMemoryCreate(shm_name.c_str(), sizeof(*shm), &info) != 0) { - printf("Failed to create shared memory slab\n"); - printf("Func sharedMemoryCreate. Shm_name: %s\n", shm_name.c_str()); - exit(EXIT_FAILURE); + throw std::runtime_error( + "Failed to create shared memory slab in sharedMemoryCreate, " + "shm_name: " + + shm_name + ", errno: " + std::string(strerror(errno))); } shm = (volatile shmStruct*)info.addr; memset((void*)shm, 0, sizeof(*shm)); diff --git a/custom_ops/gpu_ops/share_external_data.cu b/custom_ops/gpu_ops/share_external_data.cu index 8fd462582e..5fd6df58f4 100644 --- a/custom_ops/gpu_ops/share_external_data.cu +++ b/custom_ops/gpu_ops/share_external_data.cu @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "helper.h" #include #include #include @@ -22,6 +21,7 @@ #include #include #include "cuda_multiprocess.h" +#include "helper.h" #include "paddle/phi/core/tensor_meta.h" std::vector ShareExternalData(paddle::Tensor &input, @@ -30,9 +30,9 @@ std::vector ShareExternalData(paddle::Tensor &input, volatile shmStruct *shm = NULL; sharedMemoryInfo info; if (sharedMemoryOpen(shm_name.c_str(), sizeof(shmStruct), &info) != 0) { - printf("Failed to create shared memory slab\n"); - printf("Func ShareExternalData. Shm_name: %s\n", shm_name.c_str()); - exit(EXIT_FAILURE); + throw std::runtime_error( + "Failed to open shared memory slab in ShareExternalData, shm_name: " + + shm_name + ", errno: " + std::string(strerror(errno))); } shm = (volatile shmStruct *)info.addr; void *ptr = nullptr; diff --git a/custom_ops/gpu_ops/swap_cache.cu b/custom_ops/gpu_ops/swap_cache.cu index 9a4bed316d..5e72f5fe22 100644 --- a/custom_ops/gpu_ops/swap_cache.cu +++ b/custom_ops/gpu_ops/swap_cache.cu @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "cuda_multiprocess.h" #include "helper.h" #include "paddle/extension.h" @@ -47,24 +48,24 @@ void SwapCacheImpl(const paddle::Tensor& cache_gpu, // gpu auto* cache_gpu_ptr_now = cache_gpu_ptr + gpu_block_id * cache_stride; auto* cache_cpu_ptr_now = cache_cpu_ptr + cpu_block_id * cache_stride; if (mode == 0) { // copy from device to host - cudaMemcpyAsync(cache_cpu_ptr_now, - cache_gpu_ptr_now, - cache_stride * sizeof(DataType_), - cudaMemcpyDeviceToHost, - stream); + checkCudaErrors(cudaMemcpyAsync(cache_cpu_ptr_now, + cache_gpu_ptr_now, + cache_stride * sizeof(DataType_), + cudaMemcpyDeviceToHost, + stream)); // cudaMemcpy(cache_dst_ptr_now, cache_src_ptr_now, cache_stride * // sizeof(DataType_), cudaMemcpyDeviceToHost); } else { // copy from host to device - cudaMemcpyAsync(cache_gpu_ptr_now, - cache_cpu_ptr_now, - cache_stride * sizeof(DataType_), - cudaMemcpyHostToDevice, - stream); + checkCudaErrors(cudaMemcpyAsync(cache_gpu_ptr_now, + cache_cpu_ptr_now, + cache_stride * sizeof(DataType_), + cudaMemcpyHostToDevice, + stream)); // cudaMemcpy(cache_dst_ptr_now, cache_src_ptr_now, cache_stride * // sizeof(DataType_), cudaMemcpyHostToDevice); } } - cudaStreamSynchronize(stream); + checkCudaErrors(cudaStreamSynchronize(stream)); } void SwapCache(const paddle::Tensor& cache_gpu, // gpu @@ -74,7 +75,7 @@ void SwapCache(const paddle::Tensor& cache_gpu, // gpu const std::vector& swap_block_ids_cpu, int rank, int mode) { - cudaSetDevice(rank); // used for distributed launch + checkCudaErrors(cudaSetDevice(rank)); // used for distributed launch switch (cache_gpu.dtype()) { case paddle::DataType::BFLOAT16: return SwapCacheImpl(cache_gpu, diff --git a/custom_ops/gpu_ops/swap_cache_batch.cu b/custom_ops/gpu_ops/swap_cache_batch.cu index 82cedd926d..d8c2c1d59b 100644 --- a/custom_ops/gpu_ops/swap_cache_batch.cu +++ b/custom_ops/gpu_ops/swap_cache_batch.cu @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "cuda_multiprocess.h" #include "helper.h" #include "paddle/extension.h" @@ -74,19 +75,19 @@ void SwapCacheImplAllLayers( auto* cache_cpu_ptr_now = cache_cpu_ptr + first_cpu_block_id * cache_stride; if (mode == 0) { // copy from device to host - cudaMemcpyAsync( + checkCudaErrors(cudaMemcpyAsync( cache_cpu_ptr_now, cache_gpu_ptr_now, cache_stride * sizeof(DataType_) * consecutive_block_count, cudaMemcpyDeviceToHost, - stream); + stream)); } else { // copy from host to device - cudaMemcpyAsync( + checkCudaErrors(cudaMemcpyAsync( cache_gpu_ptr_now, cache_cpu_ptr_now, cache_stride * sizeof(DataType_) * consecutive_block_count, cudaMemcpyHostToDevice, - stream); + stream)); } first_gpu_block_id = gpu_block_id; first_cpu_block_id = cpu_block_id; @@ -100,22 +101,22 @@ void SwapCacheImplAllLayers( auto* cache_gpu_ptr_now = cache_gpu_ptr + first_gpu_block_id * cache_stride; auto* cache_cpu_ptr_now = cache_cpu_ptr + first_cpu_block_id * cache_stride; if (mode == 0) { // copy from device to host - cudaMemcpyAsync( + checkCudaErrors(cudaMemcpyAsync( cache_cpu_ptr_now, cache_gpu_ptr_now, cache_stride * sizeof(DataType_) * consecutive_block_count, cudaMemcpyDeviceToHost, - stream); + stream)); } else { // copy from host to device - cudaMemcpyAsync( + checkCudaErrors(cudaMemcpyAsync( cache_gpu_ptr_now, cache_cpu_ptr_now, cache_stride * sizeof(DataType_) * consecutive_block_count, cudaMemcpyHostToDevice, - stream); + stream)); } } - cudaStreamSynchronize(stream); + checkCudaErrors(cudaStreamSynchronize(stream)); } void SwapCacheAllLayers( @@ -126,7 +127,7 @@ void SwapCacheAllLayers( const std::vector& swap_block_ids_cpu, int rank, int mode) { - cudaSetDevice(rank); // used for distributed launch + checkCudaErrors(cudaSetDevice(rank)); // used for distributed launch assert(cache_gpu_tensors.size() > 0 && cache_gpu_tensors.size() == cache_cpu_ptrs.size()); switch (cache_gpu_tensors[0].dtype()) { diff --git a/custom_ops/gpu_ops/tune_cublaslt_gemm.cu b/custom_ops/gpu_ops/tune_cublaslt_gemm.cu index f3b1e3aca2..b2c370de63 100644 --- a/custom_ops/gpu_ops/tune_cublaslt_gemm.cu +++ b/custom_ops/gpu_ops/tune_cublaslt_gemm.cu @@ -734,10 +734,12 @@ void TuneCublasltGemm(const paddle::Tensor& K, const bool is_test, const bool is_read_from_file, const std::string& path) { - assert(M_end >= M_start); - assert(M_start >= 1); - assert(K.dims().size() == 1 && N.dims().size() == 1); - assert(is_test != is_read_from_file); + assert(M_end >= M_start && "M_end must be >= M_start"); + assert(M_start >= 1 && "M_start must be >= 1"); + assert(K.dims().size() == 1 && N.dims().size() == 1 && + "K and N must be 1D tensors"); + assert(is_test != is_read_from_file && + "Exactly one of is_test or is_read_from_file must be true"); auto K_cpu = K.copy_to(paddle::CPUPlace(), false); auto N_cpu = N.copy_to(paddle::CPUPlace(), false); @@ -746,7 +748,7 @@ void TuneCublasltGemm(const paddle::Tensor& K, int K_size = K.numel(); int N_size = N.numel(); - assert(K_size == N_size); + assert(K_size == N_size && "K and N must have the same number of elements"); std::vector mm; int m = M_start, step = 1; @@ -796,7 +798,7 @@ void TuneCublasltGemm(const paddle::Tensor& K, path); } else { // other dtype - throw std::runtime_error(dtype + "not currently supported"); + throw std::runtime_error(dtype + " is not currently supported"); } } } diff --git a/fastdeploy/cache_manager/ops.py b/fastdeploy/cache_manager/ops.py index 370188d217..ff52a8b861 100644 --- a/fastdeploy/cache_manager/ops.py +++ b/fastdeploy/cache_manager/ops.py @@ -19,6 +19,7 @@ import os import paddle from fastdeploy.platforms import current_platform +from fastdeploy.utils import llm_logger as logger try: if current_platform.is_cuda(): @@ -120,7 +121,8 @@ try: else: return "CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7" -except: +except Exception as e: + logger.warning(f"Failed to import cache manager ops: {e}") cuda_host_alloc = None cuda_host_free = None set_data_ipc = None diff --git a/fastdeploy/distributed/communication.py b/fastdeploy/distributed/communication.py index 7a3f0b8aea..aa0164f9ea 100644 --- a/fastdeploy/distributed/communication.py +++ b/fastdeploy/distributed/communication.py @@ -21,7 +21,9 @@ import paddle.distributed as dist from paddle.distributed import fleet import fastdeploy.envs as envs -from fastdeploy.utils import register_custom_python_op +from fastdeploy.utils import get_logger, register_custom_python_op + +logger = get_logger("communication") # Constants SUPPORTED_DTYPES = (paddle.float32, paddle.float16, paddle.bfloat16) @@ -181,8 +183,17 @@ try: input_ = _TP_AR.decode_alltoall_transpose(input_, out) return input_ -except: - tensor_model_parallel_all_reduce = None +except Exception as e: + logger.warning(f"Failed to register tensor_model_parallel_all_reduce: {e}") + + _reg_err = e + + def tensor_model_parallel_all_reduce(input_: "paddle.Tensor", group_=None) -> "paddle.Tensor": + raise RuntimeError(f"tensor_model_parallel_all_reduce is not available. Registration failed with: {_reg_err}") + + def decode_alltoall_transpose(input_: "paddle.Tensor", out=None) -> "paddle.Tensor": + raise RuntimeError(f"decode_alltoall_transpose is not available. Registration failed with: {_reg_err}") + from paddle.distributed.communication import stream from paddle.distributed.communication.reduce import ReduceOp @@ -209,5 +220,12 @@ try: else: dist.all_reduce(input_) -except: - tensor_model_parallel_all_reduce_custom = None +except Exception as e: + logger.warning(f"Failed to register tensor_model_parallel_all_reduce_custom: {e}") + + _reg_err2 = e + + def tensor_model_parallel_all_reduce_custom(input_: "paddle.Tensor") -> "paddle.Tensor": + raise RuntimeError( + f"tensor_model_parallel_all_reduce_custom is not available. Registration failed with: {_reg_err2}" + ) diff --git a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py index 9c8b65f43a..65bb7ad13d 100644 --- a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py +++ b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py @@ -35,11 +35,13 @@ from fastdeploy.model_executor.ops.gpu import ( register_buffer, register_graph_buffers, ) +from fastdeploy.utils import llm_logger as logger try: meta_size() custom_ar = True -except Exception: +except Exception as e: + logger.debug(f"Custom allreduce not available: {e}") custom_ar = False _instances = [] @@ -61,6 +63,7 @@ class CustomAllreduce: """ self.capturing = False self.group = group + self._initialized = False if not custom_ar: # disable because of missing custom allreduce library @@ -102,6 +105,7 @@ class CustomAllreduce: self._ptr = init_custom_all_reduce(self.meta_ptrs, self.rank_data, rank, self.full_nvlink) register_buffer(self._ptr, self.buffer_ptrs) + self._initialized = True _instances.append(self) @staticmethod @@ -134,6 +138,8 @@ class CustomAllreduce: lib.cudaFree(ctypes.c_void_p(pointers[rank])) def should_custom_ar(self, inp: paddle.Tensor): + if not self._initialized: + return False inp_size = tensor_byte_size(inp) if inp_size > self.max_size: return False diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py index a83c9dda31..25adcb877f 100644 --- a/fastdeploy/engine/async_llm.py +++ b/fastdeploy/engine/async_llm.py @@ -198,7 +198,9 @@ class EngineServiceClient: suffix=ipc_suffix, create=False, ) - except: + except ( + Exception + ): # IPCSignal may not yet be created by workers; broad except covers platform-specific IPC errors # Signal not ready yet time.sleep(wait_interval) elapsed_time += wait_interval @@ -523,7 +525,6 @@ class AsyncLLM(EngineServiceClient): remaining = num_choices while remaining > 0: response_list = await response_queue.get() - for response_item in response_list: if ( isinstance(response_item, dict) or isinstance(response_item, Request) diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 96778d4c7f..81fe93e52a 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -152,6 +152,7 @@ class ExpertService: if self.do_profile: get_profile_block_num = np.zeros([1], dtype=np.int32) + attempt = 0 while True: try: self.get_profile_block_num_signal = IPCSignal( @@ -162,7 +163,13 @@ class ExpertService: create=False, ) break - except: + except Exception as e: + attempt += 1 + if attempt % 30 == 0: + console_logger.warning( + f"Waiting for IPC signal 'get_profile_block_num' to be created, " + f"retried {attempt} times: {e}" + ) time.sleep(1) self.reset_kvcache_blocks() diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index b7dbb61c6b..609c88533b 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -131,7 +131,7 @@ class ResourceManager: elif required_type == "decoder": block_num = self.get_decoder_block_number() else: - raise ValueError("unknown required type") + raise ValueError(f"unknown required type: '{required_type}', expected 'all', 'encoder', or 'decoder'") block_list = list() current_block_num = self.available_block_num() diff --git a/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py b/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py index 1cb8c0ab71..79eb3058b1 100644 --- a/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py +++ b/fastdeploy/entrypoints/openai/tool_parsers/ernie_45_vl_thinking_tool_parser.py @@ -164,7 +164,8 @@ class Ernie45VLThinkingToolParser(ToolParser): if args_match: try: tool_data["arguments"] = partial_json_parser.loads(args_match.group(1), flags=flags) - except: + except Exception as e: + data_processor_logger.debug(f"Failed to parse tool arguments: {e}") tool_data["arguments"] = None if isinstance(tool_data, dict): diff --git a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py index 8a14abee87..3427a1814f 100644 --- a/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py +++ b/fastdeploy/entrypoints/openai/tool_parsers/ernie_x1_tool_parser.py @@ -162,7 +162,8 @@ class ErnieX1ToolParser(ToolParser): if args_match: try: tool_data["arguments"] = partial_json_parser.loads(args_match.group(1), flags=flags) - except: + except Exception as e: + data_processor_logger.debug(f"Failed to parse tool arguments: {e}") tool_data["arguments"] = None if isinstance(tool_data, dict): diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 3ab5061e78..de210bd722 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -16,6 +16,8 @@ Environment variables used by FastDeploy. """ import os +import sys +from types import ModuleType from typing import Any, Callable @@ -194,7 +196,7 @@ environment_variables: dict[str, Callable[[], Any]] = { "FMQ_CONFIG_JSON": lambda: os.getenv("FMQ_CONFIG_JSON", None), "FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": lambda: int(os.getenv("FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS", "500")), "FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": lambda: int(os.getenv("FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE", "64")), - "FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")), + "FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: float(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")), "FD_XPU_MOE_FFN_QUANT_TYPE_MAP": lambda: os.getenv("FD_XPU_MOE_FFN_QUANT_TYPE_MAP", ""), # Whether to enable low latency in mixed scenario "FD_XPU_ENABLE_MIXED_EP_MODE": lambda: bool(int(os.getenv("FD_XPU_ENABLE_MIXED_EP_MODE", "0"))), @@ -241,13 +243,6 @@ environment_variables: dict[str, Callable[[], Any]] = { } -def __getattr__(name: str): - # lazy evaluation of environment variables - if name in environment_variables: - return environment_variables[name]() - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - - def get_unique_name(self, name): """ Get unique name for config @@ -256,10 +251,39 @@ def get_unique_name(self, name): return name + f"_{shm_uuid}" -def __setattr__(name: str, value: Any): - assert name in environment_variables - environment_variables[name] = lambda: value +class _EnvsModule(ModuleType): + """Custom module class to support __setattr__ for environment variables.""" + + def __getattr__(self, name: str): + if name in environment_variables: + return environment_variables[name]() + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def __setattr__(self, name: str, value: Any): + if name in environment_variables: + # Convert bool to "1"/"0" so int(os.getenv(...)) works correctly + if isinstance(value, bool): + value = int(value) + os.environ[name] = str(value) + elif name.startswith("_"): + # Allow Python-internal attrs (__spec__, __loader__, etc.) + super().__setattr__(name, value) + else: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def __delattr__(self, name: str): + # Support unittest.mock.patch cleanup which calls delattr to restore original state + if name in environment_variables: + os.environ.pop(name, None) + elif name.startswith("_"): + super().__delattr__(name) + else: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def __dir__(self): + return list(environment_variables.keys()) -def __dir__(): - return list(environment_variables.keys()) +# Replace the module with our custom class +_current_module = sys.modules[__name__] +_current_module.__class__ = _EnvsModule diff --git a/fastdeploy/input/preprocess.py b/fastdeploy/input/preprocess.py index d91b76a7f4..04c028d906 100644 --- a/fastdeploy/input/preprocess.py +++ b/fastdeploy/input/preprocess.py @@ -20,6 +20,7 @@ from fastdeploy.config import ErnieArchitectures, ModelConfig from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager from fastdeploy.reasoning import ReasoningParserManager from fastdeploy.utils import envs +from fastdeploy.utils import llm_logger as logger class InputPreprocessor: @@ -78,7 +79,8 @@ class InputPreprocessor: tool_parser_obj=tool_parser_obj, mm_processor_kwargs=self.mm_processor_kwargs, ) - except: + except Exception as e: + logger.info(f"Plugin input processor not available ({e}), using built-in processor") if not self.model_config.enable_mm: if not ErnieArchitectures.contains_ernie_arch(architecture): if not envs.ENABLE_V1_DATA_PROCESSOR: diff --git a/fastdeploy/input/tokenzier_client.py b/fastdeploy/input/tokenzier_client.py index cb59c526da..1bee41cbc0 100644 --- a/fastdeploy/input/tokenzier_client.py +++ b/fastdeploy/input/tokenzier_client.py @@ -127,7 +127,7 @@ class AsyncTokenizerClient: elif type == "audio": url = f"{self.base_url}/audio/encode" else: - raise ValueError("Invalid type") + raise ValueError(f"Invalid encode type: '{type}', expected 'image', 'video', or 'audio'") resp = await client.post(url, json=request) resp.raise_for_status() @@ -159,9 +159,9 @@ class AsyncTokenizerClient: elif data.get("state") == "Error": raise RuntimeError(f"Tokenize task failed: {data.get('message')}") - except httpx.RequestError: - # 网络问题时继续轮询 - pass + except httpx.RequestError as e: + # Network error, keep polling + data_processor_logger.debug(f"Request error while polling tokenize task {task_tag}: {e}") # 超时检测 if asyncio.get_event_loop().time() - start_time > self.max_wait: @@ -183,7 +183,7 @@ class AsyncTokenizerClient: elif type == "audio": url = f"{self.base_url}/audio/decode" else: - raise ValueError("Invalid type") + raise ValueError(f"Invalid decode type: '{type}', expected 'image' or 'audio'") for attempt in range(self.max_retries): try: diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 7489b3ea02..7e3f88486c 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -797,7 +797,9 @@ class EngineWorkerQueue: if len(self.finished_add_cache_task_list) > 0: response = self.finished_add_cache_task_list[0] for tmp_response in self.finished_add_cache_task_list: - assert tmp_response == response + assert ( + tmp_response == response + ), f"Inconsistent responses across workers: expected {response}, got {tmp_response}" self.finished_add_cache_task_list[:] = list() self.client_get_finished_add_cache_task_flag[:] = [0] * self.num_client self.can_put_next_add_task_finished_flag.set(1) diff --git a/fastdeploy/model_executor/layers/attention/flash_attn_backend.py b/fastdeploy/model_executor/layers/attention/flash_attn_backend.py index 64aec0889c..a7cd094ac0 100644 --- a/fastdeploy/model_executor/layers/attention/flash_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/flash_attn_backend.py @@ -25,12 +25,14 @@ from paddleformers.utils.log import logger try: from paddle.nn.functional.flash_attention import flash_attention_v3_varlen -except: +except Exception as e: + logger.debug(f"flash_attention_v3_varlen not available: {e}") flash_attention_v3_varlen = None try: from paddle.nn.functional.flash_attention import flashmask_attention -except: +except Exception as e: + logger.debug(f"flashmask_attention not available: {e}") flashmask_attention = None from fastdeploy.config import FDConfig diff --git a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py index 77156872b8..403374dd05 100644 --- a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py @@ -26,10 +26,12 @@ from typing import TYPE_CHECKING, List, Optional, Tuple import paddle from paddle.nn.functional.flash_attention import flash_attn_unpadded +from paddleformers.utils.log import logger try: from paddle.nn.functional.flash_attention import flash_attention_v3_varlen -except: +except Exception as e: + logger.debug(f"flash_attention_v3_varlen not available: {e}") flash_attention_v3_varlen = None from fastdeploy.model_executor.layers.attention.ops import ( diff --git a/fastdeploy/model_executor/layers/attention/moba_attention_backend.py b/fastdeploy/model_executor/layers/attention/moba_attention_backend.py index dd8b21cba6..31b8e12286 100644 --- a/fastdeploy/model_executor/layers/attention/moba_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/moba_attention_backend.py @@ -20,10 +20,12 @@ from dataclasses import dataclass from typing import TYPE_CHECKING import paddle +from paddleformers.utils.log import logger try: from fastdeploy.model_executor.ops.gpu import get_cur_cu_seq_len_k, moba_attention -except: +except Exception as e: + logger.debug(f"moba_attention ops not available: {e}") moba_attention = None get_cur_cu_seq_len_k = None diff --git a/fastdeploy/model_executor/layers/batch_invariant_ops/batch_invariant_ops.py b/fastdeploy/model_executor/layers/batch_invariant_ops/batch_invariant_ops.py index 3af4f1d982..7264a361b8 100644 --- a/fastdeploy/model_executor/layers/batch_invariant_ops/batch_invariant_ops.py +++ b/fastdeploy/model_executor/layers/batch_invariant_ops/batch_invariant_ops.py @@ -140,8 +140,8 @@ def get_compute_units(): paddle.device.get_device() # Triton + Paddle may can't get the device device_properties = paddle.cuda.get_device_properties(0) NUM_SMS = device_properties.multi_processor_count - except Exception: - logger.warning("Could not get CUDA device properties. Falling back to CPU threads.") + except Exception as e: + logger.warning(f"Could not get CUDA device properties ({e}), falling back to CPU core count") # TODO(liujundong): Paddle lacks a torch.get_num_threads() equivalent for the *configured* thread count. # Using os.cpu_count() (total logical cores) as a fallback, which may not be correct. # Must check downstream logic to determine if this impacts correctness. @@ -660,6 +660,9 @@ def mm_batch_invariant(a, b, transpose_x=False, transpose_y=False, out=None): if transpose_y: b = b.T result = matmul_persistent(a, b) + if out is not None: + out.copy_(result, False) + return out return result diff --git a/fastdeploy/model_executor/layers/quantization/__init__.py b/fastdeploy/model_executor/layers/quantization/__init__.py index 430724c7e6..3e9e34c54a 100644 --- a/fastdeploy/model_executor/layers/quantization/__init__.py +++ b/fastdeploy/model_executor/layers/quantization/__init__.py @@ -16,6 +16,8 @@ quantization module """ from typing import List, Type +from paddleformers.utils.log import logger + from fastdeploy import envs from fastdeploy.utils import parse_quantization @@ -91,7 +93,8 @@ def parse_quant_config(args, model_config, is_ernie, is_v1_loader): try: quantization_config.update(args.quantization) quant_config_name = quantization_config["quantization"] - except: + except Exception as e: + logger.warning(f"Failed to parse quantization config normally ({e}), trying fallback") quant_config_name = args.quantization["quantization"] quantization_config["quantization"] = quant_config_name # Special handling for Ernie models diff --git a/fastdeploy/model_executor/layers/quantization/ops/cutlass_scaled_mm.py b/fastdeploy/model_executor/layers/quantization/ops/cutlass_scaled_mm.py index 43ebba7b2b..78321da338 100644 --- a/fastdeploy/model_executor/layers/quantization/ops/cutlass_scaled_mm.py +++ b/fastdeploy/model_executor/layers/quantization/ops/cutlass_scaled_mm.py @@ -62,7 +62,9 @@ def cutlass_scaled_mm( m = a.shape[0] n = b.shape[0] cutlass_compatible_b = b.shape[0] % 16 == 0 and b.shape[1] % 16 == 0 - assert cutlass_compatible_b + assert cutlass_compatible_b, ( + f"Tensor 'b' shape {b.shape} is not compatible with CUTLASS: " f"both dimensions must be multiples of 16" + ) out = paddle.empty([m, n], dtype=out_dtype) fastdeploy.model_executor.ops.gpu.cutlass_scaled_mm(out, a, b, scale_a, scale_b, bias) diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index f78572ccc8..d55c88947e 100644 --- a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -245,7 +245,9 @@ class ModelRegistry: architectures = [architectures] if not architectures: - raise ValueError("No model architectures are specified") + raise ValueError( + "No model architectures are specified. " "Please set 'architectures' in the model's config.json." + ) # First, check if PaddleFormers is explicitly requested if model_config is not None and architectures: diff --git a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py index 98589a4c37..ea57ab9ca3 100644 --- a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py +++ b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py @@ -189,7 +189,7 @@ class KernelInterface: else: const_hint_dict[self.arg_names[i]] = ele else: - assert False + assert False, f"Unsupported constexpr type: {type(ele)} for arg '{self.arg_names[i]}'" else: x_list.append(ele) if isinstance(ele, int): @@ -197,7 +197,7 @@ class KernelInterface: elif isinstance(ele, float): decalare_arg_exclude_constexpr[i] = "const float " + decalare_arg_exclude_constexpr[i] else: - assert False + assert False, f"Unsupported arg type: {type(ele)} for arg '{self.arg_names[i]}'" python_package_name = f"{op_name}_package" tp_rank = paddle.distributed.get_rank() diff --git a/fastdeploy/model_executor/utils.py b/fastdeploy/model_executor/utils.py index 6bb9245d9a..b03fa480d3 100644 --- a/fastdeploy/model_executor/utils.py +++ b/fastdeploy/model_executor/utils.py @@ -51,7 +51,7 @@ class BitMaskTracker: end (int): End index (exclusive) """ if start < 0 or end > self.length or start >= end: - raise ValueError("Invalid mark range") + raise ValueError(f"Invalid mark range: start={start}, end={end}, length={self.length}") block = ((1 << (end - start)) - 1) << start self.mask |= block @@ -82,7 +82,7 @@ class TensorTracker: self.track_dim = 2 if output_dim else 1 self.trackers = [BitMaskTracker(shape[self.track_dim]) for _ in range(batch)] else: - raise ValueError("Only 2D or 3D tensors supported") + raise ValueError(f"Only 2D or 3D tensors supported, got {len(shape)}D tensor with shape={shape}") def mark(self, start: int = 0, end: int = None, batch_id: int = None): """ diff --git a/fastdeploy/multimodal/utils.py b/fastdeploy/multimodal/utils.py index fa3ad4cbe2..3a03d002da 100644 --- a/fastdeploy/multimodal/utils.py +++ b/fastdeploy/multimodal/utils.py @@ -42,7 +42,7 @@ def process_transparency(image): if _is_transparent(image): # Check and fix transparent images data_processor_logger.info("Image has transparent background, adding white background.") image = _convert_transparent_paste(image) - except: - pass + except Exception as e: + data_processor_logger.warning(f"Failed to process image transparency: {e}") return ImageOps.exif_transpose(image) diff --git a/fastdeploy/router/router.py b/fastdeploy/router/router.py index f73144dc53..4832a4c85a 100644 --- a/fastdeploy/router/router.py +++ b/fastdeploy/router/router.py @@ -144,9 +144,9 @@ class Router: """Select one prefill and one decode server""" async with self.lock: if not self.prefill_servers: - raise RuntimeError("No prefill servers available") + raise RuntimeError(f"No prefill servers available (decode={len(self.decode_servers)})") if not self.decode_servers: - raise RuntimeError("No decode servers available") + raise RuntimeError(f"No decode servers available (prefill={len(self.prefill_servers)})") pidx = random.randint(0, len(self.prefill_servers) - 1) didx = random.randint(0, len(self.decode_servers) - 1) return self.prefill_servers[pidx], self.decode_servers[didx] @@ -155,7 +155,7 @@ class Router: """Select one mixed server""" async with self.lock: if not self.mixed_servers: - raise RuntimeError("No mixed servers available") + raise RuntimeError(f"No mixed servers available. Registered mixed servers: {len(self.mixed_servers)}") idx = random.randint(0, len(self.mixed_servers) - 1) return self.mixed_servers[idx] diff --git a/fastdeploy/router/utils.py b/fastdeploy/router/utils.py index e695e1d599..b775c15d9f 100644 --- a/fastdeploy/router/utils.py +++ b/fastdeploy/router/utils.py @@ -54,8 +54,10 @@ class InstanceInfo: # handle default and default_factory if field_def.default is not MISSING: value = field_def.default - else: + elif field_def.default_factory is not MISSING: value = field_def.default_factory() + else: + raise KeyError(f"Missing required field '{name}' in instance info dict") kwargs[name] = value return cls(**kwargs) diff --git a/fastdeploy/scheduler/config.py b/fastdeploy/scheduler/config.py index 7e56eec676..1422b2635f 100644 --- a/fastdeploy/scheduler/config.py +++ b/fastdeploy/scheduler/config.py @@ -229,7 +229,7 @@ class GlobalSchedulerConfig: try: response = r.ping() if not response: - raise Exception("connect to redis failed") + raise ConnectionError(f"Failed to connect to Redis at {self.host}:{self.port}") finally: r.close() diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py index 7ebe86b852..5b62985e92 100644 --- a/fastdeploy/spec_decode/mtp.py +++ b/fastdeploy/spec_decode/mtp.py @@ -120,7 +120,9 @@ class MTPProposer(Proposer): elif current_platform.is_cuda() or current_platform.is_maca(): self._propose = self._propose_cuda else: - raise RuntimeError("Unsupported platform.") + raise RuntimeError( + f"Unsupported platform for MTP: {current_platform}. " f"Supported platforms: CUDA, MACA, XPU" + ) self.sampler = MTPSampler(fd_config) self.model_inputs = ProposerInputBatch(self.fd_config, self.target_model_inputs) diff --git a/fastdeploy/trace/trace_logger.py b/fastdeploy/trace/trace_logger.py index 7d7bda32bc..ca04aac9f8 100644 --- a/fastdeploy/trace/trace_logger.py +++ b/fastdeploy/trace/trace_logger.py @@ -44,5 +44,5 @@ def print(event, request_id, user): extra={"attributes": attributes}, stacklevel=2, ) - except: - pass + except Exception as e: + trace_logger.debug(f"Failed to log trace event: {e}") diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py index 85ac7ba2c9..44a8c5f357 100644 --- a/fastdeploy/worker/gcu_model_runner.py +++ b/fastdeploy/worker/gcu_model_runner.py @@ -669,7 +669,9 @@ class GCUModelRunner(ModelRunnerBase): """ Initialize attention backends """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size self.model_config.kv_num_heads = max( diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 27a2d9e0dc..2d057d5141 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1490,7 +1490,9 @@ class GPUModelRunner(ModelRunnerBase): """ Initialize attention backends """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size self.model_config.kv_num_heads = max( diff --git a/fastdeploy/worker/gpu_worker.py b/fastdeploy/worker/gpu_worker.py index 8fec50541e..be065a9991 100644 --- a/fastdeploy/worker/gpu_worker.py +++ b/fastdeploy/worker/gpu_worker.py @@ -36,7 +36,8 @@ logger = get_logger("gpu_worker", "gpu_worker.log") try: ModelRunner = load_model_runner_plugins() -except: +except Exception as e: + logger.info(f"Plugin ModelRunner not available ({e}), using default GPUModelRunner") from fastdeploy.worker.gpu_model_runner import GPUModelRunner as ModelRunner diff --git a/fastdeploy/worker/hpu_model_runner.py b/fastdeploy/worker/hpu_model_runner.py index bcc0079030..09a2bce6f2 100644 --- a/fastdeploy/worker/hpu_model_runner.py +++ b/fastdeploy/worker/hpu_model_runner.py @@ -1160,7 +1160,9 @@ class HPUModelRunner(ModelRunnerBase): """ Initialize attention backends and forward metadata """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" # TODO(gongshaotian): Get rank from config num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size diff --git a/fastdeploy/worker/iluvatar_model_runner.py b/fastdeploy/worker/iluvatar_model_runner.py index 94cfdac2b9..b9ec9f62df 100644 --- a/fastdeploy/worker/iluvatar_model_runner.py +++ b/fastdeploy/worker/iluvatar_model_runner.py @@ -81,7 +81,9 @@ class IluvatarModelRunner(GPUModelRunner): """ Initialize attention backends """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size self.model_config.kv_num_heads = max( diff --git a/fastdeploy/worker/metax_model_runner.py b/fastdeploy/worker/metax_model_runner.py index 04a82b17e6..fa7daa41cf 100644 --- a/fastdeploy/worker/metax_model_runner.py +++ b/fastdeploy/worker/metax_model_runner.py @@ -1441,7 +1441,9 @@ class MetaxModelRunner(ModelRunnerBase): """ Initialize attention backends """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size self.model_config.kv_num_heads = max( diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 41b164c74f..a984e8788c 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1291,7 +1291,9 @@ class XPUModelRunner(ModelRunnerBase): """ Initialize attention backends and forward metadata """ - assert len(self.attn_backends) == 0 + assert ( + len(self.attn_backends) == 0 + ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" # TODO(gongshaotian): Get rank from config num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size diff --git a/fastdeploy/worker/xpu_worker.py b/fastdeploy/worker/xpu_worker.py index e04f1f3dae..9c0629e148 100644 --- a/fastdeploy/worker/xpu_worker.py +++ b/fastdeploy/worker/xpu_worker.py @@ -34,7 +34,8 @@ logger = get_logger("xpu_worker", "xpu_worker.log") try: XPUModelRunner = load_model_runner_plugins() -except: +except Exception as e: + logger.info(f"Plugin ModelRunner not available ({e}), using default XPUModelRunner") from fastdeploy.worker.xpu_model_runner import XPUModelRunner diff --git a/tests/distributed/test_communication_fallback.py b/tests/distributed/test_communication_fallback.py new file mode 100644 index 0000000000..1c73d2cfc7 --- /dev/null +++ b/tests/distributed/test_communication_fallback.py @@ -0,0 +1,182 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for communication.py error handling improvements (aff1eae8 + 029e4cf8). + +Covers: +1. tensor_byte_size() — pure computation, no mocking needed. +2. The _reg_err closure pattern — 029e4cf8 fixed a Python 3 bug where the + except-block variable `e` was garbage-collected, breaking closures that + reference it. Pure Python tests, no mocking needed. +3. Fallback function behavior — when op registration fails, the fallback + functions must raise RuntimeError with the original error message. + In GPU environments where registration succeeds, these tests are skipped. +""" + +import unittest + +import paddle + +from fastdeploy.distributed.communication import tensor_byte_size + + +# --------------------------------------------------------------------------- +# 1. tensor_byte_size() — behaviour tests +# --------------------------------------------------------------------------- +class TestTensorByteSize(unittest.TestCase): + """tensor_byte_size must return shape-product * element_size.""" + + def test_1d_float32(self): + t = paddle.zeros([10], dtype=paddle.float32) + self.assertEqual(tensor_byte_size(t), 10 * 4) + + def test_2d_float16(self): + t = paddle.zeros([4, 8], dtype=paddle.float16) + self.assertEqual(tensor_byte_size(t), 4 * 8 * 2) + + def test_3d_bfloat16(self): + t = paddle.zeros([2, 3, 4], dtype=paddle.bfloat16) + self.assertEqual(tensor_byte_size(t), 2 * 3 * 4 * 2) + + def test_single_element(self): + t = paddle.zeros([1], dtype=paddle.float32) + self.assertEqual(tensor_byte_size(t), 4) + + def test_matches_numel_times_element_size(self): + """Result must be identical to numel * element_size for arbitrary shapes.""" + cases = [ + ([16], paddle.float32), + ([4, 8], paddle.float16), + ([2, 3, 5], paddle.bfloat16), + ([1, 1, 1, 1], paddle.float32), + ] + for shape, dtype in cases: + t = paddle.zeros(shape, dtype=dtype) + expected = t.numel().item() * t.element_size() + self.assertEqual(tensor_byte_size(t), expected, f"shape={shape}, dtype={dtype}") + + +# --------------------------------------------------------------------------- +# 2. _reg_err closure pattern — pure Python behaviour tests +# --------------------------------------------------------------------------- +class TestRegErrClosurePattern(unittest.TestCase): + """029e4cf8 fixed a closure bug in communication.py. + + In Python 3, the `as` target of an except clause is deleted after + the block exits. Using `_reg_err = e` inside the block preserves + the exception for closures defined alongside it. + """ + + def test_fixed_pattern_preserves_exception(self): + """_reg_err = e keeps the exception accessible after except exits.""" + try: + raise ImportError("simulated op registration failure") + except Exception as e: + _reg_err = e + + def fallback(): + raise RuntimeError(f"Not available. Failed with: {_reg_err}") + + with self.assertRaises(RuntimeError) as ctx: + fallback() + self.assertIn("simulated op registration failure", str(ctx.exception)) + + def test_buggy_pattern_loses_exception(self): + """Direct reference to `e` in closure raises NameError after except block.""" + try: + raise ImportError("original error") + except Exception as e: # noqa: F841 — intentionally "unused"; Python 3 deletes it + + def buggy(): + return str(e) # noqa: F821 — `e` is undefined here, that's the point + + # Python 3 deletes `e` after the except block; closure sees unbound var + with self.assertRaises(NameError): + buggy() + + def test_two_independent_except_blocks(self): + """Each except block must use a separate variable (_reg_err / _reg_err2).""" + try: + raise ValueError("first failure") + except Exception as e: + _reg_err = e + + def fallback1(): + raise RuntimeError(f"first: {_reg_err}") + + try: + raise TypeError("second failure") + except Exception as e: + _reg_err2 = e + + def fallback2(): + raise RuntimeError(f"second: {_reg_err2}") + + with self.assertRaises(RuntimeError) as ctx1: + fallback1() + self.assertIn("first failure", str(ctx1.exception)) + + with self.assertRaises(RuntimeError) as ctx2: + fallback2() + self.assertIn("second failure", str(ctx2.exception)) + + +# --------------------------------------------------------------------------- +# 3. Fallback functions — only testable when op registration failed +# --------------------------------------------------------------------------- +class TestCommunicationFallbackFunctions(unittest.TestCase): + """When op registration fails at import time, calling the functions + must raise RuntimeError containing the original error message. + + In GPU environments where registration succeeds, these tests are skipped. + """ + + def test_fallback_tensor_model_parallel_all_reduce(self): + from fastdeploy.distributed import communication + + if not hasattr(communication, "_reg_err"): + self.skipTest("Op registration succeeded; no fallback to test") + + inp = paddle.zeros([2, 16], dtype=paddle.float16) + with self.assertRaises(RuntimeError) as ctx: + communication.tensor_model_parallel_all_reduce(inp) + self.assertIn("not available", str(ctx.exception)) + self.assertIn("Registration failed with", str(ctx.exception)) + + def test_fallback_decode_alltoall_transpose(self): + from fastdeploy.distributed import communication + + if not hasattr(communication, "_reg_err"): + self.skipTest("Op registration succeeded; no fallback to test") + + inp = paddle.zeros([2, 16], dtype=paddle.float16) + with self.assertRaises(RuntimeError) as ctx: + communication.decode_alltoall_transpose(inp) + self.assertIn("not available", str(ctx.exception)) + + def test_fallback_tensor_model_parallel_all_reduce_custom(self): + from fastdeploy.distributed import communication + + if not hasattr(communication, "_reg_err2"): + self.skipTest("Op registration succeeded; no fallback to test") + + inp = paddle.zeros([2, 16], dtype=paddle.float16) + with self.assertRaises(RuntimeError) as ctx: + communication.tensor_model_parallel_all_reduce_custom(inp) + self.assertIn("not available", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/distributed/test_custom_allreduce_guard.py b/tests/distributed/test_custom_allreduce_guard.py new file mode 100644 index 0000000000..34007d84e2 --- /dev/null +++ b/tests/distributed/test_custom_allreduce_guard.py @@ -0,0 +1,77 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for CustomAllreduce._initialized guard (aff1eae8). + +Behavior under test: + - should_custom_ar() returns False when _initialized is False. + - Construction with custom_ar=True but no distributed environment + leaves _initialized=False (world_size=1 early return). + +Why mock: + - paddle.distributed.get_rank / get_world_size are distributed communication + primitives that require a real multi-GPU NCCL group. We mock them at the + external system boundary so the test runs on a single process. +""" + +import unittest +from unittest.mock import MagicMock, patch + +import paddle + +from fastdeploy.distributed.custom_all_reduce.custom_all_reduce import ( + CustomAllreduce, + custom_ar, +) + + +class TestCustomAllreduceInitializedGuard(unittest.TestCase): + """Behavior: should_custom_ar returns False when not fully initialized.""" + + @unittest.skipUnless(custom_ar, "custom allreduce library not available") + @patch("paddle.distributed.get_world_size", return_value=1) + @patch("paddle.distributed.get_rank", return_value=0) + def test_single_gpu_not_initialized(self, _mock_rank, _mock_ws): + """world_size=1 → constructor returns early → _initialized stays False.""" + fake_group = MagicMock() + ar = CustomAllreduce(group=fake_group, max_size=8192 * 1024) + self.assertFalse(ar._initialized) + + @unittest.skipUnless(custom_ar, "custom allreduce library not available") + @patch("paddle.distributed.get_world_size", return_value=1) + @patch("paddle.distributed.get_rank", return_value=0) + def test_should_custom_ar_false_when_not_initialized(self, _mock_rank, _mock_ws): + """should_custom_ar must return False when _initialized is False.""" + fake_group = MagicMock() + ar = CustomAllreduce(group=fake_group, max_size=8192 * 1024) + + inp = paddle.zeros([4, 1024], dtype=paddle.float16) + self.assertFalse(ar.should_custom_ar(inp)) + + @unittest.skipUnless(custom_ar, "custom allreduce library not available") + @patch("paddle.distributed.get_world_size", return_value=3) + @patch("paddle.distributed.get_rank", return_value=0) + def test_unsupported_world_size_not_initialized(self, _mock_rank, _mock_ws): + """world_size=3 (not in SUPPORTED_WORLD_SIZES) → _initialized stays False.""" + fake_group = MagicMock() + ar = CustomAllreduce(group=fake_group, max_size=8192 * 1024) + self.assertFalse(ar._initialized) + + inp = paddle.zeros([4, 1024], dtype=paddle.float16) + self.assertFalse(ar.should_custom_ar(inp)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/engine/test_expert_service.py b/tests/engine/test_expert_service.py index 8bb8d2259b..f564eb6fd5 100644 --- a/tests/engine/test_expert_service.py +++ b/tests/engine/test_expert_service.py @@ -14,6 +14,9 @@ # limitations under the License. """ +# NOTE: Coverage supplement test — uses mock to reach internal branches +# that are hard to exercise without a full GPU/multi-process environment. + import os import sys import unittest @@ -236,6 +239,52 @@ class TestExpertService(unittest.TestCase): # 验证异常被记录 mock_llm_logger.exception.assert_called_once() + @patch("fastdeploy.engine.expert_service.EngineService") + @patch("fastdeploy.engine.expert_service.time") + @patch("fastdeploy.engine.expert_service.threading") + @patch("fastdeploy.engine.expert_service.envs") + @patch("fastdeploy.engine.expert_service.IPCSignal") + @patch("fastdeploy.engine.expert_service.console_logger") + def test_start_with_profile_retry_logic( + self, mock_console_logger, mock_ipc_signal, mock_envs, mock_threading, mock_time, mock_engine_service + ): + """Test IPCSignal retry logic when do_profile is True (lines 169-172).""" + mock_envs.FD_ENABLE_RETURN_TEXT = False + mock_envs.FD_ENABLE_MULTI_API_SERVER = False + + local_data_parallel_id = 0 + + mock_process = Mock() + mock_process.pid = 1234 + + mock_engine_instance = mock_engine_service.return_value + mock_engine_instance.start_cache_service.return_value = [mock_process] + + # Enable profiling + self.mock_cfg.do_profile = True + + expert_service = ExpertService(self.mock_cfg, local_data_parallel_id) + + # Simulate IPCSignal failing twice then succeeding + call_count = [0] + + def ipc_signal_side_effect(*args, **kwargs): + call_count[0] += 1 + if call_count[0] < 3: + raise RuntimeError("IPCSignal not ready") + return Mock(value=[100]) + + mock_ipc_signal.side_effect = ipc_signal_side_effect + + # Mock time.sleep to avoid actual delays + mock_time.time.return_value = 0 + + result = expert_service.start(None, local_data_parallel_id) + + # Verify retry logic was triggered + self.assertEqual(call_count[0], 3) # Failed twice, succeeded on third try + self.assertTrue(result) + if __name__ == "__main__": unittest.main() diff --git a/tests/entrypoints/openai/tool_parsers/test_ernie_x1_tool_parser.py b/tests/entrypoints/openai/tool_parsers/test_ernie_x1_tool_parser.py index 1b8b58d1e9..c8a24c5707 100644 --- a/tests/entrypoints/openai/tool_parsers/test_ernie_x1_tool_parser.py +++ b/tests/entrypoints/openai/tool_parsers/test_ernie_x1_tool_parser.py @@ -15,7 +15,6 @@ """ import unittest -from unittest.mock import patch from fastdeploy.entrypoints.openai.protocol import ChatCompletionRequest, DeltaMessage from fastdeploy.entrypoints.openai.tool_parsers.ernie_x1_tool_parser import ( @@ -24,21 +23,17 @@ from fastdeploy.entrypoints.openai.tool_parsers.ernie_x1_tool_parser import ( class DummyTokenizer: - """Dummy tokenizer with minimal vocab for testing""" + """Dummy tokenizer with vocab containing tool_call tokens""" def __init__(self): self.vocab = {"": 1, "": 2} + def get_vocab(self): + return self.vocab + class TestErnieX1ToolParser(unittest.TestCase): def setUp(self): - class DummyTokenizer: - def __init__(self): - self.vocab = {"": 1, "": 2} - - def get_vocab(self): - return self.vocab - self.tokenizer = DummyTokenizer() self.parser = ErnieX1ToolParser(tokenizer=self.tokenizer) self.dummy_request = ChatCompletionRequest(messages=[{"role": "user", "content": "hi"}]) @@ -47,7 +42,7 @@ class TestErnieX1ToolParser(unittest.TestCase): def test_extract_tool_calls_complete(self): """Test normal extraction of complete tool_call JSON""" - output = '{"name": "get_weather", "arguments": {"location": "北京"}}' + output = '{"name": "get_weather", "arguments": {"location": "Beijing"}}' result = self.parser.extract_tool_calls(output, self.dummy_request) self.assertTrue(result.tools_called) self.assertEqual(result.tool_calls[0].function.name, "get_weather") @@ -59,18 +54,40 @@ class TestErnieX1ToolParser(unittest.TestCase): self.assertFalse(result.tools_called) def test_extract_tool_calls_exception(self): - """Force exception to cover error branch""" - with patch( - "fastdeploy.entrypoints.openai.tool_parsers.ernie_x1_tool_parser.json.loads", side_effect=Exception("boom") - ): - output = '{"name": "get_weather", "arguments": {}}' - result = self.parser.extract_tool_calls(output, self.dummy_request) - self.assertFalse(result.tools_called) + """Completely broken JSON triggers the exception branch""" + output = "not json at all{{{" + result = self.parser.extract_tool_calls(output, self.dummy_request) + self.assertFalse(result.tools_called) + + def test_extract_tool_calls_partial_json_parser_failure(self): + """Test partial_json_parser failure path for arguments (L165-166). + json.loads fails on malformed JSON, partial_json_parser.loads also fails on deeply broken args. + Partial result has _is_partial=True so tools_called=False, but tool_calls is populated.""" + # Malformed JSON: valid name but arguments is a bare invalid token + # that breaks both json.loads and partial_json_parser + output = '{"name": "test", "arguments": @@@INVALID@@@}' + result = self.parser.extract_tool_calls(output, self.dummy_request) + # _is_partial=True → tools_called=False, but tool_calls list is populated + self.assertFalse(result.tools_called) + self.assertIsNotNone(result.tool_calls) + self.assertEqual(result.tool_calls[0].function.name, "test") + # arguments=None → converted to {} → serialized as "{}" + self.assertEqual(result.tool_calls[0].function.arguments, "{}") + + def test_partial_json_parser_exception_triggers_debug_log(self): + """Malformed JSON + partial_json_parser failure exercises L165-166 exactly.""" + # Unclosed string in arguments breaks both json.loads and partial_json_parser + output = '{"name": "my_tool", "arguments": {"key": "unterminated}' + result = self.parser.extract_tool_calls(output, self.dummy_request) + # Partial parse → tools_called=False but tool_calls has entries + self.assertFalse(result.tools_called) + self.assertIsNotNone(result.tool_calls) + self.assertEqual(result.tool_calls[0].function.name, "my_tool") # ---------------- Streaming extraction tests ---------------- def test_streaming_no_toolcall(self): - """Streaming extraction returns normal DeltaMessage when no """ + """Streaming extraction returns normal DeltaMessage when no toolcall tag""" result = self.parser.extract_tool_calls_streaming( "", "abc", "abc", [], [], [], self.dummy_request.model_dump() ) @@ -103,7 +120,7 @@ class TestErnieX1ToolParser(unittest.TestCase): def test_streaming_complete_arguments_and_end(self): """Streaming extraction completes arguments with brackets matched and closes tool_call""" - text = '"arguments": {"location": "北京"}}' + text = '"arguments": {"location": "Beijing"}}' delta = self.parser.extract_tool_calls_streaming( "", "" + text, text, [], [1], [1], self.dummy_request.model_dump() ) diff --git a/tests/input/test_preprocess.py b/tests/input/test_preprocess.py new file mode 100644 index 0000000000..b4659261a8 --- /dev/null +++ b/tests/input/test_preprocess.py @@ -0,0 +1,87 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for InputPreprocessor.create_processor(). + +Why mock: + - ModelConfig, ReasoningParserManager, ToolParserManager, and concrete processor + classes all depend on model files or external resources not available in tests. + We mock them at the import boundary to test InputPreprocessor's routing logic. +""" + +import unittest +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + + +def _make_model_config(arch, enable_mm=False): + cfg = SimpleNamespace( + model="test_model", + architectures=[arch], + enable_mm=enable_mm, + ) + return cfg + + +class TestInputPreprocessorBranching(unittest.TestCase): + """Test that create_processor picks the right processor class based on architecture and flags.""" + + def test_init_stores_params(self): + from fastdeploy.input.preprocess import InputPreprocessor + + config = _make_model_config("LlamaForCausalLM") + pp = InputPreprocessor( + model_config=config, + reasoning_parser="qwen3", + tool_parser="ernie_x1", + limit_mm_per_prompt={"image": 2}, + ) + self.assertEqual(pp.model_name_or_path, "test_model") + self.assertEqual(pp.reasoning_parser, "qwen3") + self.assertEqual(pp.tool_parser, "ernie_x1") + self.assertEqual(pp.limit_mm_per_prompt, {"image": 2}) + + def test_create_processor_text_normal_path(self): + """Normal path: non-Ernie, non-MM arch creates a text DataProcessor.""" + from fastdeploy.input.preprocess import InputPreprocessor + + config = _make_model_config("LlamaForCausalLM", enable_mm=False) + pp = InputPreprocessor(model_config=config) + + mock_dp = MagicMock() + with ( + patch.dict("sys.modules", {"fastdeploy.plugins": None, "fastdeploy.plugins.input_processor": None}), + patch("fastdeploy.input.preprocess.envs") as mock_envs, + patch("fastdeploy.input.text_processor.DataProcessor", return_value=mock_dp), + ): + mock_envs.ENABLE_V1_DATA_PROCESSOR = False + pp.create_processor() + + self.assertIs(pp.processor, mock_dp) + + def test_unsupported_mm_arch_raises(self): + """When enable_mm=True and arch is unrecognized, should raise ValueError.""" + from fastdeploy.input.preprocess import InputPreprocessor + + config = _make_model_config("UnknownMMArch", enable_mm=True) + pp = InputPreprocessor(model_config=config) + + with patch.dict("sys.modules", {"fastdeploy.plugins": None, "fastdeploy.plugins.input_processor": None}): + with self.assertRaises(ValueError): + pp.create_processor() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/input/test_tokenizer_client.py b/tests/input/test_tokenizer_client.py index 06804ebade..42e3e8ee59 100644 --- a/tests/input/test_tokenizer_client.py +++ b/tests/input/test_tokenizer_client.py @@ -99,3 +99,63 @@ async def test_encode_timeout(): with pytest.raises(TimeoutError): await client.encode_image(request) + + +@pytest.mark.asyncio +async def test_encode_invalid_type(): + """Test invalid encode type raises ValueError (line 130). + NOTE: Public methods hardcode the type param, so we test the private method directly + to verify the validation boundary.""" + base_url = "http://testserver" + client = AsyncTokenizerClient(base_url=base_url) + + request = ImageEncodeRequest( + version="v1", req_id="req_invalid", is_gen=False, resolution=256, image_url="http://example.com/image.jpg" + ) + + with pytest.raises(ValueError, match="Invalid encode type"): + await client._async_encode_request("invalid_type", request.model_dump()) + + +@pytest.mark.asyncio +async def test_decode_invalid_type(): + """Test invalid decode type raises ValueError (line 186). + NOTE: Public methods hardcode the type param, so we test the private method directly + to verify the validation boundary.""" + base_url = "http://testserver" + client = AsyncTokenizerClient(base_url=base_url) + + with pytest.raises(ValueError, match="Invalid decode type"): + await client._async_decode_request("invalid_type", {}) + + +@pytest.mark.asyncio +@respx.mock +async def test_encode_network_error_continues_polling(): + """Test network error during polling is caught and logged (line 164).""" + base_url = "http://testserver" + client = AsyncTokenizerClient(base_url=base_url, max_wait=2, poll_interval=0.1) + + # Mock create task + respx.post(f"{base_url}/image/encode").mock( + return_value=httpx.Response(200, json={"code": 0, "task_tag": "task_network_error"}) + ) + + # First poll fails with network error, second succeeds + call_count = 0 + + def side_effect(request): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.RequestError("Network error") + return httpx.Response(200, json={"state": "Finished", "result": {"key": "value"}}) + + respx.get(f"{base_url}/encode/get").mock(side_effect=side_effect) + + request = ImageEncodeRequest( + version="v1", req_id="req_network", is_gen=False, resolution=256, image_url="http://example.com/image.jpg" + ) + + result = await client.encode_image(request) + assert result["key"] == "value" diff --git a/tests/model_executor/ops/triton_ops/test_triton_utils_v2.py b/tests/model_executor/ops/triton_ops/test_triton_utils_v2.py index 802b32bcbf..ddcdb4e9d3 100644 --- a/tests/model_executor/ops/triton_ops/test_triton_utils_v2.py +++ b/tests/model_executor/ops/triton_ops/test_triton_utils_v2.py @@ -14,6 +14,9 @@ # limitations under the License. """ +# NOTE: Coverage supplement test — uses mock to reach compilation pipeline +# branches that require Triton/CUDA toolchain not available in unit tests. + import unittest from unittest.mock import MagicMock, patch @@ -179,5 +182,41 @@ class TestPaddleUseTritonV2(unittest.TestCase): self.assertEqual(my_kernel.key_args, ["N", "K"]) +class TestKernelInterfaceUnsupportedTypes(unittest.TestCase): + """Test assert False paths for unsupported types in KernelInterface decorator (L192, L200).""" + + def test_unsupported_constexpr_type_raises_assertion(self): + """Passing unsupported constexpr type triggers assert with message (L192).""" + + def kernel(a, N: tl.constexpr, K: tl.constexpr): + return + + ki = tu2.KernelInterface(kernel, other_config={}) + ki.grid = [1, 1, 1] + + # Pass a string (unsupported type) as constexpr arg 'N' + a = paddle.to_tensor([1], dtype="float32") + with self.assertRaises(AssertionError) as ctx: + ki.decorator(a, N="bad_string", K=8) + self.assertIn("Unsupported constexpr type", str(ctx.exception)) + self.assertIn("N", str(ctx.exception)) + + def test_unsupported_non_constexpr_arg_type_raises_assertion(self): + """Passing unsupported non-constexpr arg type triggers assert with message (L200).""" + + def kernel(a, b, N: tl.constexpr, K: tl.constexpr): + return + + ki = tu2.KernelInterface(kernel, other_config={}) + ki.grid = [1, 1, 1] + + # 'a' is a Tensor, 'b' is a non-constexpr non-Tensor — pass a string + a = paddle.to_tensor([1], dtype="float32") + with self.assertRaises(AssertionError) as ctx: + ki.decorator(a, "bad_string", N=8, K=16) + self.assertIn("Unsupported arg type", str(ctx.exception)) + self.assertIn("b", str(ctx.exception)) + + if __name__ == "__main__": unittest.main() diff --git a/tests/model_executor/test_model_executor_utils.py b/tests/model_executor/test_model_executor_utils.py new file mode 100644 index 0000000000..98cba5c330 --- /dev/null +++ b/tests/model_executor/test_model_executor_utils.py @@ -0,0 +1,184 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from fastdeploy.model_executor.utils import ( + BitMaskTracker, + TensorTracker, + WeightsMapper, + remap_weight_keys, + set_weight_attrs, + slice_fn, +) + + +class TestBitMaskTracker(unittest.TestCase): + def test_empty_is_not_full(self): + t = BitMaskTracker(8) + self.assertFalse(t.is_full()) + + def test_mark_all(self): + t = BitMaskTracker(4) + t.mark(0, 4) + self.assertTrue(t.is_full()) + + def test_mark_in_parts(self): + t = BitMaskTracker(8) + t.mark(0, 4) + self.assertFalse(t.is_full()) + t.mark(4, 8) + self.assertTrue(t.is_full()) + + def test_overlapping_marks(self): + t = BitMaskTracker(4) + t.mark(0, 3) + t.mark(2, 4) + self.assertTrue(t.is_full()) + + def test_single_element(self): + t = BitMaskTracker(1) + self.assertFalse(t.is_full()) + t.mark(0, 1) + self.assertTrue(t.is_full()) + + def test_invalid_range_raises(self): + t = BitMaskTracker(4) + with self.assertRaises(ValueError): + t.mark(-1, 2) + with self.assertRaises(ValueError): + t.mark(0, 5) + with self.assertRaises(ValueError): + t.mark(3, 2) + + +class TestTensorTracker2D(unittest.TestCase): + def test_track_columns(self): + tt = TensorTracker((4, 8), output_dim=1) + tt.mark(start=0, end=4) + self.assertFalse(tt.is_fully_copied()) + tt.mark(start=4, end=8) + self.assertTrue(tt.is_fully_copied()) + + def test_track_rows(self): + tt = TensorTracker((4, 8), output_dim=0) + tt.mark(start=0, end=4) + self.assertTrue(tt.is_fully_copied()) + + def test_partial_fill(self): + tt = TensorTracker((4, 8), output_dim=1) + tt.mark(start=0, end=3) + self.assertFalse(tt.is_fully_copied()) + + +class TestTensorTracker3D(unittest.TestCase): + def test_track_all_batches(self): + tt = TensorTracker((2, 4, 8), output_dim=1) + # Must fill both batches + tt.mark(start=0, end=8, batch_id=0) + self.assertFalse(tt.is_fully_copied()) + tt.mark(start=0, end=8, batch_id=1) + self.assertTrue(tt.is_fully_copied()) + + def test_missing_batch_id_raises(self): + tt = TensorTracker((2, 4, 8), output_dim=1) + with self.assertRaises(ValueError): + tt.mark(start=0, end=8) + + +class TestTensorTrackerInvalidDim(unittest.TestCase): + def test_1d_raises(self): + with self.assertRaises(ValueError): + TensorTracker((8,), output_dim=0) + + +class TestWeightsMapper(unittest.TestCase): + def test_prefix_mapping(self): + mapper = WeightsMapper(orig_to_new_prefix={"old.": "new."}) + self.assertEqual(mapper.apply("old.layer1.weight"), "new.layer1.weight") + + def test_no_match(self): + mapper = WeightsMapper(orig_to_new_prefix={"old.": "new."}) + self.assertEqual(mapper.apply("other.layer1.weight"), "other.layer1.weight") + + def test_multiple_prefixes(self): + mapper = WeightsMapper(orig_to_new_prefix={"a.": "x.", "b.": "y."}) + self.assertEqual(mapper.apply("a.foo"), "x.foo") + self.assertEqual(mapper.apply("b.bar"), "y.bar") + + +class TestRemapWeightKeys(unittest.TestCase): + def test_basic_remap(self): + weights = [("model.layer.weight", 1), ("model.layer.bias", 2)] + mapper = {"model.": "new_model."} + result = list(remap_weight_keys(iter(weights), mapper)) + self.assertEqual(result[0][0], "new_model.layer.weight") + self.assertEqual(result[1][0], "new_model.layer.bias") + + def test_include_keys_filter(self): + weights = [("model.a.weight", 1), ("model.b.weight", 2), ("model.c.bias", 3)] + mapper = {} + result = list(remap_weight_keys(iter(weights), mapper, include_keys=["weight"])) + self.assertEqual(len(result), 2) + + def test_no_match_passthrough(self): + weights = [("layer.weight", 1)] + mapper = {"other.": "new."} + result = list(remap_weight_keys(iter(weights), mapper)) + self.assertEqual(result[0][0], "layer.weight") + + +class TestSetWeightAttrs(unittest.TestCase): + def test_sets_attrs(self): + class Param: + pass + + p = Param() + set_weight_attrs(p, {"output_dim": 1, "tp_row_bias": True}) + self.assertEqual(p.output_dim, 1) + self.assertTrue(p.tp_row_bias) + + def test_none_map_noop(self): + class Param: + pass + + p = Param() + set_weight_attrs(p, None) # should not raise + + +class TestSliceFn(unittest.TestCase): + def test_1d_slice(self): + import numpy as np + + w = np.arange(10) + result = slice_fn(w, output_dim=False, start=2, end=5) + self.assertEqual(list(result), [2, 3, 4]) + + def test_2d_output_dim_true(self): + import numpy as np + + w = np.ones((4, 8)) + result = slice_fn(w, output_dim=True, start=0, end=4) + self.assertEqual(result.shape, (4, 4)) + + def test_2d_output_dim_false(self): + import numpy as np + + w = np.ones((4, 8)) + result = slice_fn(w, output_dim=False, start=1, end=3) + self.assertEqual(result.shape, (2, 8)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/quantization/test_quantization_init.py b/tests/quantization/test_quantization_init.py new file mode 100644 index 0000000000..40b4677384 --- /dev/null +++ b/tests/quantization/test_quantization_init.py @@ -0,0 +1,60 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for quantization module initialization and parse_quant_config. +""" + +import unittest + +from fastdeploy.model_executor.layers.quantization import ( + _compute_hadamard_block_size, + get_quantization_config, +) + + +class TestComputeHadamardBlockSize(unittest.TestCase): + """Tests for _compute_hadamard_block_size function.""" + + def test_basic_case(self): + """Test basic computation.""" + result = _compute_hadamard_block_size(4096, 2) + self.assertGreater(result, 0) + self.assertTrue(result & (result - 1) == 0) # Power of 2 + + def test_not_divisible_raises(self): + """Test that non-divisible moe_intermediate_size raises ValueError.""" + with self.assertRaises(ValueError) as ctx: + _compute_hadamard_block_size(4095, 2) + self.assertIn("must be divisible", str(ctx.exception)) + + +class TestGetQuantizationConfig(unittest.TestCase): + """Tests for get_quantization_config function.""" + + def test_valid_quantization_method(self): + """Test getting config for valid quantization method.""" + for method in ["wint4", "wint8", "block_wise_fp8", "w4afp8"]: + config_cls = get_quantization_config(method) + self.assertIsNotNone(config_cls) + + def test_invalid_quantization_method_raises(self): + """Test that invalid method raises ValueError.""" + with self.assertRaises(ValueError) as ctx: + get_quantization_config("invalid_method") + self.assertIn("Invalid quantization method", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/router/test_router.py b/tests/router/test_router.py new file mode 100644 index 0000000000..4b9476883f --- /dev/null +++ b/tests/router/test_router.py @@ -0,0 +1,148 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for Router class. + +Why mock: + - register_instance calls check_service_health_async which does real HTTP. + We mock it at the network boundary to test Router's registration and selection logic. +""" + +import unittest +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +from fastdeploy.router.router import Router, RouterArgs + + +def _make_args(**kwargs): + defaults = {"host": "0.0.0.0", "port": 9000, "splitwise": False, "request_timeout_secs": 30} + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + +def _make_instance_dict(host_ip="10.0.0.1", port=8080, role="mixed", **kwargs): + d = { + "host_ip": host_ip, + "port": port, + "role": role, + } + d.update(kwargs) + return d + + +class TestRouterArgs(unittest.TestCase): + def test_defaults(self): + args = RouterArgs() + self.assertEqual(args.host, "0.0.0.0") + self.assertEqual(args.port, 9000) + self.assertFalse(args.splitwise) + self.assertEqual(args.request_timeout_secs, 1800) + + +class TestRouterInit(unittest.TestCase): + def test_init(self): + args = _make_args() + router = Router(args) + self.assertEqual(router.host, "0.0.0.0") + self.assertEqual(router.port, 9000) + self.assertFalse(router.splitwise) + self.assertEqual(router.mixed_servers, []) + self.assertEqual(router.prefill_servers, []) + self.assertEqual(router.decode_servers, []) + + +class TestRouterRegistration(unittest.IsolatedAsyncioTestCase): + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_register_mixed_instance(self, mock_health): + router = Router(_make_args(splitwise=False)) + inst_dict = _make_instance_dict(role="mixed") + await router.register_instance(inst_dict) + self.assertEqual(len(router.mixed_servers), 1) + + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_register_splitwise_instances(self, mock_health): + router = Router(_make_args(splitwise=True)) + + await router.register_instance(_make_instance_dict(host_ip="10.0.0.1", role="prefill")) + await router.register_instance(_make_instance_dict(host_ip="10.0.0.2", role="decode")) + + self.assertEqual(len(router.prefill_servers), 1) + self.assertEqual(len(router.decode_servers), 1) + + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_register_invalid_role_raises(self, mock_health): + """Splitwise mode should reject mixed instances.""" + router = Router(_make_args(splitwise=True)) + with self.assertRaises(ValueError): + await router.register_instance(_make_instance_dict(role="mixed")) + + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=False) + async def test_register_unhealthy_instance_raises(self, mock_health): + router = Router(_make_args(splitwise=False)) + with self.assertRaises(RuntimeError): + await router.register_instance(_make_instance_dict(role="mixed")) + + +class TestRouterSelection(unittest.IsolatedAsyncioTestCase): + async def test_select_mixed_no_servers_raises(self): + router = Router(_make_args(splitwise=False)) + with self.assertRaises(RuntimeError): + await router.select_mixed() + + async def test_select_pd_no_prefill_raises(self): + router = Router(_make_args(splitwise=True)) + with self.assertRaises(RuntimeError): + await router.select_pd() + + async def test_select_pd_no_decode_raises(self): + """Test select_pd raises when no decode servers available (line 152).""" + router = Router(_make_args(splitwise=True)) + # Manually add a prefill server without going through health check + router.prefill_servers.append(_make_instance_dict(role="prefill")) + with self.assertRaises(RuntimeError) as ctx: + await router.select_pd() + self.assertIn("No decode servers available", str(ctx.exception)) + + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_select_mixed_returns_instance(self, mock_health): + router = Router(_make_args(splitwise=False)) + await router.register_instance(_make_instance_dict(role="mixed")) + inst = await router.select_mixed() + self.assertIsNotNone(inst) + + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_select_pd_returns_pair(self, mock_health): + router = Router(_make_args(splitwise=True)) + await router.register_instance(_make_instance_dict(host_ip="10.0.0.1", role="prefill")) + await router.register_instance(_make_instance_dict(host_ip="10.0.0.2", role="decode")) + prefill, decode = await router.select_pd() + self.assertIsNotNone(prefill) + self.assertIsNotNone(decode) + + +class TestRouterRegisteredNumber(unittest.IsolatedAsyncioTestCase): + @patch("fastdeploy.router.router.check_service_health_async", new_callable=AsyncMock, return_value=True) + async def test_registered_number(self, mock_health): + router = Router(_make_args(splitwise=False)) + await router.register_instance(_make_instance_dict(role="mixed")) + result = await router.registered_number() + self.assertEqual(result["mixed"], 1) + self.assertEqual(result["prefill"], 0) + self.assertEqual(result["decode"], 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/router/test_router_utils.py b/tests/router/test_router_utils.py new file mode 100644 index 0000000000..dc3ec38a60 --- /dev/null +++ b/tests/router/test_router_utils.py @@ -0,0 +1,102 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for router utils - InstanceInfo class. +""" + +import unittest + +from fastdeploy.router.utils import InstanceInfo, InstanceRole + + +class TestInstanceInfoFromDict(unittest.TestCase): + """Tests for InstanceInfo.from_dict method.""" + + def test_from_dict_success(self): + """Test creating InstanceInfo from dict with all required fields.""" + info_dict = { + "role": "mixed", + "host_ip": "10.0.0.1", + "port": 8080, + } + info = InstanceInfo.from_dict(info_dict) + self.assertEqual(info.role, InstanceRole.MIXED) + self.assertEqual(info.host_ip, "10.0.0.1") + self.assertEqual(info.port, "8080") + + def test_from_dict_missing_required_field_raises_keyerror(self): + """Test from_dict raises KeyError when required field is missing (line 60).""" + # Missing 'host_ip' which is a required field + info_dict = { + "role": "mixed", + "port": 8080, + } + with self.assertRaises(KeyError) as ctx: + InstanceInfo.from_dict(info_dict) + self.assertIn("Missing required field", str(ctx.exception)) + self.assertIn("host_ip", str(ctx.exception)) + + def test_from_dict_missing_role_raises_keyerror(self): + """Test from_dict raises KeyError when role is missing.""" + info_dict = { + "host_ip": "10.0.0.1", + "port": 8080, + } + with self.assertRaises(KeyError) as ctx: + InstanceInfo.from_dict(info_dict) + self.assertIn("Missing required field", str(ctx.exception)) + self.assertIn("role", str(ctx.exception)) + + def test_from_dict_with_optional_fields(self): + """Test from_dict with optional fields uses defaults.""" + info_dict = { + "role": InstanceRole.PREFILL, + "host_ip": "10.0.0.2", + "port": 9090, + "metrics_port": 9091, + "transfer_protocol": ["ipc"], + } + info = InstanceInfo.from_dict(info_dict) + self.assertEqual(info.role, InstanceRole.PREFILL) + self.assertEqual(info.metrics_port, "9091") + self.assertEqual(info.transfer_protocol, ["ipc"]) + # Check defaults + self.assertEqual(info.connector_port, "0") + self.assertEqual(info.tp_size, 1) + + +class TestInstanceInfoPostInit(unittest.TestCase): + """Tests for InstanceInfo.__post_init__ method.""" + + def test_role_string_conversion(self): + """Test role string is converted to InstanceRole enum.""" + info = InstanceInfo(role="decode", host_ip="10.0.0.1", port=8080) + self.assertEqual(info.role, InstanceRole.DECODE) + + def test_invalid_role_string_raises_valueerror(self): + """Test invalid role string raises ValueError.""" + with self.assertRaises(ValueError) as ctx: + InstanceInfo(role="invalid_role", host_ip="10.0.0.1", port=8080) + self.assertIn("Invalid role string", str(ctx.exception)) + + def test_invalid_role_type_raises_typeerror(self): + """Test invalid role type raises TypeError.""" + with self.assertRaises(TypeError) as ctx: + InstanceInfo(role=123, host_ip="10.0.0.1", port=8080) + self.assertIn("role must be InstanceRole or str", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/scheduler/test_scheduler_config.py b/tests/scheduler/test_scheduler_config.py index 05b7fa4007..f9c8372222 100644 --- a/tests/scheduler/test_scheduler_config.py +++ b/tests/scheduler/test_scheduler_config.py @@ -12,16 +12,138 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Tests for FDConfig and scheduler configuration, specifically for -max_num_batched_tokens assignment when ENABLE_V1_KVCACHE_SCHEDULER is enabled. +Tests for scheduler configuration classes and FDConfig max_num_batched_tokens +assignment when ENABLE_V1_KVCACHE_SCHEDULER is enabled. """ import contextlib +import os import unittest -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch from fastdeploy.config import FDConfig -from fastdeploy.scheduler.config import SchedulerConfig +from fastdeploy.scheduler.config import ( + DPLocalSchedulerConfig, + GlobalSchedulerConfig, + LocalSchedulerConfig, + SchedulerConfig, +) + + +class TestLocalSchedulerConfig(unittest.TestCase): + def test_defaults(self): + cfg = LocalSchedulerConfig() + self.assertEqual(cfg.max_size, -1) + self.assertEqual(cfg.ttl, 900) + self.assertEqual(cfg.max_model_len, 8192) + self.assertFalse(cfg.enable_chunked_prefill) + + def test_auto_threshold(self): + """long_prefill_token_threshold should be 4% of max_model_len when set to 0.""" + cfg = LocalSchedulerConfig(max_model_len=10000, long_prefill_token_threshold=0) + self.assertEqual(cfg.long_prefill_token_threshold, 400) + + def test_explicit_threshold(self): + cfg = LocalSchedulerConfig(long_prefill_token_threshold=512) + self.assertEqual(cfg.long_prefill_token_threshold, 512) + + def test_custom_values(self): + cfg = LocalSchedulerConfig(max_size=100, ttl=300, max_model_len=4096) + self.assertEqual(cfg.max_size, 100) + self.assertEqual(cfg.ttl, 300) + self.assertEqual(cfg.max_model_len, 4096) + + def test_kwargs_ignored(self): + """Extra kwargs should not raise.""" + cfg = LocalSchedulerConfig(unknown_key="value") + self.assertFalse(hasattr(cfg, "unknown_key")) + + +class TestDPLocalSchedulerConfig(unittest.TestCase): + def test_defaults(self): + cfg = DPLocalSchedulerConfig() + self.assertEqual(cfg.splitwise_role, "prefill") + + def test_custom_role(self): + cfg = DPLocalSchedulerConfig(splitwise_role="decode") + self.assertEqual(cfg.splitwise_role, "decode") + + +class TestGlobalSchedulerConfig(unittest.TestCase): + def test_defaults(self): + cfg = GlobalSchedulerConfig() + self.assertEqual(cfg.host, "127.0.0.1") + self.assertEqual(cfg.port, 6379) + self.assertEqual(cfg.db, 0) + self.assertIsNone(cfg.password) + self.assertEqual(cfg.topic, "default") + + def test_check_invalid_ttl(self): + cfg = GlobalSchedulerConfig(ttl=-1) + with self.assertRaises(ValueError): + cfg.check() + + def test_check_invalid_min_load_score(self): + cfg = GlobalSchedulerConfig(min_load_score=0) + with self.assertRaises(ValueError): + cfg.check() + + def test_check_invalid_load_shards_num(self): + cfg = GlobalSchedulerConfig(load_shards_num=0) + with self.assertRaises(ValueError): + cfg.check() + + def test_auto_threshold(self): + cfg = GlobalSchedulerConfig(max_model_len=20000, long_prefill_token_threshold=0) + self.assertEqual(cfg.long_prefill_token_threshold, 800) + + @patch("fastdeploy.scheduler.config.redis") + def test_check_redis_connection_failure_raises(self, mock_redis_mod): + """Redis ping returning False should raise ConnectionError.""" + mock_conn = MagicMock() + mock_conn.ping.return_value = False + mock_redis_mod.Redis.return_value = mock_conn + + cfg = GlobalSchedulerConfig() + with self.assertRaises(ConnectionError): + cfg.check() + + +class TestSchedulerConfig(unittest.TestCase): + def test_local_scheduler(self): + cfg = SchedulerConfig({"name": "local", "max_size": 50, "ttl": 600}) + self.assertEqual(cfg.name, "local") + self.assertIsInstance(cfg.config, LocalSchedulerConfig) + self.assertEqual(cfg.config.max_size, 50) + + def test_dp_scheduler(self): + cfg = SchedulerConfig({"name": "dp", "splitwise_role": "decode"}) + self.assertEqual(cfg.name, "dp") + self.assertIsInstance(cfg.config, DPLocalSchedulerConfig) + + def test_global_scheduler(self): + cfg = SchedulerConfig({"name": "global", "host": "redis.local"}) + self.assertEqual(cfg.name, "global") + self.assertIsInstance(cfg.config, GlobalSchedulerConfig) + self.assertEqual(cfg.config.host, "redis.local") + + def test_check_unknown_name_raises(self): + cfg = SchedulerConfig({"name": "unknown"}) + with self.assertRaises(Exception): + cfg.check() + + def test_default_attrs(self): + cfg = SchedulerConfig({"name": "local"}) + self.assertEqual(cfg.max_num_batched_tokens, 2048) + self.assertEqual(cfg.max_extra_num_batched_tokens, 16384) + self.assertEqual(cfg.max_num_seqs, 34) + self.assertEqual(cfg.splitwise_role, "mixed") + self.assertFalse(cfg.enable_overlap_schedule) + + def test_attrs_override(self): + cfg = SchedulerConfig({"name": "local", "max_num_seqs": 64, "max_num_batched_tokens": 4096}) + self.assertEqual(cfg.max_num_seqs, 64) + self.assertEqual(cfg.max_num_batched_tokens, 4096) def _create_mock_configs(): @@ -113,21 +235,15 @@ def _create_fd_config_instance(mock_scheduler, mock_model, mock_cache, mock_para @contextlib.contextmanager def _patch_env_and_config(enable_v1_scheduler): """Context manager to patch all environment variables and config methods.""" - from fastdeploy import envs as fastdeploy_envs + env_vars = { + "ENABLE_V1_KVCACHE_SCHEDULER": str(enable_v1_scheduler), + "FD_ENABLE_MAX_PREFILL": "0", + "FD_FOR_TORCH_MODEL_FORMAT": "0", + "FD_MAX_STOP_SEQS_NUM": "10", + "FD_STOP_SEQS_MAX_LEN": "100", + } - env_patches = [ - patch.object(fastdeploy_envs, "ENABLE_V1_KVCACHE_SCHEDULER", enable_v1_scheduler), - patch.object(fastdeploy_envs, "FD_ENABLE_MAX_PREFILL", False), - patch.object(fastdeploy_envs, "FD_FOR_TORCH_MODEL_FORMAT", False), - patch.object(fastdeploy_envs, "FD_MAX_STOP_SEQS_NUM", 10), - patch.object(fastdeploy_envs, "FD_STOP_SEQS_MAX_LEN", 100), - patch("fastdeploy.config.envs.ENABLE_V1_KVCACHE_SCHEDULER", enable_v1_scheduler), - ] - - with contextlib.ExitStack() as stack: - for p in env_patches: - stack.enter_context(p) - stack.enter_context(patch.object(FDConfig, "_disable_sequence_parallel_moe_if_needed")) + with patch.dict(os.environ, env_vars): yield diff --git a/tests/spec_decode/test_mtp_proposer.py b/tests/spec_decode/test_mtp_proposer.py index c3ab71e613..fe61862770 100644 --- a/tests/spec_decode/test_mtp_proposer.py +++ b/tests/spec_decode/test_mtp_proposer.py @@ -14,6 +14,9 @@ # limitations under the License. """ +# NOTE: Coverage supplement test — uses mock to reach speculative decoding +# branches that require GPU model loading not available in unit tests. + import unittest from unittest.mock import Mock, patch @@ -647,6 +650,23 @@ class TestMTPProposer(unittest.TestCase): self.assertTrue(proposer.model_inputs["stop_flags"][0].item()) self.assertEqual(proposer.model_inputs["seq_lens_this_time_buffer"][0].item(), 0) + @patch("fastdeploy.spec_decode.mtp.get_model_loader") + @patch("fastdeploy.spec_decode.mtp.get_attention_backend") + @patch("fastdeploy.worker.input_batch.get_rope") + @patch("fastdeploy.spec_decode.mtp.current_platform") + def test_unsupported_platform_raises_runtime_error( + self, mock_platform, mock_rope, mock_attn_backend, mock_model_loader + ): + """Cover RuntimeError in __init__ when platform is unsupported (line 120).""" + mock_platform.is_xpu.return_value = False + mock_platform.is_cuda.return_value = False + mock_platform.is_maca.return_value = False + mock_platform.__str__ = lambda self: "UnsupportedPlatform" + + with self.assertRaises(RuntimeError) as ctx: + MTPProposer(self.fd_config, self.main_model, self.local_rank, self.device_id, self.target_model_inputs) + self.assertIn("Unsupported platform for MTP", str(ctx.exception)) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_envs.py b/tests/test_envs.py new file mode 100644 index 0000000000..41dd4c1046 --- /dev/null +++ b/tests/test_envs.py @@ -0,0 +1,168 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +from fastdeploy import envs + + +class TestEnvsGetattr(unittest.TestCase): + """Test the module-level __getattr__ lazy evaluation.""" + + def test_default_values(self): + with _clean_env("FD_DEBUG"): + self.assertEqual(envs.FD_DEBUG, 0) + + with _clean_env("FD_LOG_DIR"): + self.assertEqual(envs.FD_LOG_DIR, "log") + + with _clean_env("FD_MAX_STOP_SEQS_NUM"): + self.assertEqual(envs.FD_MAX_STOP_SEQS_NUM, 5) + + def test_env_override(self): + with _set_env("FD_DEBUG", "1"): + self.assertEqual(envs.FD_DEBUG, 1) + + with _set_env("FD_LOG_DIR", "/tmp/mylog"): + self.assertEqual(envs.FD_LOG_DIR, "/tmp/mylog") + + def test_bool_env(self): + with _set_env("FD_USE_HF_TOKENIZER", "1"): + self.assertTrue(envs.FD_USE_HF_TOKENIZER) + + with _set_env("FD_USE_HF_TOKENIZER", "0"): + self.assertFalse(envs.FD_USE_HF_TOKENIZER) + + def test_unknown_attr_raises(self): + with self.assertRaises(AttributeError): + _ = envs.THIS_DOES_NOT_EXIST + + def test_list_env_fd_plugins(self): + with _clean_env("FD_PLUGINS"): + self.assertIsNone(envs.FD_PLUGINS) + + with _set_env("FD_PLUGINS", "a,b,c"): + self.assertEqual(envs.FD_PLUGINS, ["a", "b", "c"]) + + def test_list_env_fd_api_key(self): + with _clean_env("FD_API_KEY"): + self.assertEqual(envs.FD_API_KEY, []) + + with _set_env("FD_API_KEY", "key1,key2"): + self.assertEqual(envs.FD_API_KEY, ["key1", "key2"]) + + +class TestEnvsSetattr(unittest.TestCase): + """Test module-level __setattr__.""" + + def test_setattr_known_var(self): + original = envs.FD_DEBUG + try: + envs.FD_DEBUG = 42 + self.assertEqual(envs.FD_DEBUG, 42) + finally: + envs.FD_DEBUG = original + + def test_setattr_unknown_var_raises(self): + with self.assertRaises(AttributeError): + envs.UNKNOWN_VAR_XYZ = 1 + + +class TestValidateSplitKvSize(unittest.TestCase): + """Test _validate_split_kv_size via FD_DETERMINISTIC_SPLIT_KV_SIZE.""" + + def test_valid_power_of_two(self): + with _set_env("FD_DETERMINISTIC_SPLIT_KV_SIZE", "16"): + self.assertEqual(envs.FD_DETERMINISTIC_SPLIT_KV_SIZE, 16) + + with _set_env("FD_DETERMINISTIC_SPLIT_KV_SIZE", "1"): + self.assertEqual(envs.FD_DETERMINISTIC_SPLIT_KV_SIZE, 1) + + def test_invalid_not_power_of_two(self): + with _set_env("FD_DETERMINISTIC_SPLIT_KV_SIZE", "3"): + with self.assertRaises(ValueError): + _ = envs.FD_DETERMINISTIC_SPLIT_KV_SIZE + + def test_invalid_zero(self): + with _set_env("FD_DETERMINISTIC_SPLIT_KV_SIZE", "0"): + with self.assertRaises(ValueError): + _ = envs.FD_DETERMINISTIC_SPLIT_KV_SIZE + + def test_invalid_negative(self): + with _set_env("FD_DETERMINISTIC_SPLIT_KV_SIZE", "-4"): + with self.assertRaises(ValueError): + _ = envs.FD_DETERMINISTIC_SPLIT_KV_SIZE + + +class TestEnvsDir(unittest.TestCase): + def test_dir_returns_keys(self): + result = dir(envs) + self.assertIn("FD_DEBUG", result) + self.assertIn("FD_LOG_DIR", result) + + +class TestGetUniqueName(unittest.TestCase): + def test_with_shm_uuid(self): + with _set_env("SHM_UUID", "abc123"): + result = envs.get_unique_name(None, "prefix") + self.assertEqual(result, "prefix_abc123") + + def test_without_shm_uuid(self): + with _clean_env("SHM_UUID"): + result = envs.get_unique_name(None, "prefix") + self.assertEqual(result, "prefix_") + + +# ---- helpers ---- + + +class _clean_env: + """Context manager to temporarily remove an env var.""" + + def __init__(self, key): + self.key = key + + def __enter__(self): + self.old = os.environ.pop(self.key, None) + return self + + def __exit__(self, *exc): + if self.old is not None: + os.environ[self.key] = self.old + else: + os.environ.pop(self.key, None) + + +class _set_env: + """Context manager to temporarily set an env var.""" + + def __init__(self, key, value): + self.key = key + self.value = value + + def __enter__(self): + self.old = os.environ.get(self.key) + os.environ[self.key] = self.value + return self + + def __exit__(self, *exc): + if self.old is not None: + os.environ[self.key] = self.old + else: + os.environ.pop(self.key, None) + + +if __name__ == "__main__": + unittest.main()