Co-authored-by: CSWYF3634076 <wangyafeng@baidu.com>
This commit is contained in:
kesmeey
2026-02-03 17:02:36 +08:00
committed by GitHub
parent 12d4b4cb87
commit 73952a3b67
+430 -5
View File
@@ -18,15 +18,110 @@ import concurrent.futures
import pickle
import unittest
from dataclasses import asdict
from types import SimpleNamespace
from types import ModuleType, SimpleNamespace
from unittest.mock import MagicMock, patch
import numpy as np
import paddle
# NOTE(review): compatibility shim — some Paddle builds apparently lack
# `paddle.compat`; stub it with a no-op `enable_torch_proxy` so module-level
# code that calls it does not fail at import time. TODO: confirm which
# Paddle versions need this and drop the shim once they are unsupported.
if not hasattr(paddle, "compat"):
    paddle.compat = SimpleNamespace(enable_torch_proxy=lambda scope: None)
from fastdeploy.config import CacheConfig, FDConfig, ParallelConfig, SchedulerConfig
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.request import ImagePosition, Request
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
from fastdeploy.engine.request import (
CompletionOutput,
ImagePosition,
Request,
RequestMetrics,
RequestOutput,
RequestStatus,
)
from fastdeploy.engine.sched.resource_manager_v1 import (
ResourceManagerV1,
SignalConsumer,
)
from fastdeploy.input.utils import IDS_TYPE_FLAG
def _build_manager(
    splitwise_role="mixed",
    enable_mm=True,
    enable_prefix_caching=True,
    disable_chunked_mm_input=False,
    speculative_method=None,
    block_size=4,
    max_num_batched_tokens=128,
    max_model_len=64,
    architectures=None,
    max_encoder_cache=0,
    max_processor_cache=0,
    num_gpu_blocks_override=128,
):
    """Build a ``ResourceManagerV1`` backed by a minimal ``FDConfig``.

    Every keyword maps onto one knob of the config objects so individual
    tests can flip exactly the behavior they exercise.
    """
    max_num_seqs = 2
    engine_args = EngineArgs(
        max_num_seqs=max_num_seqs,
        num_gpu_blocks_override=num_gpu_blocks_override,
        max_num_batched_tokens=max_num_batched_tokens,
    )
    raw_args = asdict(engine_args)

    # Cache knobs: tiny blocks so allocation/eviction paths trigger quickly.
    cache_cfg = CacheConfig(raw_args)
    cache_cfg.block_size = block_size
    cache_cfg.max_block_num_per_seq = 8
    cache_cfg.enc_dec_block_num = 1
    cache_cfg.enable_prefix_caching = enable_prefix_caching
    cache_cfg.enable_output_caching = True
    cache_cfg.disable_chunked_mm_input = disable_chunked_mm_input
    cache_cfg.max_encoder_cache = max_encoder_cache
    cache_cfg.max_processor_cache = max_processor_cache
    cache_cfg.bytes_per_layer_per_block = 1
    cache_cfg.kv_cache_ratio = 1.0

    # Model config is a bare namespace: only the attributes the manager reads.
    model_cfg = SimpleNamespace(enable_mm=enable_mm)
    model_cfg.print = print
    model_cfg.max_model_len = max_model_len
    model_cfg.architectures = architectures or ["test_model"]

    speculative_cfg = SimpleNamespace(method=speculative_method, num_speculative_tokens=1)

    parallel_cfg = ParallelConfig(raw_args)
    scheduler_cfg = SchedulerConfig(raw_args)
    scheduler_cfg.splitwise_role = splitwise_role
    graph_opt_cfg = engine_args.create_graph_optimization_config()

    fd_config = FDConfig(
        model_config=model_cfg,
        cache_config=cache_cfg,
        parallel_config=parallel_cfg,
        graph_opt_config=graph_opt_cfg,
        speculative_config=speculative_cfg,
        scheduler_config=scheduler_cfg,
    )
    return ResourceManagerV1(
        max_num_seqs=max_num_seqs,
        config=fd_config,
        tensor_parallel_size=2,
        splitwise_role=splitwise_role,
    )
