[BugFix] Dev: fix unstable results from custom all-reduce (#4437)

This commit is contained in:
chen
2025-10-17 11:47:16 +08:00
committed by GitHub
parent 6160145f82
commit b134e6afe6
17 changed files with 25 additions and 24 deletions
+2 -1
View File
@@ -59,7 +59,7 @@ try:
global _TP_AR global _TP_AR
if _TP_AR is not None and _TP_AR.should_custom_ar(input_): if _TP_AR is not None and _TP_AR.should_custom_ar(input_):
# TODO: supports different_group custom allreduce # TODO: supports different_group custom allreduce
_TP_AR.custom_all_reduce(input_) input_ = _TP_AR.custom_all_reduce(input_)
elif paddle.in_dynamic_mode(): elif paddle.in_dynamic_mode():
if group_ is not None: if group_ is not None:
dist.all_reduce(input_, group=group_) dist.all_reduce(input_, group=group_)
@@ -69,6 +69,7 @@ try:
dist.all_reduce(input_, group=mp_group) dist.all_reduce(input_, group=mp_group)
else: else:
dist.all_reduce(input_) dist.all_reduce(input_)
return input_
except: except:
tensor_model_parallel_all_reduce = None tensor_model_parallel_all_reduce = None
@@ -213,13 +213,13 @@ class CustomAllreduce:
stream_capturing = lib.cudaStreamIsCapturing(stream) stream_capturing = lib.cudaStreamIsCapturing(stream)
if stream_capturing.value == 1: if stream_capturing.value == 1:
# 1 is cudaStreamCaptureStatusActive: The stream is capturing. # 1 is cudaStreamCaptureStatusActive: The stream is capturing.
return self.all_reduce(input, input, registered=True) return self.all_reduce(input, registered=True)
else: else:
# If warm up, mimic the allocation pattern since custom # If warm up, mimic the allocation pattern since custom
# allreduce is out-of-place. # allreduce is out-of-place.
return paddle.empty_like(input) return paddle.empty_like(input)
else: else:
return self.all_reduce(input, input, registered=False) return self.all_reduce(input, registered=False)
def clear_ipc_handles(self): def clear_ipc_handles(self):
clear_ipc_handles(self._ptr) clear_ipc_handles(self._ptr)
@@ -243,5 +243,5 @@ class DCUTritonWeightOnlyMoEMethod(QuantMethodBase):
out = intermediate_cache3.sum(axis=1) out = intermediate_cache3.sum(axis=1)
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -180,7 +180,7 @@ class GCUFusedMoeMethod(UnquantizedFusedMoEMethod):
tensor_model_parallel_all_reduce, tensor_model_parallel_all_reduce,
) )
tensor_model_parallel_all_reduce(fused_moe_out) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out return fused_moe_out
@@ -110,7 +110,7 @@ class MetaxCutlassMoEMethod(MoEMethodBase):
False, False,
) )
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out return fused_moe_out
@@ -282,5 +282,5 @@ class MetaxTritonWeightOnlyMoEMethod(QuantMethodBase):
down_proj_out.reshape_([token_num, top_k, hidden_size]) down_proj_out.reshape_([token_num, top_k, hidden_size])
out = down_proj_out.sum(axis=1) out = down_proj_out.sum(axis=1)
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -171,7 +171,7 @@ class XPUMoEMethod(MoEMethodBase):
False, # moe group, used in deepseek False, # moe group, used in deepseek
) )
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out return fused_moe_out
@@ -459,7 +459,7 @@ class XPUWeightOnlyMoEMethod(XPUMoEMethod):
tmp_ffn_out = paddle.empty(x.shape, x.dtype) tmp_ffn_out = paddle.empty(x.shape, x.dtype)
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(tmp_ffn_out) tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)
return tmp_ffn_out return tmp_ffn_out
@@ -640,5 +640,5 @@ class XPUW4A8MoEMethod(XPUMoEMethod):
permute_indices_per_token.shape[1], permute_indices_per_token.shape[1],
) )
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(tmp_ffn_out) tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)
return tmp_ffn_out return tmp_ffn_out
+1 -1
View File
@@ -863,7 +863,7 @@ class RowParallelLinear(LinearBase):
out = paddle.matmul(x, self.weight) out = paddle.matmul(x, self.weight)
if self.reduce_results and self.nranks > 1: if self.reduce_results and self.nranks > 1:
tensor_model_parallel_all_reduce(out, self.tp_group) out = tensor_model_parallel_all_reduce(out, self.tp_group)
if not self.fd_config.quant_config and self.add_bias: if not self.fd_config.quant_config and self.add_bias:
out = paddle.add(out, self.bias) out = paddle.add(out, self.bias)
return out return out
@@ -298,7 +298,7 @@ class CutlassMoEMethod(UnquantizedFusedMoEMethod):
) )
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group)
return fused_moe_out return fused_moe_out
@@ -594,6 +594,6 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
1.0, 1.0,
)[0] )[0]
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(tmp_ffn_out) tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)
return tmp_ffn_out return tmp_ffn_out
@@ -354,6 +354,6 @@ class MarlinWeightOnlyMoEMethod(QuantMethodBase):
ffn_out = ffn_out.sum(axis=1) ffn_out = ffn_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(ffn_out) ffn_out = tensor_model_parallel_all_reduce(ffn_out)
return ffn_out return ffn_out
@@ -393,7 +393,7 @@ class TritonWeightOnlyMoEMethod(QuantMethodBase):
down_proj_out.reshape_([token_num, top_k, hidden_size]) down_proj_out.reshape_([token_num, top_k, hidden_size])
out = down_proj_out.sum(axis=1) out = down_proj_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -767,7 +767,7 @@ class Wfp8Afp8MoEMethod(QuantMethodBase):
out = down_proj_out.sum(axis=1) out = down_proj_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1: if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -1056,7 +1056,7 @@ class TensorWiseFP8MoEMethod(QuantMethodBase):
out = down_proj_out.sum(axis=1) out = down_proj_out.sum(axis=1)
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -1460,6 +1460,6 @@ class BlockWiseFP8MoEMethod(QuantMethodBase):
out = intermediate_cache3.sum(axis=1) out = intermediate_cache3.sum(axis=1)
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out) out = tensor_model_parallel_all_reduce(out)
return out return out
@@ -318,7 +318,7 @@ class CutlassWint2FusedMoeMethod(Wint2MoeMethod):
) )
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out return fused_moe_out
@@ -488,6 +488,6 @@ class TritonWint2FusedMoeMethod(CutlassWint2FusedMoeMethod):
fused_moe_out = paddle.sum(intermediate_cache3, axis=1) fused_moe_out = paddle.sum(intermediate_cache3, axis=1)
if layer.tp_size > 1: if layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out) fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out return fused_moe_out
@@ -194,7 +194,7 @@ class DeepSeekV3MoE(nn.Layer):
moe_out = moe_out + shared_experts_out moe_out = moe_out + shared_experts_out
# We do the TP all-reduce after the sum of experts. # We do the TP all-reduce after the sum of experts.
if self.tp_size > 1: if self.tp_size > 1:
tensor_model_parallel_all_reduce(moe_out) moe_out = tensor_model_parallel_all_reduce(moe_out)
return moe_out return moe_out
@@ -300,7 +300,7 @@ class Ernie4_5_VLMoE(nn.Layer):
if self.num_shared_experts > 0: if self.num_shared_experts > 0:
hidden_states += shared_experts_out hidden_states += shared_experts_out
if self.tp_size > 1: if self.tp_size > 1:
tensor_model_parallel_all_reduce(hidden_states) hidden_states = tensor_model_parallel_all_reduce(hidden_states)
return hidden_states return hidden_states
+1 -1
View File
@@ -167,7 +167,7 @@ class Glm4Moe(nn.Layer):
out = out + shared_experts_out out = out + shared_experts_out
# We do the TP all-reduce after the sum of experts. # We do the TP all-reduce after the sum of experts.
if self.tensor_parallel_size > 1: if self.tensor_parallel_size > 1:
tensor_model_parallel_all_reduce(out, self.tp_group) out = tensor_model_parallel_all_reduce(out, self.tp_group)
return out return out
+1 -1
View File
@@ -57,7 +57,7 @@ class Test(unittest.TestCase):
data_custom_ar = paddle.rand([m, n], dtype="bfloat16") data_custom_ar = paddle.rand([m, n], dtype="bfloat16")
data_paddle = data_custom_ar.clone() data_paddle = data_custom_ar.clone()
if fa.should_custom_ar(data_custom_ar): if fa.should_custom_ar(data_custom_ar):
fa.custom_all_reduce(data_custom_ar) data_custom_ar = fa.custom_all_reduce(data_custom_ar)
dist.all_reduce(data_paddle) dist.all_reduce(data_paddle)
if dist.get_rank() == 0: if dist.get_rank() == 0:
np.testing.assert_allclose( np.testing.assert_allclose(