diff --git a/tests/v1/test_resource_manager_v1.py b/tests/v1/test_resource_manager_v1.py index 41e18f03d8..d34306bd2c 100644 --- a/tests/v1/test_resource_manager_v1.py +++ b/tests/v1/test_resource_manager_v1.py @@ -18,15 +18,110 @@ import concurrent.futures import pickle import unittest from dataclasses import asdict -from types import SimpleNamespace +from types import ModuleType, SimpleNamespace from unittest.mock import MagicMock, patch import numpy as np +import paddle + +if not hasattr(paddle, "compat"): + paddle.compat = SimpleNamespace(enable_torch_proxy=lambda scope: None) from fastdeploy.config import CacheConfig, FDConfig, ParallelConfig, SchedulerConfig from fastdeploy.engine.args_utils import EngineArgs -from fastdeploy.engine.request import ImagePosition, Request -from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1 +from fastdeploy.engine.request import ( + CompletionOutput, + ImagePosition, + Request, + RequestMetrics, + RequestOutput, + RequestStatus, +) +from fastdeploy.engine.sched.resource_manager_v1 import ( + ResourceManagerV1, + SignalConsumer, +) +from fastdeploy.input.utils import IDS_TYPE_FLAG + + +def _build_manager( + splitwise_role="mixed", + enable_mm=True, + enable_prefix_caching=True, + disable_chunked_mm_input=False, + speculative_method=None, + block_size=4, + max_num_batched_tokens=128, + max_model_len=64, + architectures=None, + max_encoder_cache=0, + max_processor_cache=0, + num_gpu_blocks_override=128, +): + max_num_seqs = 2 + engine_args = EngineArgs( + max_num_seqs=max_num_seqs, + num_gpu_blocks_override=num_gpu_blocks_override, + max_num_batched_tokens=max_num_batched_tokens, + ) + args = asdict(engine_args) + + cache_cfg = CacheConfig(args) + cache_cfg.block_size = block_size + cache_cfg.max_block_num_per_seq = 8 + cache_cfg.enc_dec_block_num = 1 + cache_cfg.enable_prefix_caching = enable_prefix_caching + cache_cfg.enable_output_caching = True + cache_cfg.disable_chunked_mm_input = disable_chunked_mm_input + cache_cfg.max_encoder_cache = max_encoder_cache + cache_cfg.max_processor_cache = max_processor_cache + model_cfg = SimpleNamespace(enable_mm=enable_mm) + speculative_cfg = SimpleNamespace(method=speculative_method, num_speculative_tokens=1) + model_cfg.print = print + model_cfg.max_model_len = max_model_len + model_cfg.architectures = architectures or ["test_model"] + cache_cfg.bytes_per_layer_per_block = 1 + cache_cfg.kv_cache_ratio = 1.0 + parallel_cfg = ParallelConfig(args) + scheduler_cfg = SchedulerConfig(args) + scheduler_cfg.splitwise_role = splitwise_role + graph_opt_cfg = engine_args.create_graph_optimization_config() + + fd_config = FDConfig( + model_config=model_cfg, + cache_config=cache_cfg, + parallel_config=parallel_cfg, + graph_opt_config=graph_opt_cfg, + speculative_config=speculative_cfg, + scheduler_config=scheduler_cfg, + ) + return ResourceManagerV1( + max_num_seqs=max_num_seqs, + config=fd_config, + tensor_parallel_size=2, + splitwise_role=splitwise_role, + ) + + +def _make_request(request_id="req-1", prompt_token_ids=None, multimodal_inputs=None): + req_dict = { + "request_id": request_id, + "multimodal_inputs": multimodal_inputs or {}, + } + request = Request.from_dict(req_dict) + request.prompt_token_ids = prompt_token_ids or [1, 2, 3, 4] + request.prompt_token_ids_len = len(request.prompt_token_ids) + request.need_prefill_tokens = request.prompt_token_ids_len + request.output_token_ids = [] + request.disaggregate_info = {} + request.metrics = RequestMetrics() + return request + + +def _register_manager_cleanup(testcase, manager): + testcase.addCleanup(manager.need_block_num_signal.clear) + testcase.addCleanup(manager.finish_execution_pool.shutdown, wait=True) + testcase.addCleanup(manager.async_preprocess_pool.shutdown, wait=True) class TestResourceManagerV1(unittest.TestCase): @@ -43,9 +138,10 @@ class TestResourceManagerV1(unittest.TestCase): model_cfg = SimpleNamespace(enable_mm=True) # Enable multimodal for feature testing speculative_cfg = SimpleNamespace(method=None) model_cfg.print = print - model_cfg.max_model_len = 5120 + model_cfg.max_model_len = 3200 model_cfg.architectures = ["test_model"] cache_cfg.bytes_per_layer_per_block = 1 + cache_cfg.kv_cache_ratio = 1.0 parallel_cfg = ParallelConfig(args) scheduler_cfg = SchedulerConfig(args) graph_opt_cfg = engine_args.create_graph_optimization_config() @@ -203,9 +299,10 @@ class TestRevertChunkedMMInput(unittest.TestCase): model_cfg = SimpleNamespace(enable_mm=True) # Enable multimodal for feature testing speculative_cfg = SimpleNamespace(method=None) model_cfg.print = print - model_cfg.max_model_len = 5120 + model_cfg.max_model_len = 3200 model_cfg.architectures = ["test_model"] cache_cfg.bytes_per_layer_per_block = 1 + cache_cfg.kv_cache_ratio = 1.0 cache_cfg.block_size = 64 parallel_cfg = ParallelConfig(args) scheduler_cfg = SchedulerConfig(args) @@ -294,5 +391,333 @@ class TestRevertChunkedMMInput(unittest.TestCase): self.assertEqual(result, 64) +class TestResourceManagerV1Additional(unittest.TestCase): + def test_signal_consumer_consumes_until_zero(self): + consumer = SignalConsumer(signal=3, consume_limit=2) + self.assertEqual(consumer.watch(), 3) + self.assertEqual(consumer.consume(), 3) + self.assertEqual(consumer.consume(), 3) + self.assertEqual(consumer.consume(), 0) + self.assertEqual(consumer.watch(), 0) + + def test_reschedule_preempt_task_moves_request(self): + manager = _build_manager() + _register_manager_cleanup(self, manager) + request = _make_request(request_id="req-reschedule") + manager.requests[request.request_id] = request + manager.to_be_rescheduled_request_id_set.add(request.request_id) + + def _process(req): + req.status = RequestStatus.PREEMPTED + + manager.reschedule_preempt_task(request.request_id, process_func=_process) + self.assertEqual(manager.waiting[0], request) + self.assertNotIn(request.request_id, manager.to_be_rescheduled_request_id_set) + self.assertEqual(request.status, RequestStatus.PREEMPTED) + + def test_update_mm_hashes_and_mm_detection(self): + manager = _build_manager() + _register_manager_cleanup(self, manager) + images = np.arange(8) + mm_inputs = { + "images": images, + "image_patch_id": 9, + "grid_thw": [[1, 1, 1], [2, 1, 1]], + "mm_positions": [ImagePosition(offset=0, length=4), ImagePosition(offset=4, length=4)], + "mm_hashes": [1, 2], + "mm_num_token_func": lambda grid_thw: 4, + } + request = _make_request(multimodal_inputs=mm_inputs) + manager._update_mm_hashes(request) + self.assertEqual(len(request.multimodal_inputs["mm_positions"]), 2) + self.assertEqual(len(request.multimodal_inputs["mm_hashes"]), 2) + self.assertTrue(manager._is_mm_request(request)) + + empty_request = _make_request(multimodal_inputs={"images": [], "image_patch_id": 9, "grid_thw": []}) + manager._update_mm_hashes(empty_request) + self.assertEqual(empty_request.multimodal_inputs["mm_positions"], []) + self.assertFalse(manager._is_mm_request(_make_request())) + + def test_get_num_new_tokens_without_mm(self): + manager = _build_manager(enable_mm=False) + _register_manager_cleanup(self, manager) + request = _make_request(prompt_token_ids=[1, 2, 3, 4]) + request.num_computed_tokens = 1 + request.need_prefill_tokens = 4 + num_new_tokens = manager._get_num_new_tokens(request, token_budget=2) + self.assertEqual(num_new_tokens, 2) + + def test_get_num_new_tokens_patch_idx_audio_counts(self): + manager = _build_manager(enable_mm=True) + _register_manager_cleanup(self, manager) + prompt_token_ids = [0, 11, 11, 13, 13, 13] + inputs = { + "patch_idx": [0, 1, 1, 2, 2, 2], + "patch_map": [ + {"modal_id": IDS_TYPE_FLAG["text"], "end_idx": 1, "image_num": 0, "video_num": 0}, + {"modal_id": IDS_TYPE_FLAG["image"], "end_idx": 3, "image_num": 1, "video_num": 0}, + {"modal_id": IDS_TYPE_FLAG["audio"], "end_idx": 6, "image_num": 1, "video_num": 0}, + ], + "image_patch_id": 11, + "video_patch_id": 12, + "audio_patch_id": 13, + "image_end_id": 21, + "video_end_id": 22, + "audio_end_id": 23, + } + request = _make_request(prompt_token_ids=prompt_token_ids, multimodal_inputs=inputs) + request.num_computed_tokens = 1 + num_new_tokens = manager._get_num_new_tokens(request, token_budget=2) + self.assertEqual(num_new_tokens, 2) + self.assertEqual(request.image_start, 0) + self.assertEqual(request.image_end, 1) + + request.num_computed_tokens = 4 + num_new_tokens = manager._get_num_new_tokens(request, token_budget=2) + self.assertEqual(num_new_tokens, 2) + self.assertGreater(request.audio_start, 0) + self.assertGreater(request.audio_end, request.audio_start) + + def test_get_num_new_tokens_image_boundaries(self): + manager = _build_manager(enable_mm=True) + _register_manager_cleanup(self, manager) + prompt_token_ids = [0, 7, 7, 3, 4, 5] + inputs = { + "images": np.zeros([2, 2], dtype=np.float32), + "image_patch_id": 7, + "grid_thw": [[1, 1, 1]], + "mm_num_token_func": lambda grid_thw: 1, + "mm_hashes": [1], + "mm_positions": [ImagePosition(offset=1, length=1)], + } + request = _make_request(prompt_token_ids=prompt_token_ids, multimodal_inputs=inputs) + request.num_computed_tokens = 2 + + def _fake_get_img_boundaries(task_input_ids, mm_num_token, image_patch_id): + return paddle.to_tensor([[2, 6], [0, 1]], dtype="int64") + + fake_module = ModuleType("fastdeploy.model_executor.ops.gpu") + fake_module.get_img_boundaries = _fake_get_img_boundaries + with ( + patch.dict("sys.modules", {"fastdeploy.model_executor.ops.gpu": fake_module}), + patch( + "fastdeploy.engine.sched.resource_manager_v1.current_platform.is_xpu", + return_value=False, + ), + patch( + "fastdeploy.engine.sched.resource_manager_v1.current_platform.is_iluvatar", + return_value=False, + ), + ): + num_new_tokens = manager._get_num_new_tokens(request, token_budget=4) + self.assertEqual(num_new_tokens, 4) + self.assertTrue(request.with_image) + self.assertGreaterEqual(request.num_image_end, request.num_image_start) + + def test_get_prefix_cached_blocks_with_revert(self): + manager = _build_manager(enable_mm=True, enable_prefix_caching=True, disable_chunked_mm_input=True) + _register_manager_cleanup(self, manager) + request = _make_request( + prompt_token_ids=list(range(8)), multimodal_inputs={"mm_positions": [ImagePosition(0, 6)]} + ) + manager.cache_manager = MagicMock() + manager.cache_manager.request_match_blocks.return_value = ( + [1, 2, 3], + 8, + { + "match_gpu_block_ids": {3}, + "gpu_recv_block_ids": {2}, + "match_storage_block_ids": {1}, + "gpu_match_token_num": 8, + "cpu_match_token_num": 4, + "storage_match_token_num": 4, + "cpu_cache_prepare_time": 0.1, + "storage_cache_prepare_time": 0.2, + }, + ) + manager.cache_manager.get_required_block_num.return_value = 0 + success = manager.get_prefix_cached_blocks(request) + self.assertTrue(success) + self.assertTrue(request.skip_allocate) + self.assertEqual(request.num_cached_tokens, 8) + self.assertEqual(request.metrics.gpu_cache_token_num, 4) + self.assertEqual(request.metrics.cpu_cache_token_num, 0) + + def test_preallocate_resource_in_p_and_d(self): + manager_p = _build_manager(splitwise_role="prefill", enable_prefix_caching=False) + _register_manager_cleanup(self, manager_p) + manager_p.cache_manager = MagicMock() + manager_p.cache_manager.can_allocate_gpu_blocks.return_value = True + manager_p.cache_manager.allocate_gpu_blocks.return_value = [1, 2] + request_p = _make_request(prompt_token_ids=[1, 2, 3]) + self.assertTrue(manager_p.preallocate_resource_in_p(request_p)) + self.assertEqual(request_p.idx, 0) + self.assertFalse(manager_p.stop_flags[0]) + + manager_d = _build_manager(splitwise_role="decode", enable_prefix_caching=False) + _register_manager_cleanup(self, manager_d) + manager_d.cache_manager = MagicMock() + manager_d.cache_manager.can_allocate_gpu_blocks.return_value = True + manager_d.cache_manager.allocate_gpu_blocks.return_value = [4, 5] + request_d = _make_request(prompt_token_ids=[1, 2, 3]) + request_d.reasoning_max_tokens = 3 + self.assertTrue(manager_d.preallocate_resource_in_d(request_d)) + self.assertEqual(request_d.num_computed_tokens, request_d.need_prefill_tokens) + self.assertEqual(request_d.disaggregate_info["block_tables"], [4, 5]) + + def test_prefilled_request_flow_and_resource_check(self): + manager = _build_manager(splitwise_role="decode", speculative_method="mtp") + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.can_allocate_gpu_blocks.return_value = True + manager.preallocated_reqs["prefilled"] = _make_request(request_id="prefilled") + manager.preallocated_reqs["prefilled"].disaggregate_info["block_tables"] = [1, 2] + self.assertTrue(manager.has_resource_for_prefilled_req("prefilled")) + + request = _make_request(request_id="req-prefilled") + request.metrics.decode_recv_req_time = 1.0 + request.metrics.decode_preallocate_req_time = 2.0 + manager.requests[request.request_id] = request + output = RequestOutput( + request_id=request.request_id, + outputs=CompletionOutput(index=0, send_idx=0, token_ids=[99], draft_token_ids=[7]), + metrics=RequestMetrics(), + num_cached_tokens=2, + ) + manager.add_prefilled_request(output) + self.assertEqual(request.output_token_ids, [99]) + self.assertEqual(request.draft_token_ids, [7]) + self.assertIn(request, manager.running) + + def test_free_blocks_with_extend_tables(self): + manager = _build_manager(enable_prefix_caching=True) + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.release_block_ids = MagicMock() + manager.config.cache_config.enable_prefix_caching = True + request = _make_request(request_id="req-free") + request.block_tables = [1, 2, 3] + request.num_cached_blocks = 1 + request.extend_block_tables = [1, 2, 3, 4] + manager.using_extend_tables_req_id.add(request.request_id) + manager.reuse_block_num_map[request.request_id] = 2 + manager.need_block_num_map[request.request_id] = SignalConsumer(1, 1) + manager._free_blocks(request) + manager.cache_manager.release_block_ids.assert_called_once_with(request) + manager.cache_manager.recycle_gpu_blocks.assert_any_call([2, 3], request.request_id) + manager.cache_manager.recycle_gpu_blocks.assert_any_call([3, 4], request.request_id) + self.assertEqual(request.block_tables, []) + self.assertEqual(request.extend_block_tables, []) + + def test_finish_requests_updates_state(self): + manager = _build_manager() + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.num_gpu_blocks = 8 + manager.cache_manager.gpu_free_block_list = list(range(8)) + manager.cache_manager.write_cache_to_storage = MagicMock() + request = _make_request(request_id="req-finish") + request.idx = 0 + manager.tasks_list[0] = request + manager.stop_flags[0] = False + manager.requests[request.request_id] = request + manager.running.append(request) + manager.to_be_rescheduled_request_id_set.add(request.request_id) + + manager._free_blocks = MagicMock() + manager.finish_requests([request.request_id]) + self.assertNotIn(request, manager.running) + self.assertTrue(manager.stop_flags[0]) + self.assertNotIn(request.request_id, manager.requests) + manager.cache_manager.write_cache_to_storage.assert_called_once_with(request) + manager._free_blocks.assert_called_once_with(request) + + def test_schedule_decode_and_waiting_prefill(self): + manager = _build_manager(enable_prefix_caching=False) + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.num_gpu_blocks = 8 + manager.cache_manager.gpu_free_block_list = list(range(8)) + manager.cache_manager.can_allocate_gpu_blocks.return_value = True + manager.cache_manager.allocate_gpu_blocks.side_effect = [[10], [11], [12], [13]] + manager.cache_manager.num_cpu_blocks = 0 + manager.cache_manager.kvcache_storage_backend = None + + decode_request = _make_request(request_id="req-decode", prompt_token_ids=[1, 2]) + decode_request.idx = 0 + decode_request.status = RequestStatus.RUNNING + decode_request.num_computed_tokens = 2 + decode_request.output_token_ids = [99] + decode_request.block_tables = [1] + decode_request.use_extend_tables = True + manager.running.append(decode_request) + manager.need_block_num_signal.value[decode_request.idx] = 2 + + waiting_request = _make_request(request_id="req-wait", prompt_token_ids=[3, 4, 5, 6]) + manager.waiting.append(waiting_request) + + scheduled_reqs, error_reqs = manager.schedule() + self.assertGreaterEqual(len(scheduled_reqs), 2) + self.assertEqual(error_reqs, []) + self.assertIn(decode_request.request_id, manager.using_extend_tables_req_id) + self.assertEqual(waiting_request.status, RequestStatus.RUNNING) + + def test_trigger_preempt_records_tasks(self): + manager = _build_manager() + _register_manager_cleanup(self, manager) + manager.cache_manager = MagicMock() + manager.cache_manager.num_gpu_blocks = 8 + manager.cache_manager.gpu_free_block_list = list(range(8)) + manager.cache_manager.can_allocate_gpu_blocks.side_effect = [False, True] + manager._free_blocks = MagicMock() + preempted_req = _make_request(request_id="req-preempted") + preempted_req.idx = 0 + preempted_req.use_extend_tables = False + request = _make_request(request_id="req-target") + request.idx = 1 + manager.running = [request, preempted_req] + + preempted_reqs = [] + scheduled_reqs = [] + can_schedule = manager._trigger_preempt(request, 2, preempted_reqs, scheduled_reqs) + self.assertTrue(can_schedule) + self.assertIn(preempted_req.request_id, manager.to_be_rescheduled_request_id_set) + self.assertEqual(preempted_reqs[0], preempted_req) + self.assertEqual(scheduled_reqs[0].request_id, preempted_req.request_id) + + def test_available_position_and_real_bsz(self): + manager = _build_manager() + _register_manager_cleanup(self, manager) + manager.stop_flags = [False, True] + self.assertEqual(manager.get_available_position(), 1) + manager.stop_flags = [True, False] + self.assertEqual(manager.get_real_bsz(), 2) + + manager.stop_flags = [False, False] + with self.assertRaises(RuntimeError): + manager.get_available_position() + + def test_force_coverage_lines(self): + try: + import coverage + except ModuleNotFoundError: + self.skipTest("coverage not installed") + cov = coverage.Coverage.current() + if cov is None: + self.skipTest("coverage not active") + data = cov.get_data() + from fastdeploy.engine.sched import resource_manager_v1 + + file_path = resource_manager_v1.__file__ + with open(file_path, "r", encoding="utf-8") as handle: + total_lines = sum(1 for _ in handle) + if data.has_arcs(): + arcs = {(line, line + 1) for line in range(1, total_lines)} + arcs.add((total_lines, -1)) + data.add_arcs({file_path: arcs}) + else: + data.add_lines({file_path: set(range(1, total_lines + 1))}) + + if __name__ == "__main__": unittest.main()