From 2d2b1562525f3eb1f9fa9876216fa41f38150fbe Mon Sep 17 00:00:00 2001 From: kevin Date: Fri, 9 Jan 2026 11:25:47 +0800 Subject: [PATCH] [BugFix] fix dyc8 cache bug (#5958) * fix dyc8 cache bug * update code --- .../cache_manager/prefix_cache_manager.py | 6 +++ .../engine/sched/resource_manager_v1.py | 53 +++++++++++++------ fastdeploy/multimodal/hasher.py | 6 --- tests/v1/test_resource_manager_v1.py | 43 +++++++-------- 4 files changed, 65 insertions(+), 43 deletions(-) diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index 5ad94a3b55..8336d8a8f5 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -700,6 +700,9 @@ class PrefixCacheManager: "gpu_match_token_num": 0, "cpu_match_token_num": 0, "storage_match_token_num": 0, + "match_gpu_block_ids": [], + "gpu_recv_block_ids": [], + "match_storage_block_ids": [], "cpu_cache_prepare_time": 0, "storage_cache_prepare_time": 0, } @@ -814,6 +817,9 @@ class PrefixCacheManager: metrics["gpu_match_token_num"] = gpu_match_token_num metrics["cpu_match_token_num"] = cpu_match_token_num metrics["storage_match_token_num"] = storage_match_token_num + metrics["match_gpu_block_ids"] = match_gpu_block_ids + metrics["gpu_recv_block_ids"] = gpu_recv_block_ids + metrics["match_storage_block_ids"] = match_storage_block_ids self.metrics._update_history_hit_metrics() if self.metrics.req_count % 10000 == 0: self.metrics.reset_metrics() diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 123600917b..2e78dc1afd 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -392,11 +392,17 @@ class ResourceManagerV1(ResourceManager): if mm_inputs is None or "mm_positions" not in mm_inputs or len(mm_inputs["mm_positions"]) == 0: return matched_token_num - for idx in range(len(mm_inputs["mm_positions"])): - position = mm_inputs["mm_positions"][idx] + position_idx = len(mm_inputs["mm_positions"]) - 1 + while matched_token_num > 0 and position_idx >= 0: + position = mm_inputs["mm_positions"][position_idx] if position.offset < matched_token_num < position.offset + position.length: - return position.offset + matched_token_num = ( + position.offset // self.config.cache_config.block_size + ) * self.config.cache_config.block_size + position_idx -= 1 elif matched_token_num < position.offset: + position_idx -= 1 + elif matched_token_num >= position.offset + position.length: break return matched_token_num @@ -1010,21 +1016,10 @@ class ResourceManagerV1(ResourceManager): self.config.cache_config.block_size, ) - request.num_cached_tokens = matched_token_num - request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"] - request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"] - request.metrics.storage_cache_token_num = metrics["storage_match_token_num"] - request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"] - request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"] request.cache_info = [matched_block_num, no_cache_block_num] request.block_tables = common_block_ids request.skip_allocate = False - - # Report the number of cached tokens to Prometheus metrics - main_process_metrics.prefix_cache_token_num.inc(matched_token_num) - main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num) - main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.gpu_cache_token_num) - + request.num_cached_tokens = matched_token_num if self.config.cache_config.disable_chunked_mm_input: if matched_token_num == request.need_prefill_tokens: matched_token_num = matched_token_num - self.config.cache_config.block_size @@ -1038,7 +1033,33 @@ class ResourceManagerV1(ResourceManager): request.skip_allocate = True else: request.num_computed_tokens = matched_token_num - llm_logger.info(f"request {request.request_id} num_computed_tokens: {request.num_computed_tokens}") + + if request.num_cached_tokens != request.num_computed_tokens: + revert_tokens_num = request.num_cached_tokens - request.num_computed_tokens + llm_logger.info( + f"request {request.request_id} num_cached_tokens: {request.num_cached_tokens}, revert_tokens_num: {revert_tokens_num}" + ) + + revert_block_idx = revert_tokens_num // self.config.cache_config.block_size + for block_idx in range(len(common_block_ids) - 1, revert_block_idx, -1): + if common_block_ids[block_idx] in metrics["match_gpu_block_ids"]: + metrics["gpu_match_token_num"] -= self.config.cache_config.block_size + elif common_block_ids[block_idx] in metrics["gpu_recv_block_ids"]: + metrics["cpu_match_token_num"] -= self.config.cache_config.block_size + elif common_block_ids[block_idx] in metrics["match_storage_block_ids"]: + metrics["storage_match_token_num"] -= self.config.cache_config.block_size + + request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"] + request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"] + request.metrics.storage_cache_token_num = metrics["storage_match_token_num"] + request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"] + request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"] + + # Report the number of cached tokens to Prometheus metrics + main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens) + main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num) + main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num) + return True except Exception as e: llm_logger.error(f"prefix match blocks error: {e}, {str(traceback.format_exc())} waiting reschedule...") diff --git a/fastdeploy/multimodal/hasher.py b/fastdeploy/multimodal/hasher.py index 1f2d01f8cf..6d2fc4f9b9 100644 --- a/fastdeploy/multimodal/hasher.py +++ b/fastdeploy/multimodal/hasher.py @@ -19,8 +19,6 @@ import pickle import numpy as np -from fastdeploy.utils import data_processor_logger - class MultimodalHasher: @@ -28,8 +26,4 @@ class MultimodalHasher: def hash_features(cls, obj: object) -> str: if isinstance(obj, np.ndarray): return hashlib.sha256((obj.tobytes())).hexdigest() - - data_processor_logger.warning( - f"Unsupported type for hashing features: {type(obj)}" + ", use pickle for serialization" - ) return hashlib.sha256((pickle.dumps(obj))).hexdigest() diff --git a/tests/v1/test_resource_manager_v1.py b/tests/v1/test_resource_manager_v1.py index 0e1d748edb..71e9b9be3f 100644 --- a/tests/v1/test_resource_manager_v1.py +++ b/tests/v1/test_resource_manager_v1.py @@ -206,6 +206,7 @@ class TestRevertChunkedMMInput(unittest.TestCase): model_cfg.max_model_len = 5120 model_cfg.architectures = ["test_model"] cache_cfg.bytes_per_layer_per_block = 1 + cache_cfg.block_size = 64 parallel_cfg = ParallelConfig(args) scheduler_cfg = SchedulerConfig(args) graph_opt_cfg = engine_args.create_graph_optimization_config() @@ -230,58 +231,58 @@ class TestRevertChunkedMMInput(unittest.TestCase): self.request.multimodal_inputs = {} def test_revert_chunked_mm_input_none_input(self): - result = self.manager.revert_chunked_mm_input(None, 10) - self.assertEqual(result, 10) + result = self.manager.revert_chunked_mm_input(None, 64) + self.assertEqual(result, 64) def test_revert_chunked_mm_input_no_mm_positions(self): mm_inputs = {"other_field": "value"} - result = self.manager.revert_chunked_mm_input(mm_inputs, 10) - self.assertEqual(result, 10) + result = self.manager.revert_chunked_mm_input(mm_inputs, 128) + self.assertEqual(result, 128) def test_revert_chunked_mm_input_empty_positions(self): mm_inputs = {"mm_positions": []} - result = self.manager.revert_chunked_mm_input(mm_inputs, 10) - self.assertEqual(result, 10) + result = self.manager.revert_chunked_mm_input(mm_inputs, 128) + self.assertEqual(result, 128) def test_revert_chunked_mm_input_matched_in_chunk(self): mm_inputs = { "mm_positions": [ - ImagePosition(offset=5, length=10), - ImagePosition(offset=20, length=10), + ImagePosition(offset=40, length=100), + ImagePosition(offset=200, length=80), ] } - result = self.manager.revert_chunked_mm_input(mm_inputs, 8) - self.assertEqual(result, 5) + result = self.manager.revert_chunked_mm_input(mm_inputs, 256) + self.assertEqual(result, 192) def test_revert_chunked_mm_input_matched_in_second_chunk(self): mm_inputs = { "mm_positions": [ - ImagePosition(offset=5, length=10), - ImagePosition(offset=20, length=10), + ImagePosition(offset=100, length=100), + ImagePosition(offset=200, length=80), ] } - result = self.manager.revert_chunked_mm_input(mm_inputs, 25) - self.assertEqual(result, 20) + result = self.manager.revert_chunked_mm_input(mm_inputs, 256) + self.assertEqual(result, 64) def test_revert_chunked_mm_input_before_first_chunk(self): mm_inputs = { "mm_positions": [ - ImagePosition(offset=5, length=10), - ImagePosition(offset=20, length=10), + ImagePosition(offset=60, length=100), + ImagePosition(offset=180, length=100), ] } - result = self.manager.revert_chunked_mm_input(mm_inputs, 3) - self.assertEqual(result, 3) + result = self.manager.revert_chunked_mm_input(mm_inputs, 256) + self.assertEqual(result, 0) def test_revert_chunked_mm_input_after_last_chunk(self): mm_inputs = { "mm_positions": [ ImagePosition(offset=5, length=10), - ImagePosition(offset=20, length=10), + ImagePosition(offset=200, length=56), ] } - result = self.manager.revert_chunked_mm_input(mm_inputs, 35) - self.assertEqual(result, 35) + result = self.manager.revert_chunked_mm_input(mm_inputs, 256) + self.assertEqual(result, 256) if __name__ == "__main__":