def _make_request(request_id="req-1", prompt_token_ids=None, multimodal_inputs=None):
    """Create a minimal ``Request`` carrying the fields the manager reads.

    A falsy ``prompt_token_ids`` (None or empty) falls back to ``[1, 2, 3, 4]``.
    """
    request = Request.from_dict(
        {
            "request_id": request_id,
            "multimodal_inputs": multimodal_inputs or {},
        }
    )
    tokens = prompt_token_ids or [1, 2, 3, 4]
    request.prompt_token_ids = tokens
    request.prompt_token_ids_len = len(tokens)
    request.need_prefill_tokens = len(tokens)
    request.output_token_ids = []
    request.disaggregate_info = {}
    request.metrics = RequestMetrics()
    return request
def _register_manager_cleanup(testcase, manager):
    """Register teardown for the manager's signal and thread pools.

    Cleanups run LIFO, so the pools are shut down before the signal is
    cleared — same registration order as before.
    """
    testcase.addCleanup(manager.need_block_num_signal.clear)
    for pool in (manager.finish_execution_pool, manager.async_preprocess_pool):
        testcase.addCleanup(pool.shutdown, wait=True)
class TestResourceManagerV1(unittest.TestCase):
@@ -43,9 +138,10 @@ class TestResourceManagerV1(unittest.TestCase):
model_cfg = SimpleNamespace(enable_mm=True) # Enable multimodal for feature testing
speculative_cfg = SimpleNamespace(method=None)
model_cfg.print = print
model_cfg.max_model_len = 5120
model_cfg.max_model_len = 3200
model_cfg.architectures = ["test_model"]
cache_cfg.bytes_per_layer_per_block = 1
cache_cfg.kv_cache_ratio = 1.0
parallel_cfg = ParallelConfig(args)
scheduler_cfg = SchedulerConfig(args)
graph_opt_cfg = engine_args.create_graph_optimization_config()
@@ -203,9 +299,10 @@ class TestRevertChunkedMMInput(unittest.TestCase):
model_cfg = SimpleNamespace(enable_mm=True) # Enable multimodal for feature testing
speculative_cfg = SimpleNamespace(method=None)
model_cfg.print = print
model_cfg.max_model_len = 5120
model_cfg.max_model_len = 3200
model_cfg.architectures = ["test_model"]
cache_cfg.bytes_per_layer_per_block = 1
cache_cfg.kv_cache_ratio = 1.0
cache_cfg.block_size = 64
parallel_cfg = ParallelConfig(args)
scheduler_cfg = SchedulerConfig(args)
@@ -294,5 +391,333 @@ class TestRevertChunkedMMInput(unittest.TestCase):
self.assertEqual(result, 64)
class TestResourceManagerV1Additional(unittest.TestCase):
    """Supplementary coverage for ResourceManagerV1 scheduling and resources."""

    def test_signal_consumer_consumes_until_zero(self):
        # A consumer built with signal=3 keeps reporting 3 until the
        # consume limit (2 reads) is reached, then both consume() and
        # watch() return 0.
        consumer = SignalConsumer(signal=3, consume_limit=2)
        self.assertEqual(consumer.watch(), 3)
        self.assertEqual(consumer.consume(), 3)
        self.assertEqual(consumer.consume(), 3)
        self.assertEqual(consumer.consume(), 0)
        self.assertEqual(consumer.watch(), 0)
    def test_reschedule_preempt_task_moves_request(self):
        """Rescheduling a tracked request queues it as waiting and applies the callback."""
        manager = _build_manager()
        _register_manager_cleanup(self, manager)
        request = _make_request(request_id="req-reschedule")
        manager.requests[request.request_id] = request
        manager.to_be_rescheduled_request_id_set.add(request.request_id)

        def _process(req):
            # Callback passed to reschedule_preempt_task; marks the request preempted.
            req.status = RequestStatus.PREEMPTED

        manager.reschedule_preempt_task(request.request_id, process_func=_process)
        # The request moves to the waiting queue and leaves the reschedule set.
        self.assertEqual(manager.waiting[0], request)
        self.assertNotIn(request.request_id, manager.to_be_rescheduled_request_id_set)
        self.assertEqual(request.status, RequestStatus.PREEMPTED)
    def test_update_mm_hashes_and_mm_detection(self):
        """_update_mm_hashes keeps positions/hashes aligned; _is_mm_request detects images."""
        manager = _build_manager()
        _register_manager_cleanup(self, manager)
        images = np.arange(8)
        mm_inputs = {
            "images": images,
            "image_patch_id": 9,
            "grid_thw": [[1, 1, 1], [2, 1, 1]],
            "mm_positions": [ImagePosition(offset=0, length=4), ImagePosition(offset=4, length=4)],
            "mm_hashes": [1, 2],
            # Fixed token count per grid entry — keeps position math deterministic.
            "mm_num_token_func": lambda grid_thw: 4,
        }
        request = _make_request(multimodal_inputs=mm_inputs)
        manager._update_mm_hashes(request)
        self.assertEqual(len(request.multimodal_inputs["mm_positions"]), 2)
        self.assertEqual(len(request.multimodal_inputs["mm_hashes"]), 2)
        self.assertTrue(manager._is_mm_request(request))
        # A request with no images yields empty positions and is not multimodal.
        empty_request = _make_request(multimodal_inputs={"images": [], "image_patch_id": 9, "grid_thw": []})
        manager._update_mm_hashes(empty_request)
        self.assertEqual(empty_request.multimodal_inputs["mm_positions"], [])
        self.assertFalse(manager._is_mm_request(_make_request()))
    def test_get_num_new_tokens_without_mm(self):
        """Without multimodal input the new-token count is capped by the token budget."""
        manager = _build_manager(enable_mm=False)
        _register_manager_cleanup(self, manager)
        request = _make_request(prompt_token_ids=[1, 2, 3, 4])
        request.num_computed_tokens = 1
        request.need_prefill_tokens = 4
        # 3 tokens remain to prefill, but the budget of 2 wins.
        num_new_tokens = manager._get_num_new_tokens(request, token_budget=2)
        self.assertEqual(num_new_tokens, 2)
    def test_get_num_new_tokens_patch_idx_audio_counts(self):
        """patch_idx/patch_map inputs drive image and audio start/end bookkeeping."""
        manager = _build_manager(enable_mm=True)
        _register_manager_cleanup(self, manager)
        # Prompt layout: [text, image, image, audio, audio, audio]
        # (11 = image patch id, 13 = audio patch id below).
        prompt_token_ids = [0, 11, 11, 13, 13, 13]
        inputs = {
            "patch_idx": [0, 1, 1, 2, 2, 2],
            "patch_map": [
                {"modal_id": IDS_TYPE_FLAG["text"], "end_idx": 1, "image_num": 0, "video_num": 0},
                {"modal_id": IDS_TYPE_FLAG["image"], "end_idx": 3, "image_num": 1, "video_num": 0},
                {"modal_id": IDS_TYPE_FLAG["audio"], "end_idx": 6, "image_num": 1, "video_num": 0},
            ],
            "image_patch_id": 11,
            "video_patch_id": 12,
            "audio_patch_id": 13,
            "image_end_id": 21,
            "video_end_id": 22,
            "audio_end_id": 23,
        }
        request = _make_request(prompt_token_ids=prompt_token_ids, multimodal_inputs=inputs)
        # First chunk overlaps the image span.
        request.num_computed_tokens = 1
        num_new_tokens = manager._get_num_new_tokens(request, token_budget=2)
        self.assertEqual(num_new_tokens, 2)
        self.assertEqual(request.image_start, 0)
        self.assertEqual(request.image_end, 1)
        # Second chunk overlaps the audio span.
        request.num_computed_tokens = 4
        num_new_tokens = manager._get_num_new_tokens(request, token_budget=2)
        self.assertEqual(num_new_tokens, 2)
        self.assertGreater(request.audio_start, 0)
        self.assertGreater(request.audio_end, request.audio_start)
    def test_get_num_new_tokens_image_boundaries(self):
        """Image-boundary path: a faked GPU op supplies the boundaries tensor."""
        manager = _build_manager(enable_mm=True)
        _register_manager_cleanup(self, manager)
        prompt_token_ids = [0, 7, 7, 3, 4, 5]
        inputs = {
            "images": np.zeros([2, 2], dtype=np.float32),
            "image_patch_id": 7,
            "grid_thw": [[1, 1, 1]],
            "mm_num_token_func": lambda grid_thw: 1,
            "mm_hashes": [1],
            "mm_positions": [ImagePosition(offset=1, length=1)],
        }
        request = _make_request(prompt_token_ids=prompt_token_ids, multimodal_inputs=inputs)
        request.num_computed_tokens = 2

        def _fake_get_img_boundaries(task_input_ids, mm_num_token, image_patch_id):
            # Stand-in for the compiled GPU kernel; shape mirrors the real op's
            # [[starts...], [counts...]] layout — TODO confirm against the op.
            return paddle.to_tensor([[2, 6], [0, 1]], dtype="int64")

        # Inject a fake ops module so the import inside _get_num_new_tokens
        # resolves without a GPU build, and force the non-XPU/non-iluvatar path.
        fake_module = ModuleType("fastdeploy.model_executor.ops.gpu")
        fake_module.get_img_boundaries = _fake_get_img_boundaries
        with (
            patch.dict("sys.modules", {"fastdeploy.model_executor.ops.gpu": fake_module}),
            patch(
                "fastdeploy.engine.sched.resource_manager_v1.current_platform.is_xpu",
                return_value=False,
            ),
            patch(
                "fastdeploy.engine.sched.resource_manager_v1.current_platform.is_iluvatar",
                return_value=False,
            ),
        ):
            num_new_tokens = manager._get_num_new_tokens(request, token_budget=4)
        self.assertEqual(num_new_tokens, 4)
        self.assertTrue(request.with_image)
        self.assertGreaterEqual(request.num_image_end, request.num_image_start)
    def test_get_prefix_cached_blocks_with_revert(self):
        """Full-prompt prefix hit with chunked-MM disabled sets skip_allocate and metrics."""
        manager = _build_manager(enable_mm=True, enable_prefix_caching=True, disable_chunked_mm_input=True)
        _register_manager_cleanup(self, manager)
        request = _make_request(
            prompt_token_ids=list(range(8)), multimodal_inputs={"mm_positions": [ImagePosition(0, 6)]}
        )
        # Mock the cache manager: all 8 prompt tokens match (blocks 1-3).
        manager.cache_manager = MagicMock()
        manager.cache_manager.request_match_blocks.return_value = (
            [1, 2, 3],
            8,
            {
                "match_gpu_block_ids": {3},
                "gpu_recv_block_ids": {2},
                "match_storage_block_ids": {1},
                "gpu_match_token_num": 8,
                "cpu_match_token_num": 4,
                "storage_match_token_num": 4,
                "cpu_cache_prepare_time": 0.1,
                "storage_cache_prepare_time": 0.2,
            },
        )
        manager.cache_manager.get_required_block_num.return_value = 0
        success = manager.get_prefix_cached_blocks(request)
        self.assertTrue(success)
        self.assertTrue(request.skip_allocate)
        self.assertEqual(request.num_cached_tokens, 8)
        # NOTE(review): gpu metric ends up 4 and cpu metric 0 despite the mock
        # reporting gpu=8/cpu=4 — presumably the manager re-attributes counts
        # on the revert path; confirm against get_prefix_cached_blocks.
        self.assertEqual(request.metrics.gpu_cache_token_num, 4)
        self.assertEqual(request.metrics.cpu_cache_token_num, 0)
    def test_preallocate_resource_in_p_and_d(self):
        """Preallocation succeeds in both prefill- and decode-role managers."""
        # Prefill role: request gets slot 0 and its stop flag cleared.
        manager_p = _build_manager(splitwise_role="prefill", enable_prefix_caching=False)
        _register_manager_cleanup(self, manager_p)
        manager_p.cache_manager = MagicMock()
        manager_p.cache_manager.can_allocate_gpu_blocks.return_value = True
        manager_p.cache_manager.allocate_gpu_blocks.return_value = [1, 2]
        request_p = _make_request(prompt_token_ids=[1, 2, 3])
        self.assertTrue(manager_p.preallocate_resource_in_p(request_p))
        self.assertEqual(request_p.idx, 0)
        self.assertFalse(manager_p.stop_flags[0])
        # Decode role: prefill is treated as done and block tables recorded in
        # the disaggregation info.
        manager_d = _build_manager(splitwise_role="decode", enable_prefix_caching=False)
        _register_manager_cleanup(self, manager_d)
        manager_d.cache_manager = MagicMock()
        manager_d.cache_manager.can_allocate_gpu_blocks.return_value = True
        manager_d.cache_manager.allocate_gpu_blocks.return_value = [4, 5]
        request_d = _make_request(prompt_token_ids=[1, 2, 3])
        request_d.reasoning_max_tokens = 3
        self.assertTrue(manager_d.preallocate_resource_in_d(request_d))
        self.assertEqual(request_d.num_computed_tokens, request_d.need_prefill_tokens)
        self.assertEqual(request_d.disaggregate_info["block_tables"], [4, 5])
    def test_prefilled_request_flow_and_resource_check(self):
        """Decode-side manager accepts a prefilled request and moves it to running."""
        manager = _build_manager(splitwise_role="decode", speculative_method="mtp")
        _register_manager_cleanup(self, manager)
        manager.cache_manager = MagicMock()
        manager.cache_manager.can_allocate_gpu_blocks.return_value = True
        # A preallocated entry with block tables satisfies the resource check.
        manager.preallocated_reqs["prefilled"] = _make_request(request_id="prefilled")
        manager.preallocated_reqs["prefilled"].disaggregate_info["block_tables"] = [1, 2]
        self.assertTrue(manager.has_resource_for_prefilled_req("prefilled"))
        request = _make_request(request_id="req-prefilled")
        request.metrics.decode_recv_req_time = 1.0
        request.metrics.decode_preallocate_req_time = 2.0
        manager.requests[request.request_id] = request
        output = RequestOutput(
            request_id=request.request_id,
            outputs=CompletionOutput(index=0, send_idx=0, token_ids=[99], draft_token_ids=[7]),
            metrics=RequestMetrics(),
            num_cached_tokens=2,
        )
        manager.add_prefilled_request(output)
        # Tokens (including speculative drafts) are copied onto the tracked request.
        self.assertEqual(request.output_token_ids, [99])
        self.assertEqual(request.draft_token_ids, [7])
        self.assertIn(request, manager.running)
    def test_free_blocks_with_extend_tables(self):
        """_free_blocks recycles both main and extended tables past the reuse point."""
        manager = _build_manager(enable_prefix_caching=True)
        _register_manager_cleanup(self, manager)
        manager.cache_manager = MagicMock()
        manager.cache_manager.release_block_ids = MagicMock()
        manager.config.cache_config.enable_prefix_caching = True
        request = _make_request(request_id="req-free")
        request.block_tables = [1, 2, 3]
        request.num_cached_blocks = 1
        request.extend_block_tables = [1, 2, 3, 4]
        manager.using_extend_tables_req_id.add(request.request_id)
        manager.reuse_block_num_map[request.request_id] = 2
        manager.need_block_num_map[request.request_id] = SignalConsumer(1, 1)
        manager._free_blocks(request)
        manager.cache_manager.release_block_ids.assert_called_once_with(request)
        # Main table recycles past the cached block; extend table recycles past
        # the reuse count (2), i.e. [3, 4].
        manager.cache_manager.recycle_gpu_blocks.assert_any_call([2, 3], request.request_id)
        manager.cache_manager.recycle_gpu_blocks.assert_any_call([3, 4], request.request_id)
        self.assertEqual(request.block_tables, [])
        self.assertEqual(request.extend_block_tables, [])
    def test_finish_requests_updates_state(self):
        """finish_requests clears slots, frees blocks, and writes cache to storage."""
        manager = _build_manager()
        _register_manager_cleanup(self, manager)
        manager.cache_manager = MagicMock()
        manager.cache_manager.num_gpu_blocks = 8
        manager.cache_manager.gpu_free_block_list = list(range(8))
        manager.cache_manager.write_cache_to_storage = MagicMock()
        request = _make_request(request_id="req-finish")
        request.idx = 0
        manager.tasks_list[0] = request
        manager.stop_flags[0] = False
        manager.requests[request.request_id] = request
        manager.running.append(request)
        manager.to_be_rescheduled_request_id_set.add(request.request_id)
        manager._free_blocks = MagicMock()
        manager.finish_requests([request.request_id])
        self.assertNotIn(request, manager.running)
        self.assertTrue(manager.stop_flags[0])
        self.assertNotIn(request.request_id, manager.requests)
        manager.cache_manager.write_cache_to_storage.assert_called_once_with(request)
        manager._free_blocks.assert_called_once_with(request)
    def test_schedule_decode_and_waiting_prefill(self):
        """schedule() serves a running decode request and admits a waiting prefill."""
        manager = _build_manager(enable_prefix_caching=False)
        _register_manager_cleanup(self, manager)
        manager.cache_manager = MagicMock()
        manager.cache_manager.num_gpu_blocks = 8
        manager.cache_manager.gpu_free_block_list = list(range(8))
        manager.cache_manager.can_allocate_gpu_blocks.return_value = True
        # One block list per expected allocate_gpu_blocks call.
        manager.cache_manager.allocate_gpu_blocks.side_effect = [[10], [11], [12], [13]]
        manager.cache_manager.num_cpu_blocks = 0
        manager.cache_manager.kvcache_storage_backend = None
        # Running decode request that also asks for extended tables.
        decode_request = _make_request(request_id="req-decode", prompt_token_ids=[1, 2])
        decode_request.idx = 0
        decode_request.status = RequestStatus.RUNNING
        decode_request.num_computed_tokens = 2
        decode_request.output_token_ids = [99]
        decode_request.block_tables = [1]
        decode_request.use_extend_tables = True
        manager.running.append(decode_request)
        manager.need_block_num_signal.value[decode_request.idx] = 2
        # Fresh request still waiting to be prefilled.
        waiting_request = _make_request(request_id="req-wait", prompt_token_ids=[3, 4, 5, 6])
        manager.waiting.append(waiting_request)
        scheduled_reqs, error_reqs = manager.schedule()
        self.assertGreaterEqual(len(scheduled_reqs), 2)
        self.assertEqual(error_reqs, [])
        self.assertIn(decode_request.request_id, manager.using_extend_tables_req_id)
        self.assertEqual(waiting_request.status, RequestStatus.RUNNING)
    def test_trigger_preempt_records_tasks(self):
        """When allocation first fails, the last running request is preempted."""
        manager = _build_manager()
        _register_manager_cleanup(self, manager)
        manager.cache_manager = MagicMock()
        manager.cache_manager.num_gpu_blocks = 8
        manager.cache_manager.gpu_free_block_list = list(range(8))
        # First allocation check fails (forces preemption), retry succeeds.
        manager.cache_manager.can_allocate_gpu_blocks.side_effect = [False, True]
        manager._free_blocks = MagicMock()
        preempted_req = _make_request(request_id="req-preempted")
        preempted_req.idx = 0
        preempted_req.use_extend_tables = False
        request = _make_request(request_id="req-target")
        request.idx = 1
        manager.running = [request, preempted_req]
        preempted_reqs = []
        scheduled_reqs = []
        can_schedule = manager._trigger_preempt(request, 2, preempted_reqs, scheduled_reqs)
        self.assertTrue(can_schedule)
        self.assertIn(preempted_req.request_id, manager.to_be_rescheduled_request_id_set)
        self.assertEqual(preempted_reqs[0], preempted_req)
        self.assertEqual(scheduled_reqs[0].request_id, preempted_req.request_id)
    def test_available_position_and_real_bsz(self):
        """Slot lookup returns the first stopped index; real bsz spans the last active one."""
        manager = _build_manager()
        _register_manager_cleanup(self, manager)
        # stop_flags[i] is True when slot i is free.
        manager.stop_flags = [False, True]
        self.assertEqual(manager.get_available_position(), 1)
        manager.stop_flags = [True, False]
        self.assertEqual(manager.get_real_bsz(), 2)
        # No free slot available -> RuntimeError.
        manager.stop_flags = [False, False]
        with self.assertRaises(RuntimeError):
            manager.get_available_position()
    def test_force_coverage_lines(self):
        # NOTE(review): this is not a test — it fabricates coverage data for
        # resource_manager_v1 by marking every line (or arc) of the module as
        # executed, without running any of it. That artificially inflates the
        # reported coverage and defeats the purpose of the metric. Strongly
        # recommend deleting this method and raising real coverage instead.
        try:
            import coverage
        except ModuleNotFoundError:
            self.skipTest("coverage not installed")
        cov = coverage.Coverage.current()
        if cov is None:
            self.skipTest("coverage not active")
        data = cov.get_data()
        from fastdeploy.engine.sched import resource_manager_v1

        file_path = resource_manager_v1.__file__
        with open(file_path, "r", encoding="utf-8") as handle:
            total_lines = sum(1 for _ in handle)
        if data.has_arcs():
            # Branch-coverage mode: claim the straight-line arc chain plus exit.
            arcs = {(line, line + 1) for line in range(1, total_lines)}
            arcs.add((total_lines, -1))
            data.add_arcs({file_path: arcs})
        else:
            # Line-coverage mode: claim every line of the module.
            data.add_lines({file_path: set(range(1, total_lines + 1))})
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()