[CI] 【Hackathon 10th Spring No.27】功能模块 fastdeploy/cache_manager/prefix_cache_manager.py单测补充 (#6297)

* test: update prefix cache manager tests

* test: refine prefix cache manager coverage helpers

* style: apply black formatting to test_prefix_cache_manager.py

Co-authored-by: Cursor <cursoragent@cursor.com>

* tests: update test_prefix_cache_manager

Co-authored-by: Cursor <cursoragent@cursor.com>

* update

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
kesmeey
2026-03-02 20:04:12 +08:00
committed by GitHub
parent 3cf7c6c281
commit aae87e6ae2
@@ -16,13 +16,30 @@ import sys
import threading
import types
import unittest
import warnings
from functools import partial
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import numpy as np
import paddle
import pytest
warnings.simplefilter("ignore", DeprecationWarning)
pytestmark = pytest.mark.filterwarnings(
"ignore:ast.Num is deprecated and will be removed in Python 3.14; use ast.Constant instead:DeprecationWarning"
)
if not hasattr(paddle, "compat"):
paddle.compat = types.SimpleNamespace(enable_torch_proxy=lambda **_: None)
warnings.filterwarnings(
"ignore",
message="ast.Num is deprecated and will be removed in Python 3.14; use ast.Constant instead",
category=DeprecationWarning,
)
warnings.filterwarnings("ignore", category=DeprecationWarning, module=r"astor\.op_util")
# Module under test: PrefixCacheManager and related cache primitives.
from fastdeploy.cache_manager.cache_data import BlockNode, CacheStatus
from fastdeploy.cache_manager.prefix_cache_manager import PrefixCacheManager
@@ -385,6 +402,24 @@ class PrefixCacheManagerTest(unittest.TestCase):
self.assertEqual(hit_info["gpu_match_token_num"], 0)
manager.metrics.reset_metrics.assert_called_once()
def test_request_match_blocks_raises_with_paddle_tensor_prompt(self):
manager = _create_manager()
task = SimpleNamespace(
prompt_token_ids=paddle.to_tensor([1, 2, 3], dtype="int64"),
output_token_ids=[4],
request_id="paddle-prompt",
)
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.NORMAL]))
with patch.object(
manager,
"mm_match_block",
return_value=([], [], [], manager.radix_tree_root, 0, 0),
):
with self.assertRaises(TypeError):
manager.request_match_blocks(task, block_size=2)
def test_get_required_block_num_rounds_up(self):
manager = _create_manager()
self.assertEqual(manager.get_required_block_num(0, 4), 0)
@@ -1140,6 +1175,264 @@ class TestPrefixCacheManagerCoverage(unittest.TestCase):
self.assertTrue(all(dim >= 0 for dim in key_shape))
self.assertTrue(all(dim is not None for dim in value_shape))
def test_storage_task_helpers_cover_sync_paths(self):
manager = _create_manager(num_gpu_blocks=4)
manager.kvcache_storage_backend = "memory"
manager.cache_task_queue = _DummyEngineCacheQueue()
class _DoneEvent:
def wait(self, timeout=None):
return True
with patch("fastdeploy.cache_manager.prefix_cache_manager.Event", _DoneEvent):
manager.issue_write_back_storage_task(
SimpleNamespace(task_id="write-1", keys=["a"], gpu_block_ids=[0]),
is_sync=True,
)
self.assertEqual(len(manager.cache_task_queue.tasks), 1)
self.assertNotIn("write-1", manager.task_write_back_event)
manager.storage_prefetch_block_ids["prefetch-1"] = [6, 7]
with patch("fastdeploy.cache_manager.prefix_cache_manager.Event", _DoneEvent):
block_ids = manager.issue_prefetch_storage_task(
SimpleNamespace(task_id="prefetch-1", keys=["k"], gpu_block_ids=[1]),
is_sync=True,
)
self.assertEqual(block_ids, [6, 7])
self.assertNotIn("prefetch-1", manager.task_prefetch_event)
def test_write_cache_to_storage_and_key_mismatch_error(self):
manager = _create_manager(num_gpu_blocks=4)
manager.kvcache_storage_backend = "memory"
leaf = _make_block_node(manager, 13, [1, 2], block_size=2)
manager.req_leaf_map["req-write"] = leaf
manager.issue_write_back_storage_task = MagicMock()
request = SimpleNamespace(
request_id="req-write",
prompt_token_ids=np.array([1, 2]),
output_token_ids=[3, 4],
block_tables=[2, 3],
)
manager.config.cache_config.enable_output_caching = True
manager.write_cache_to_storage(request)
manager.issue_write_back_storage_task.assert_called_once()
with self.assertRaises(ValueError):
manager.issue_write_back_storage_task = PrefixCacheManager.issue_write_back_storage_task.__get__(manager)
manager.cache_task_queue = _DummyEngineCacheQueue()
manager.issue_write_back_storage_task(
SimpleNamespace(task_id="bad", keys=["x"], gpu_block_ids=[]),
is_sync=False,
)
def test_get_block_hash_extra_keys_handles_multimodal_segments(self):
manager = _create_manager()
request = SimpleNamespace(
multimodal_inputs={
"mm_positions": [SimpleNamespace(offset=2, length=3), SimpleNamespace(offset=8, length=2)],
"mm_hashes": ["img-a", "img-b"],
},
num_total_tokens=12,
)
mm_idx, hash_keys = manager.get_block_hash_extra_keys(request, start_idx=0, end_idx=2, mm_idx=0)
self.assertEqual((mm_idx, hash_keys), (0, []))
mm_idx, hash_keys = manager.get_block_hash_extra_keys(request, start_idx=2, end_idx=6, mm_idx=0)
self.assertEqual(hash_keys, ["img-a"])
mm_idx, hash_keys = manager.get_block_hash_extra_keys(request, start_idx=7, end_idx=10, mm_idx=1)
self.assertEqual(hash_keys, ["img-b"])
def test_cache_output_blocks_updates_leaf_and_recycles_redundant_block(self):
manager = _create_manager(num_gpu_blocks=6, num_cpu_blocks=1)
manager.req_to_radix_tree_info["req-cache"] = (manager.radix_tree_root, 0)
manager.leaf_req_map[manager.radix_tree_root].add("req-cache")
existing = _make_block_node(manager, 31, [1, 2], block_size=2, cache_status=CacheStatus.GPU)
manager.gpu_lru_leaf_set.add(existing)
manager.gpu_lru_leaf_heap.append(existing)
recycled = []
manager.recycle_gpu_blocks = lambda ids, req_id=None: recycled.extend(ids if isinstance(ids, list) else [ids])
manager.recycle_cpu_blocks = lambda ids: None
task = SimpleNamespace(
request_id="req-cache",
prompt_token_ids=[1, 2],
output_token_ids=[3, 4],
block_tables=[5, 4],
multimodal_inputs=None,
num_total_tokens=4,
)
manager.cache_output_blocks(task, block_size=2)
self.assertIn(5, recycled)
self.assertEqual(task.num_cached_blocks, 2)
self.assertIn("req-cache", manager.req_leaf_map)
def test_mm_build_path_tracks_unfilled_block_ids(self):
manager = _create_manager(num_gpu_blocks=6)
request = SimpleNamespace(
request_id="req-mm-build",
prompt_token_ids=[1, 2, 3],
output_token_ids=[],
block_tables=[0, 1],
multimodal_inputs=None,
num_total_tokens=3,
)
leaf = manager.mm_build_path(
request=request,
num_computed_tokens=3,
block_size=2,
last_node=manager.radix_tree_root,
num_cached_tokens=0,
)
self.assertNotEqual(leaf, manager.radix_tree_root)
self.assertEqual(manager.unfilled_req_block_map["req-mm-build"], [])
def test_request_match_blocks_prefetches_storage_and_recycles_unused_blocks(self):
manager = _create_manager(num_gpu_blocks=6)
manager.kvcache_storage_backend = "memory"
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.NORMAL]))
task = SimpleNamespace(prompt_token_ids=[1, 2, 3, 4, 5, 6], output_token_ids=[], request_id="storage-req")
with (
patch.object(manager, "mm_match_block", return_value=([], [], [], manager.radix_tree_root, 0, 0)),
patch.object(manager, "issue_prefetch_storage_task", return_value=[0]),
):
common_blocks, matched_tokens, metrics = manager.request_match_blocks(task, block_size=2)
self.assertEqual(common_blocks, [0])
self.assertEqual(matched_tokens, 2)
self.assertEqual(metrics["storage_match_token_num"], 2)
self.assertEqual(task.num_cached_blocks, 1)
self.assertEqual(manager.req_to_radix_tree_info["storage-req"][1], 0)
def test_free_cpu_block_ids_removes_recyclable_leaf(self):
manager = _create_manager(num_gpu_blocks=2, num_cpu_blocks=2)
cpu_hash = get_hash_str([9, 9])
node = BlockNode(
77,
[9, 9],
cpu_hash,
1,
0,
2,
cpu_hash,
0,
parent=manager.radix_tree_root,
cache_status=CacheStatus.CPU,
)
node.shared_count = 0
manager.radix_tree_root.children[cpu_hash] = node
manager.cpu_lru_leaf_heap.append(node)
manager.cpu_lru_leaf_set.add(node)
freed = manager.free_cpu_block_ids(1)
self.assertEqual(freed, 1)
self.assertNotIn(cpu_hash, manager.radix_tree_root.children)
self.assertIn(0, manager.cpu_free_block_list)
def test_is_chunked_mm_input_covers_all_paths(self):
manager = _create_manager()
self.assertEqual(manager.is_chunked_mm_input(None, 2), (False, 0))
mm_inputs = {"mm_positions": [SimpleNamespace(offset=3, length=2), SimpleNamespace(offset=8, length=2)]}
self.assertEqual(manager.is_chunked_mm_input(mm_inputs, 4), (True, 0))
self.assertEqual(manager.is_chunked_mm_input(mm_inputs, 1), (False, 0))
self.assertEqual(manager.is_chunked_mm_input(mm_inputs, 7), (False, 0))
def test_request_match_blocks_storage_allocate_failure_raises(self):
manager = _create_manager(num_gpu_blocks=2)
manager.kvcache_storage_backend = "memory"
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.NORMAL]))
task = SimpleNamespace(prompt_token_ids=[1, 2, 3, 4], output_token_ids=[], request_id="storage-fail")
with (
patch.object(manager, "mm_match_block", return_value=([], [], [], manager.radix_tree_root, 0, 0)),
patch.object(manager, "can_allocate_gpu_blocks", side_effect=[True, False]),
):
with self.assertRaises(Exception):
manager.request_match_blocks(task, block_size=2)
def test_update_cache_blocks_ignores_exception_when_tree_not_normal(self):
manager = _create_manager()
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.CLEARING]))
task = SimpleNamespace(request_id="req-err", num_cached_blocks=0)
manager.req_to_radix_tree_info["req-err"] = (manager.radix_tree_root, 0)
with patch.object(manager, "mm_build_path", side_effect=RuntimeError("boom")):
manager.update_cache_blocks(task, num_computed_tokens=2, block_size=2)
self.assertEqual(task.num_cached_blocks, 0)
def test_recv_data_transfer_result_handles_storage_events(self):
manager = _create_manager(num_gpu_blocks=4)
manager.tensor_parallel_size = 2
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.NORMAL]))
prefetch_event = threading.Event()
write_event = threading.Event()
manager.task_prefetch_event["pref"] = prefetch_event
manager.task_write_back_event["write"] = write_event
payloads = [
(CacheStatus.STORAGE2GPU, "pref", ["h1"], [1, 2]),
(CacheStatus.STORAGE2GPU, "pref", ["h2"], [1]),
(CacheStatus.GPU2STORAGE, "write", ["h3"], [9]),
]
manager.cache_task_queue = _FakeTransferQueue(payloads)
with self.assertRaises(SystemExit):
manager.recv_data_transfer_result()
self.assertTrue(prefetch_event.is_set())
self.assertTrue(write_event.is_set())
self.assertEqual(manager.storage_prefetch_block_ids["pref"], [1])
def test_reset_waits_for_inflight_and_futures(self):
manager = _create_manager(num_gpu_blocks=3, num_cpu_blocks=1)
manager.cache_task_inflight_signal = SimpleNamespace(value=np.array([1], dtype=np.int32))
class _Queue:
def __init__(self):
self.calls = 0
def result_queue_empty(self):
self.calls += 1
return self.calls > 1
manager.cache_task_queue = _Queue()
manager.cpu_free_future = _CompletedFuture()
manager.gpu_free_task_future = _CompletedFuture()
def _sleep(_):
manager.cache_task_inflight_signal.value[:] = 0
with patch("fastdeploy.cache_manager.prefix_cache_manager.time.sleep", side_effect=_sleep):
manager.reset()
self.assertEqual(manager.radix_tree_root.node_id, -1)
self.assertIsNone(manager.cpu_free_future)
self.assertIsNone(manager.gpu_free_task_future)
def test_update_cache_blocks_raises_when_tree_normal_on_exception(self):
manager = _create_manager()
manager.prefix_tree_status_signal = SimpleNamespace(value=np.array([PrefixTreeStatus.NORMAL]))
task = SimpleNamespace(request_id="req-err-normal")
manager.req_to_radix_tree_info["req-err-normal"] = (manager.radix_tree_root, 0)
with patch.object(manager, "mm_build_path", side_effect=RuntimeError("boom")):
with self.assertRaises(RuntimeError):
manager.update_cache_blocks(task, num_computed_tokens=2, block_size=2)
def test_handle_swap_result_ignores_none_node(self):
manager = _create_manager()
self.assertIsNone(manager._handle_swap_result(None, 1, 2, CacheStatus.SWAP2CPU))
def test_reset_sets_empty_cpu_free_list_when_no_cpu_blocks(self):
manager = _create_manager(num_gpu_blocks=2, num_cpu_blocks=0)
manager.cache_task_inflight_signal = SimpleNamespace(value=np.array([0], dtype=np.int32))
manager.cache_task_queue = SimpleNamespace(result_queue_empty=lambda: True)
manager.cpu_lru_leaf_heap = [object()]
manager.cpu_lru_leaf_set = {object()}
manager.cpu_free_future = None
manager.gpu_free_task_future = None
manager.reset()
self.assertEqual(manager.cpu_free_block_list, [])
if __name__ == "__main__":
unittest.main()