From b134e6afe6f2e1e83d6b64b73d924318d4660fe2 Mon Sep 17 00:00:00 2001 From: chen <103103266+ckl117@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:47:16 +0800 Subject: [PATCH] [BugFix]Dev fix custom ar unstable result (#4437) --- fastdeploy/distributed/communication.py | 3 ++- .../distributed/custom_all_reduce/custom_all_reduce.py | 4 ++-- .../layers/backends/dcu/fused_moe_triton_backends.py | 2 +- .../backends/gcu/moe/fused_moe_method_gcu_backend.py | 2 +- .../backends/metax/moe/fused_moe_cutlass_metax_backend.py | 2 +- .../backends/metax/moe/fused_moe_triton_metax_backend.py | 2 +- .../model_executor/layers/backends/xpu/moe/fused_moe.py | 6 +++--- fastdeploy/model_executor/layers/linear.py | 2 +- .../layers/moe/fused_moe_cutlass_backend.py | 2 +- .../layers/moe/fused_moe_deepgemm_backend.py | 2 +- .../model_executor/layers/moe/fused_moe_marlin_backend.py | 2 +- .../model_executor/layers/moe/fused_moe_triton_backend.py | 8 ++++---- .../model_executor/layers/moe/fused_moe_wint2_backend.py | 4 ++-- fastdeploy/model_executor/models/deepseek_v3.py | 2 +- .../model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py | 2 +- fastdeploy/model_executor/models/glm4_moe.py | 2 +- tests/distributed/custom_all_reduce.py | 2 +- 17 files changed, 25 insertions(+), 24 deletions(-) diff --git a/fastdeploy/distributed/communication.py b/fastdeploy/distributed/communication.py index 5c78d125c5..c05671aae2 100644 --- a/fastdeploy/distributed/communication.py +++ b/fastdeploy/distributed/communication.py @@ -59,7 +59,7 @@ try: global _TP_AR if _TP_AR is not None and _TP_AR.should_custom_ar(input_): # TODO: supports different_group custom allreduce - _TP_AR.custom_all_reduce(input_) + input_ = _TP_AR.custom_all_reduce(input_) elif paddle.in_dynamic_mode(): if group_ is not None: dist.all_reduce(input_, group=group_) @@ -69,6 +69,7 @@ try: dist.all_reduce(input_, group=mp_group) else: dist.all_reduce(input_) + return input_ except: tensor_model_parallel_all_reduce = None diff --git a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py index b2e61c71d9..dfbed094dd 100644 --- a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py +++ b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py @@ -213,13 +213,13 @@ class CustomAllreduce: stream_capturing = lib.cudaStreamIsCapturing(stream) if stream_capturing.value == 1: # 1 is cudaStreamCaptureStatusActive: The stream is capturing. - return self.all_reduce(input, input, registered=True) + return self.all_reduce(input, registered=True) else: # If warm up, mimic the allocation pattern since custom # allreduce is out-of-place. return paddle.empty_like(input) else: - return self.all_reduce(input, input, registered=False) + return self.all_reduce(input, registered=False) def clear_ipc_handles(self): clear_ipc_handles(self._ptr) diff --git a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py index 0038ed1497..f1ea6572fe 100644 --- a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py +++ b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py @@ -243,5 +243,5 @@ class DCUTritonWeightOnlyMoEMethod(QuantMethodBase): out = intermediate_cache3.sum(axis=1) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out diff --git a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py index c899cafc79..c13a68f311 100644 --- a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py +++ b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py @@ -180,7 +180,7 @@ class GCUFusedMoeMethod(UnquantizedFusedMoEMethod): tensor_model_parallel_all_reduce, ) - tensor_model_parallel_all_reduce(fused_moe_out) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out) return fused_moe_out diff --git a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_cutlass_metax_backend.py b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_cutlass_metax_backend.py index 19b2ba8f8d..97926b76ad 100644 --- a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_cutlass_metax_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_cutlass_metax_backend.py @@ -110,7 +110,7 @@ class MetaxCutlassMoEMethod(MoEMethodBase): False, ) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(fused_moe_out) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out) return fused_moe_out diff --git a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py index 4e4e867899..20c24e29c9 100644 --- a/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py @@ -282,5 +282,5 @@ class MetaxTritonWeightOnlyMoEMethod(QuantMethodBase): down_proj_out.reshape_([token_num, top_k, hidden_size]) out = down_proj_out.sum(axis=1) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 2e74e53461..fd25098a1f 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -171,7 +171,7 @@ class XPUMoEMethod(MoEMethodBase): False, # moe group, used in deepseek ) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(fused_moe_out) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out) return fused_moe_out @@ -459,7 +459,7 @@ class XPUWeightOnlyMoEMethod(XPUMoEMethod): tmp_ffn_out = paddle.empty(x.shape, x.dtype) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(tmp_ffn_out) + tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out) return tmp_ffn_out @@ -640,5 +640,5 @@ class XPUW4A8MoEMethod(XPUMoEMethod): permute_indices_per_token.shape[1], ) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(tmp_ffn_out) + tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out) return tmp_ffn_out diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py index 83c84454f9..beee8f940a 100644 --- a/fastdeploy/model_executor/layers/linear.py +++ b/fastdeploy/model_executor/layers/linear.py @@ -863,7 +863,7 @@ class RowParallelLinear(LinearBase): out = paddle.matmul(x, self.weight) if self.reduce_results and self.nranks > 1: - tensor_model_parallel_all_reduce(out, self.tp_group) + out = tensor_model_parallel_all_reduce(out, self.tp_group) if not self.fd_config.quant_config and self.add_bias: out = paddle.add(out, self.bias) return out diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py index 22460676b9..8ddf69a80c 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py @@ -298,7 +298,7 @@ class CutlassMoEMethod(UnquantizedFusedMoEMethod): ) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group) return fused_moe_out diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index c973f1901b..06cc329491 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -594,6 +594,6 @@ class DeepGemmFusedMoeMethod(MoEMethodBase): 1.0, )[0] if layer.tp_size > 1: - tensor_model_parallel_all_reduce(tmp_ffn_out) + tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out) return tmp_ffn_out diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py index cc09dd3adc..7277b9697f 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py @@ -354,6 +354,6 @@ class MarlinWeightOnlyMoEMethod(QuantMethodBase): ffn_out = ffn_out.sum(axis=1) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(ffn_out) + ffn_out = tensor_model_parallel_all_reduce(ffn_out) return ffn_out diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py index d5b6cf970e..d6d1a5a4dc 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py @@ -393,7 +393,7 @@ class TritonWeightOnlyMoEMethod(QuantMethodBase): down_proj_out.reshape_([token_num, top_k, hidden_size]) out = down_proj_out.sum(axis=1) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out @@ -767,7 +767,7 @@ class Wfp8Afp8MoEMethod(QuantMethodBase): out = down_proj_out.sum(axis=1) if layer.reduce_results and layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out @@ -1056,7 +1056,7 @@ class TensorWiseFP8MoEMethod(QuantMethodBase): out = down_proj_out.sum(axis=1) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out @@ -1460,6 +1460,6 @@ class BlockWiseFP8MoEMethod(QuantMethodBase): out = intermediate_cache3.sum(axis=1) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(out) + out = tensor_model_parallel_all_reduce(out) return out diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py index f9f717d313..7cbb46dc10 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py @@ -318,7 +318,7 @@ class CutlassWint2FusedMoeMethod(Wint2MoeMethod): ) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(fused_moe_out) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out) return fused_moe_out @@ -488,6 +488,6 @@ class TritonWint2FusedMoeMethod(CutlassWint2FusedMoeMethod): fused_moe_out = paddle.sum(intermediate_cache3, axis=1) if layer.tp_size > 1: - tensor_model_parallel_all_reduce(fused_moe_out) + fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out) return fused_moe_out diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index faa76be8d2..5ff2bc4cf3 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -194,7 +194,7 @@ class DeepSeekV3MoE(nn.Layer): moe_out = moe_out + shared_experts_out # We do to TP all reduce after the sum of experts. if self.tp_size > 1: - tensor_model_parallel_all_reduce(moe_out) + moe_out = tensor_model_parallel_all_reduce(moe_out) return moe_out diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 6a96adeabd..3f81bc3a5a 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -300,7 +300,7 @@ class Ernie4_5_VLMoE(nn.Layer): if self.num_shared_experts > 0: hidden_states += shared_experts_out if self.tp_size > 1: - tensor_model_parallel_all_reduce(hidden_states) + hidden_states = tensor_model_parallel_all_reduce(hidden_states) return hidden_states diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py index 22e07a3c39..4ddecb5118 100644 --- a/fastdeploy/model_executor/models/glm4_moe.py +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -167,7 +167,7 @@ class Glm4Moe(nn.Layer): out = out + shared_experts_out # We do to TP all reduce after the sum of experts. if self.tensor_parallel_size > 1: - tensor_model_parallel_all_reduce(out, self.tp_group) + out = tensor_model_parallel_all_reduce(out, self.tp_group) return out diff --git a/tests/distributed/custom_all_reduce.py b/tests/distributed/custom_all_reduce.py index 651b692446..be4fd9d5c9 100644 --- a/tests/distributed/custom_all_reduce.py +++ b/tests/distributed/custom_all_reduce.py @@ -57,7 +57,7 @@ class Test(unittest.TestCase): data_custom_ar = paddle.rand([m, n], dtype="bfloat16") data_paddle = data_custom_ar.clone() if fa.should_custom_ar(data_custom_ar): - fa.custom_all_reduce(data_custom_ar) + data_custom_ar = fa.custom_all_reduce(data_custom_ar) dist.all_reduce(data_paddle) if dist.get_rank() == 0: np.testing.assert_allclose(