Initial version; some bugs remain, fixes pending (#4906)

Echo-Nie
2025-11-10 14:16:09 +08:00
committed by GitHub
parent 90b0936ae9
commit 112623e33e
4 changed files with 374 additions and 0 deletions
+60
@@ -0,0 +1,60 @@
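"""Unit tests for KVCacheSpec merging and the AttentionSpec subclass in
fastdeploy.engine.kv_cache_interface."""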
import unittest

from fastdeploy.engine.kv_cache_interface import AttentionSpec, KVCacheSpec


class TestKVCacheSpec(unittest.TestCase):
    def test_merge_valid(self):
        # Two specs with identical block_size and block_memory_used merge cleanly
        spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
        spec2 = KVCacheSpec(block_size=256, block_memory_used=1024)
        merged_spec = KVCacheSpec.merge([spec1, spec2])
        self.assertEqual(merged_spec.block_size, spec1.block_size)
        self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)

    def test_merge_invalid(self):
        # Specs with mismatched block_size must not merge
        spec1 = KVCacheSpec(block_size=256, block_memory_used=1024)
        spec2 = KVCacheSpec(block_size=512, block_memory_used=1024)
        with self.assertRaises(AssertionError):
            KVCacheSpec.merge([spec1, spec2])

    def test_attention_spec_inheritance(self):
        # AttentionSpec carries the base fields plus attention-specific ones
        attention_spec = AttentionSpec(
            block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32"
        )
        self.assertEqual(attention_spec.block_size, 256)
        self.assertEqual(attention_spec.block_memory_used, 1024)
        self.assertEqual(attention_spec.num_kv_heads, 12)
        self.assertEqual(attention_spec.head_size, 64)
        self.assertEqual(attention_spec.dtype, "float32")

    def test_attention_spec_merge(self):
        # Two AttentionSpec objects with identical attributes merge cleanly
        spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
        spec2 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
        merged_spec = AttentionSpec.merge([spec1, spec2])
        self.assertEqual(merged_spec.block_size, spec1.block_size)
        self.assertEqual(merged_spec.block_memory_used, spec1.block_memory_used)
        self.assertEqual(merged_spec.num_kv_heads, spec1.num_kv_heads)
        self.assertEqual(merged_spec.head_size, spec1.head_size)
        self.assertEqual(merged_spec.dtype, spec1.dtype)

    def test_attention_spec_merge_invalid(self):
        # AttentionSpec objects with different block_size must not merge
        spec1 = AttentionSpec(block_size=256, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
        spec2 = AttentionSpec(block_size=512, block_memory_used=1024, num_kv_heads=12, head_size=64, dtype="float32")
        with self.assertRaises(AssertionError):
            AttentionSpec.merge([spec1, spec2])


if __name__ == "__main__":
    unittest.main()
+169
@@ -0,0 +1,169 @@
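"""Unit tests for PlasAttentionBackend and PlasAttentionMetadata from
fastdeploy.model_executor.layers.attention.moba_attention_backend."""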
import unittest
from unittest.mock import patch

import numpy as np
import paddle

from fastdeploy.model_executor.layers.attention.moba_attention_backend import (
    PlasAttentionBackend,
    PlasAttentionMetadata,
)


class DummyFDConfig:
    """Minimal stand-in exposing only the config attributes the backend reads."""

    def __init__(self):
        self.cache_config = type("CacheConfig", (), {"block_size": 4})()
        self.model_config = type("ModelConfig", (), {"max_model_len": 16, "head_dim": 8, "num_hidden_layers": 2})()
        self.scheduler_config = type("SchedulerConfig", (), {"max_num_seqs": 2})()
        self.plas_attention_config = type(
            "PlasConfig",
            (),
            {
                "plas_block_size": 4,
                "plas_encoder_top_k_left": 1,
                "plas_encoder_top_k_right": 1,
                "plas_use_encoder_seq_limit": 1,
                "plas_decoder_top_k_left": 1,
                "plas_decoder_top_k_right": 1,
                "plas_use_decoder_seq_limit": 1,
                "plas_max_seq_length": 32,
            },
        )()
        self.graph_opt_config = type("GraphOptConfig", (), {"cudagraph_capture_sizes": None})()
        self.parallel_config = type("ParallelConfig", (), {"block_size": 4})()


class DummyForwardMeta:
    """Minimal forward-metadata stand-in; tuple defaults avoid mutable default arguments."""

    def __init__(self, enc_seq=(4, 4), dec_seq=(2, 2)):
        self.seq_lens_encoder = paddle.to_tensor(list(enc_seq), dtype="int64")
        self.seq_lens_decoder = paddle.to_tensor(list(dec_seq), dtype="int64")
        self.seq_lens_this_time = sum(dec_seq)
        self.cu_seqlens_q = paddle.to_tensor([0] + list(np.cumsum(dec_seq)), dtype="int64")
        self.caches = [paddle.zeros([2, 4, 8])] * 4
        self.block_tables = None
        self.rotary_embs = None


class DummyLayer:
    """Minimal attention-layer stand-in carrying only the attributes used in these tests."""

    def __init__(self, layer_id=0, cache_quant_type_str=None, plas_use_mlp=True):
        self.layer_id = layer_id
        self.qkv_bias = None
        self.cache_k_block_means = None
        self.cache_quant_type_str = cache_quant_type_str
        self.plas_use_mlp = plas_use_mlp


class TestPlasAttentionBackend(unittest.TestCase):
    @patch(
        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
        return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
    )
    def test_init_attention_metadata(self, mock_get_cu_seq):
        # Metadata initialization should produce a populated PlasAttentionMetadata
        fd_config = DummyFDConfig()
        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
        forward_meta = DummyForwardMeta()
        backend.init_attention_metadata(forward_meta)
        self.assertIsInstance(backend.attention_metadata, PlasAttentionMetadata)
        self.assertTrue(backend.attention_metadata.q_input.shape[0] > 0)

    @patch(
        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
        return_value=(
            paddle.to_tensor([0]),  # cu_seq_q_pack
            paddle.to_tensor([0]),  # cu_seqlens_k
            paddle.to_tensor([0]),  # q_pack_tokens
        ),
    )
    def test_init_attention_metadata_empty_seq(self, mock_get_cu_seq):
        # Metadata init should not raise when all sequence lengths are zero
        fd_config = DummyFDConfig()
        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
        forward_meta = DummyForwardMeta()
        forward_meta.seq_lens_encoder = paddle.to_tensor([0])
        forward_meta.seq_lens_decoder = paddle.to_tensor([0])
        forward_meta.cu_seqlens_q = paddle.to_tensor([0, 0])
        backend.init_attention_metadata(forward_meta)

    def test_get_kv_cache_shape(self):
        # KV cache shape calculation under different quant types
        fd_config = DummyFDConfig()
        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
        # Default layout
        shape = backend.get_kv_cache_shape(max_num_blocks=2)
        self.assertEqual(shape, (2, 2, 4, 8))
        # int4_zp quantization halves the last dimension
        shape_int4 = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int4_zp")
        self.assertEqual(shape_int4, (2, 2, 4, 4))
        # Other quant types keep the default layout
        shape_other = backend.get_kv_cache_shape(max_num_blocks=2, kv_cache_quant_type="int8")
        self.assertEqual(shape_other, (2, 2, 4, 8))

    @patch(
        "fastdeploy.model_executor.layers.attention.moba_attention_backend.moba_attention",
        return_value=(paddle.ones([4, 4]), None),
    )
    @patch(
        "fastdeploy.model_executor.layers.attention.moba_attention_backend.get_cur_cu_seq_len_k",
        return_value=(paddle.to_tensor([1, 2]), paddle.to_tensor([1, 2]), paddle.to_tensor([2])),
    )
    def test_forward_mixed(self, mock_get_cu_seq, mock_moba_attention):
        # Mixed forward path under various layer configurations; moba_attention is
        # mocked to return ones, so each output should be all ones
        fd_config = DummyFDConfig()
        backend = PlasAttentionBackend(fd_config, kv_num_heads=2, num_heads=2, head_dim=8)
        forward_meta = DummyForwardMeta()
        backend.init_attention_metadata(forward_meta)
        # Layer with the complete attribute set
        layer = DummyLayer()
        qkv = paddle.zeros([4, 4])
        compressed_kv = paddle.zeros([4, 4])
        k_pe = paddle.zeros([4, 4])
        out = backend.forward_mixed(
            q=None,
            k=None,
            v=None,
            qkv=qkv,
            compressed_kv=compressed_kv,
            k_pe=k_pe,
            layer=layer,
            forward_meta=forward_meta,
        )
        self.assertTrue((out.numpy() == 1).all())
        # Layer without cache quantization
        layer_missing = DummyLayer(layer_id=1, cache_quant_type_str=None)
        out2 = backend.forward_mixed(
            q=None,
            k=None,
            v=None,
            qkv=qkv,
            compressed_kv=compressed_kv,
            k_pe=k_pe,
            layer=layer_missing,
            forward_meta=forward_meta,
        )
        self.assertTrue((out2.numpy() == 1).all())
        # Layer with int4_zp cache quantization
        layer_int4 = DummyLayer(layer_id=1, cache_quant_type_str="int4_zp")
        out3 = backend.forward_mixed(
            q=None,
            k=None,
            v=None,
            qkv=qkv,
            compressed_kv=compressed_kv,
            k_pe=k_pe,
            layer=layer_int4,
            forward_meta=forward_meta,
        )
        self.assertTrue((out3.numpy() == 1).all())


if __name__ == "__main__":
    unittest.main()
+93
@@ -0,0 +1,93 @@
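"""Unit tests for PoolingSequenceGroupOutput and PoolerOutput from fastdeploy.output.pooler."""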
import unittest

import numpy as np
import paddle

from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput


class TestPoolingSequenceGroupOutput(unittest.TestCase):
    def test_get_data_nbytes_tensor(self):
        # Byte count for a paddle tensor is numel * element_size
        tensor = paddle.to_tensor([[1, 2], [3, 4]], dtype="float32")
        output = PoolingSequenceGroupOutput(data=tensor)
        expected = tensor.numel() * tensor.element_size()
        self.assertEqual(output.get_data_nbytes(), expected)

    def test_get_data_nbytes_numpy(self):
        # Byte count for a numpy array matches arr.nbytes
        arr = np.ones((2, 3), dtype=np.float32)
        output = PoolingSequenceGroupOutput(data=arr)
        self.assertEqual(output.get_data_nbytes(), arr.nbytes)

    def test_get_data_nbytes_none(self):
        # No data means zero bytes
        output = PoolingSequenceGroupOutput(data=None)
        self.assertEqual(output.get_data_nbytes(), 0)

    def test_repr(self):
        output = PoolingSequenceGroupOutput(data=123)
        self.assertIn("PoolingSequenceGroupOutput(data=", repr(output))

    def test_eq_same(self):
        output1 = PoolingSequenceGroupOutput(data=5)
        output2 = PoolingSequenceGroupOutput(data=5)
        self.assertTrue(output1 == output2)

    def test_eq_diff(self):
        output1 = PoolingSequenceGroupOutput(data=5)
        output2 = PoolingSequenceGroupOutput(data=6)
        self.assertFalse(output1 == output2)

    def test_eq_not_implemented(self):
        # Comparing against a non-PoolingSequenceGroupOutput raises NotImplementedError
        output = PoolingSequenceGroupOutput(data=5)
        with self.assertRaises(NotImplementedError):
            output == 123


class TestPoolerOutput(unittest.TestCase):
    def test_get_data_nbytes_empty(self):
        pooler = PoolerOutput(outputs=[])
        self.assertEqual(pooler.get_data_nbytes(), 0)

    def test_get_data_nbytes_multiple(self):
        # Total byte count is the sum over all contained outputs
        outputs = [
            PoolingSequenceGroupOutput(data=paddle.to_tensor([1, 2])),
            PoolingSequenceGroupOutput(data=np.ones(3, dtype=np.float32)),
        ]
        pooler = PoolerOutput(outputs=outputs)
        expected = outputs[0].get_data_nbytes() + outputs[1].get_data_nbytes()
        self.assertEqual(pooler.get_data_nbytes(), expected)

    def test_len_and_index(self):
        outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
        pooler = PoolerOutput(outputs=outputs)
        self.assertEqual(len(pooler), 2)
        self.assertIs(pooler[0], outputs[0])
        self.assertIs(pooler[1], outputs[1])

    def test_setitem(self):
        outputs = [PoolingSequenceGroupOutput(data=1), PoolingSequenceGroupOutput(data=2)]
        pooler = PoolerOutput(outputs=outputs)
        new_output = PoolingSequenceGroupOutput(data=999)
        pooler[1] = new_output
        self.assertIs(pooler[1], new_output)

    def test_eq_same(self):
        outputs1 = [PoolingSequenceGroupOutput(data=1)]
        outputs2 = [PoolingSequenceGroupOutput(data=1)]
        pooler1 = PoolerOutput(outputs=outputs1)
        pooler2 = PoolerOutput(outputs=outputs2)
        self.assertTrue(pooler1 == pooler2)

    def test_eq_diff(self):
        pooler1 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
        pooler2 = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=2)])
        self.assertFalse(pooler1 == pooler2)

    def test_eq_type_mismatch(self):
        # Comparing against an unrelated type is simply unequal
        pooler = PoolerOutput(outputs=[PoolingSequenceGroupOutput(data=1)])
        self.assertFalse(pooler == 123)


if __name__ == "__main__":
    unittest.main()
+52
@@ -0,0 +1,52 @@
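"""Unit tests for StreamTransferData and DecoderState from fastdeploy.output.stream_transfer_data."""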
import unittest

import numpy as np

from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData


class TestStreamTransferData(unittest.TestCase):
    def test_dataclass_initialization(self):
        tokens = np.array([1, 2, 3])
        logprobs = np.array([0.1, 0.2, 0.3])
        accept_tokens = np.array([1, 0, 1])
        accept_num = np.array([2])
        pooler_output = np.random.rand(2, 4)
        # Bypass __init__ via __new__ so fields can be assigned individually
        data = StreamTransferData.__new__(StreamTransferData)
        data.decoder_state = DecoderState.TEXT
        data.batch_id = 42
        data.tokens = tokens
        data.speculaive_decoding = True
        data.logprobs = logprobs
        data.accept_tokens = accept_tokens
        data.accept_num = accept_num
        data.pooler_output = pooler_output
        self.assertEqual(data.decoder_state, DecoderState.TEXT)
        self.assertEqual(data.batch_id, 42)
        self.assertTrue(np.array_equal(data.tokens, tokens))
        self.assertTrue(data.speculaive_decoding)
        self.assertTrue(np.array_equal(data.logprobs, logprobs))
        self.assertTrue(np.array_equal(data.accept_tokens, accept_tokens))
        self.assertTrue(np.array_equal(data.accept_num, accept_num))
        self.assertTrue(np.array_equal(data.pooler_output, pooler_output))

    def test_optional_fields_none(self):
        # Only the required fields are set; every optional field should fall back
        data = StreamTransferData.__new__(StreamTransferData)
        data.decoder_state = DecoderState.IMAGE
        data.batch_id = 1
        self.assertEqual(data.decoder_state, DecoderState.IMAGE)
        self.assertEqual(data.batch_id, 1)
        self.assertIsNone(getattr(data, "tokens", None))
        self.assertFalse(getattr(data, "speculaive_decoding", False))
        self.assertIsNone(getattr(data, "logprobs", None))
        self.assertIsNone(getattr(data, "accept_tokens", None))
        self.assertIsNone(getattr(data, "accept_num", None))
        self.assertIsNone(getattr(data, "pooler_output", None))


if __name__ == "__main__":
    unittest.main()