mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
edd31e8849
* add * [tests] Add Paddle attention determinism tests and refactor resource manager Add comprehensive determinism tests for Paddle attention layer and refactor resource manager for deterministic mode support. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * add * add * add * add * add more * add more * fixsome * fixsome * fix bugs * fix bugs * only in gpu * add docs * fix comments * fix some * fix some * fix comments * add more * fix potential problem * remove not need * remove not need * remove no need * fix bug * fix bugs * fix comments * fix comments * Update tests/ce/deterministic/test_determinism_verification.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/inter_communicator/test_ipc_signal.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/layers/test_paddle_attention_determinism.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/engine/test_sampling_params_determinism.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/layers/test_paddle_attention_determinism.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/layers/test_paddle_attention_determinism_standalone.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix comments * fix import error * fix a bug * fix bugs * fix bugs * fix coverage * refine codes * refine code * fix comments * fix comments * fix comments * rm not need * fix allreduce large tensor bug * mv log files * mv log files * add files --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
213 lines
7.8 KiB
Python
213 lines
7.8 KiB
Python
#!/usr/bin/env python
|
|
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
All-Reduce Deterministic Test with Real Communication
|
|
|
|
Tests:
|
|
1. Custom All-Reduce is deterministic for supported dtypes (float32, float16, bfloat16)
|
|
2. Non-16 byte aligned tensors raise RuntimeError in deterministic mode
|
|
3. Unsupported dtypes (int32) raise AssertionError in deterministic mode
|
|
|
|
Run:
|
|
python -m paddle.distributed.launch --gpus=0,1,2,3 tests/distributed/allreduce_deterministic.py
|
|
"""
|
|
|
|
import os
|
|
|
|
import paddle
|
|
import paddle.distributed as dist
|
|
import pytest
|
|
|
|
# Mark every test collected from this module as GPU-only.
pytestmark = pytest.mark.gpu
|
|
|
from fastdeploy import envs
|
|
from fastdeploy.distributed import communication
|
|
from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce
|
|
|
|
# Dtypes the custom all-reduce path is expected to support deterministically.
SUPPORTED_DTYPES = [paddle.float32, paddle.float16, paddle.bfloat16]
# Elements per small test tensor (well under the default 8MB AR buffer).
TENSOR_SIZE = 2048
# Repetitions per determinism check; all runs must be bitwise identical.
NUM_RUNS = 20
|
|
|
|
|
|
def _create_tensor(size: int, dtype: paddle.dtype, rank: int) -> paddle.Tensor:
    """Build a [size, 1] test tensor of *dtype*, scaled by (rank + 1) so ranks differ."""
    if dtype == paddle.int32:
        base = paddle.randint(-100, 100, shape=[size, 1], dtype=dtype)
    else:
        base = paddle.randn([size, 1], dtype=dtype)
    return base * (rank + 1)
|
|
|
|
|
|
def _check_results_identical(results: list) -> bool:
|
|
"""Check if all results are identical."""
|
|
if not results:
|
|
return True
|
|
return all((results[0] == r).all() for r in results[1:])
|
|
|
|
|
|
def _init_custom_allreduce(world_size: int):
    """Create a TP group spanning all ranks and enable custom all-reduce on it.

    Uses the default 8MB (8192 * 1024 byte) custom all-reduce buffer.
    """
    group = dist.new_group(ranks=list(range(world_size)))
    communication.use_custom_allreduce(group, 8192 * 1024)
    return group
|
|
|
|
|
|
def _enable_deterministic_mode():
    """Switch on deterministic mode via env var and sanity-check that it took effect."""
    os.environ["FD_DETERMINISTIC_MODE"] = "1"
    flag = envs.FD_DETERMINISTIC_MODE
    assert flag, f"FD_DETERMINISTIC_MODE should be True but got {flag}"
|
|
|
|
|
|
def test_custom_allreduce_deterministic(rank, world_size, dtype):
    """Run the same seeded all-reduce NUM_RUNS times; return True iff outputs are identical."""
    _mp_group = _init_custom_allreduce(world_size)  # noqa: F841
    outputs = []

    for _ in range(NUM_RUNS):
        # Reseed per iteration so every run reduces the exact same inputs.
        paddle.seed(42 + rank)
        tensor = _create_tensor(TENSOR_SIZE, dtype, rank)
        reduced = tensor_model_parallel_all_reduce(tensor)
        # float32 cast: bfloat16 has no native numpy representation.
        outputs.append(reduced.astype("float32").numpy().copy())
        dist.barrier()

    communication.custom_ar_clear_ipc_handles()
    return _check_results_identical(outputs)
|
|
|
|
|
|
def _init_large_custom_allreduce(world_size: int):
    """Initialize custom all-reduce with a 128MB buffer for large-tensor tests."""
    _enable_deterministic_mode()
    buffer_bytes = 128 * 1024 * 1024  # 128MB
    group = dist.new_group(ranks=list(range(world_size)))
    # Properly close old instance to free GPU buffers and IPC handles
    if communication._TP_AR is not None:
        communication._TP_AR.close()
        communication._TP_AR = None
    communication.use_custom_allreduce(group, buffer_bytes)
|
|
|
|
|
|
def test_large_tensor_correctness(rank, world_size, dtype):
    """Large tensor (> default 8MB) should produce correct results with increased max_size."""
    # Each rank contributes (rank + 1), so the reduced value is 1 + 2 + ... + world_size.
    expected_val = float(world_size * (world_size + 1) // 2)

    # 2M elements * 2 bytes (bf16) = 4MB; 8M elements * 2 bytes = 16MB (> 8MB default)
    for large_size in (2 * 1024 * 1024, 8 * 1024 * 1024):
        x = paddle.full([large_size, 1], float(rank + 1), dtype=dtype)
        result = tensor_model_parallel_all_reduce(x)

        # Cast to float32 before numpy() since bfloat16 has no native numpy support
        result_np = result.astype("float32").numpy().flatten()
        max_diff = abs(result_np - expected_val).max()
        if max_diff > 0.01:
            raise AssertionError(
                f"Large tensor AR mismatch for {dtype}, size={large_size}: "
                f"expected={expected_val}, got_sample={result_np[:5]}, max_diff={max_diff}"
            )
        dist.barrier()
|
|
|
|
|
|
def test_large_tensor_deterministic(rank, world_size, dtype):
    """Multiple runs of large tensor all-reduce must produce bitwise-identical results."""
    # 8M elements * 2 bytes (bf16) = 16MB, exceeds default 8MB
    large_size = 8 * 1024 * 1024
    runs = []
    for _ in range(NUM_RUNS):
        # Reseed so each iteration reduces identical inputs.
        paddle.seed(42 + rank)
        tensor = _create_tensor(large_size, dtype, rank)
        reduced = tensor_model_parallel_all_reduce(tensor)
        # float32 cast: bfloat16 has no native numpy representation.
        runs.append(reduced.astype("float32").numpy().copy())
        dist.barrier()

    return _check_results_identical(runs)
|
|
|
|
|
|
def test_non_16_aligned_raises_error(rank, world_size):
    """Non-16 byte aligned tensors should raise RuntimeError in deterministic mode."""
    _enable_deterministic_mode()
    group = _init_custom_allreduce(world_size)
    # 1026 * 4 = 4104 bytes (NOT multiple of 16)
    misaligned = paddle.to_tensor([1.0] * 1026, dtype=paddle.float32).reshape([1026, 1])

    try:
        with pytest.raises(RuntimeError, match="DETERMINISTIC_MODE.*multiple of 16"):
            tensor_model_parallel_all_reduce(misaligned, group_=group)
    finally:
        # Always release IPC handles, even if the expected error did not fire.
        communication.custom_ar_clear_ipc_handles()
|
|
|
|
|
|
def test_unsupported_dtype_raises_error(rank, world_size):
    """Unsupported dtypes should raise AssertionError in deterministic mode."""
    _enable_deterministic_mode()
    group = _init_custom_allreduce(world_size)
    int_tensor = _create_tensor(TENSOR_SIZE, paddle.int32, rank)

    try:
        with pytest.raises(AssertionError, match="DETERMINISTIC_MODE.*not supported"):
            tensor_model_parallel_all_reduce(int_tensor, group_=group)
    finally:
        # Always release IPC handles, even if the expected error did not fire.
        communication.custom_ar_clear_ipc_handles()
|
|
|
|
|
|
def main():
    """Drive all all-reduce deterministic tests: error paths, then small and large tensors."""
    if not dist.is_initialized():
        paddle.distributed.init_parallel_env()

    rank, world_size = dist.get_rank(), dist.get_world_size()

    assert world_size >= 2, f"Test requires at least 2 GPUs, got {world_size}"

    print(f"All-Reduce Deterministic Test (world_size={world_size}, runs={NUM_RUNS})")

    # Error path tests
    test_non_16_aligned_raises_error(rank, world_size)
    print("PASS: non-16 byte aligned tensor raises RuntimeError")
    dist.barrier()

    test_unsupported_dtype_raises_error(rank, world_size)
    print("PASS: unsupported dtype (int32) raises AssertionError")
    dist.barrier()

    # Determinism tests for supported dtypes (small tensors)
    for dtype in SUPPORTED_DTYPES:
        is_deterministic = test_custom_allreduce_deterministic(rank, world_size, dtype)
        assert is_deterministic, f"Custom all-reduce is NOT deterministic for {dtype}"
        print(f"PASS: custom all-reduce deterministic for {dtype}")
        dist.barrier()

    # Large tensor tests (> default 8MB, using increased max_size)
    # Create one 128MB instance shared by all dtype tests to avoid IPC buffer leaks
    _init_large_custom_allreduce(world_size)

    for dtype in SUPPORTED_DTYPES:
        test_large_tensor_correctness(rank, world_size, dtype)
        print(f"PASS: large tensor all-reduce correctness for {dtype}")
        dist.barrier()

    for dtype in SUPPORTED_DTYPES:
        is_deterministic = test_large_tensor_deterministic(rank, world_size, dtype)
        assert is_deterministic, f"Large tensor all-reduce is NOT deterministic for {dtype}"
        print(f"PASS: large tensor all-reduce deterministic for {dtype}")
        dist.barrier()

    communication.custom_ar_clear_ipc_handles()

    print("All tests passed.")
|
|
|
|
|
|
# Launched via paddle.distributed.launch; see the module docstring for the command.
if __name__ == "__main__":
    main()
|