diff --git a/benchmarks/backend_request_func.py b/benchmarks/backend_request_func.py index 6785b4cf87..e52f9a58be 100644 --- a/benchmarks/backend_request_func.py +++ b/benchmarks/backend_request_func.py @@ -157,7 +157,7 @@ def metrics_summary(metrics, token_timestamps): summary["gpu_cache_token_num"] = m0.get("gpu_cache_token_num") summary["cpu_cache_token_num"] = m0.get("cpu_cache_token_num") summary["storage_cache_token_num"] = m0.get("storage_cache_token_num") - summary["gpu_cpu_cache_prepare_time"] = m0.get("gpu_cpu_cache_prepare_time") + summary["cpu_cache_prepare_time"] = m0.get("cpu_cache_prepare_time") summary["storage_cache_prepare_time"] = m0.get("storage_cache_prepare_time") return summary diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py index 247b57180f..ff47ff6b75 100644 --- a/benchmarks/benchmark_serving.py +++ b/benchmarks/benchmark_serving.py @@ -809,7 +809,7 @@ async def benchmark( process_pd_metrics(outputs, "gpu_cache_token_num", is_time=False) process_pd_metrics(outputs, "cpu_cache_token_num", is_time=False) process_pd_metrics(outputs, "storage_cache_token_num", is_time=False) - process_pd_metrics(outputs, "gpu_cpu_cache_prepare_time") + process_pd_metrics(outputs, "cpu_cache_prepare_time") process_pd_metrics(outputs, "storage_cache_prepare_time") process_one_length("input_len", "Cached Tokens", "Cached Tokens") process_one_length("s_input_len", "Input Length", "Infer Input Length") diff --git a/examples/mooncake_store/README.md b/examples/mooncake_store/README.md index b3ac816adb..5b3f7f9c99 100644 --- a/examples/mooncake_store/README.md +++ b/examples/mooncake_store/README.md @@ -54,4 +54,4 @@ python -m fastdeploy.entrypoints.openai.api_server \ ## Troubleshooting For more details, please refer to: -https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/troubleshooting.md +https://github.com/kvcache-ai/Mooncake/blob/main/docs/source/troubleshooting/troubleshooting.md diff --git 
a/fastdeploy/cache_manager/cache_metrics.py b/fastdeploy/cache_manager/cache_metrics.py index a7dec1bd68..2dd3137d32 100644 --- a/fastdeploy/cache_manager/cache_metrics.py +++ b/fastdeploy/cache_manager/cache_metrics.py @@ -39,6 +39,7 @@ class CacheMetrics: # token level self.total_gpu_matched_token_num = 0 self.total_cpu_matched_token_num = 0 + self.total_storage_matched_token_num = 0 self.matched_token_num = 0 self.total_token_num = 0 @@ -54,6 +55,7 @@ class CacheMetrics: self.hit_token_ratio = self.matched_token_num / self.total_token_num self.cpu_hit_token_ratio = self.total_cpu_matched_token_num / self.total_token_num self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num + self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num main_process_metrics.hit_req_rate.set(self.hit_req_ratio) main_process_metrics.hit_token_rate.set(self.hit_token_ratio) @@ -67,6 +69,7 @@ class CacheMetrics: + f" cpu_hit_token_ratio {self.cpu_hit_token_ratio:.2f}" + f" total_gpu_matched_token_num {self.total_gpu_matched_token_num}" + f" total_cpu_matched_token_num {self.total_cpu_matched_token_num}" + + f" total_storage_matched_token_num {self.total_storage_matched_token_num}" + f" total_matched_token_num {self.matched_token_num}" + f" total_token_num {self.total_token_num}" ) @@ -76,6 +79,7 @@ class CacheMetrics: req_id, current_query_cpu_match_token_num, current_query_gpu_match_token_num, + current_storage_match_token_num, current_query_token_num, ): """ @@ -84,18 +88,23 @@ class CacheMetrics: cpu_cache_match_ratio = current_query_cpu_match_token_num / current_query_token_num gpu_cache_match_ratio = current_query_gpu_match_token_num / current_query_token_num + storage_cache_match_ratio = current_storage_match_token_num / current_query_token_num - total_match_ratio = cpu_cache_match_ratio + gpu_cache_match_ratio + total_match_ratio = cpu_cache_match_ratio + gpu_cache_match_ratio + storage_cache_match_ratio 
self.total_cpu_matched_token_num += current_query_cpu_match_token_num self.total_gpu_matched_token_num += current_query_gpu_match_token_num + self.total_storage_matched_token_num += current_storage_match_token_num - self.matched_token_num += current_query_cpu_match_token_num + current_query_gpu_match_token_num + self.matched_token_num += ( + current_query_cpu_match_token_num + current_query_gpu_match_token_num + current_storage_match_token_num + ) self.total_token_num += current_query_token_num logger.info( f"Metrics for req_id {req_id}: token_num {current_query_token_num}" + f" cpu_cache_match_ratio {cpu_cache_match_ratio}" + f" gpu_cache_match_ratio {gpu_cache_match_ratio}" + + f" storage_cache_match_ratio {storage_cache_match_ratio}" + f" total_match_ratio {total_match_ratio}" ) @@ -114,6 +123,7 @@ class CacheMetrics: self.total_gpu_matched_token_num = 0 self.total_cpu_matched_token_num = 0 + self.total_storage_matched_token_num = 0 self.matched_token_num = 0 self.total_token_num = 0 diff --git a/fastdeploy/cache_manager/cache_transfer_manager.py b/fastdeploy/cache_manager/cache_transfer_manager.py index faf9255441..126b4cef3a 100644 --- a/fastdeploy/cache_manager/cache_transfer_manager.py +++ b/fastdeploy/cache_manager/cache_transfer_manager.py @@ -456,8 +456,9 @@ class CacheTransferManager: def _run_read_storage(self, k_cache_keys, v_cache_keys, gpu_block_ids, cpu_block_ids): try: logger.debug( - f"_run_read_storage, key_hash_keys: {k_cache_keys}, " - f"value_hash_keys: {v_cache_keys}, gpu_block_ids: {gpu_block_ids}" + f"_run_read_storage, key_hash_keys_num: {len(k_cache_keys)}, " + f"value_hash_keys_num: {len(v_cache_keys)}, gpu_block_ids_num: {len(gpu_block_ids)}, " + f"cpu_block_ids_num: {len(cpu_block_ids)}" ) block_num = len(gpu_block_ids) @@ -468,7 +469,9 @@ class CacheTransferManager: ] kv_cache_ptrs = k_cache_ptrs + v_cache_ptrs kv_block_sizes = [self.storage_buffer_stride_bytes] * block_num * 2 # key and value + start_time = time.time() result = 
self.storage_backend.batch_get(keys, target_locations=kv_cache_ptrs, target_sizes=kv_block_sizes) + read_cost_time = time.time() - start_time k_result, v_result = result[:block_num], result[block_num:] success_block_num = 0 @@ -480,6 +483,7 @@ class CacheTransferManager: valid_cpu_block_ids = cpu_block_ids[:success_block_num] mode = 1 # cpu ==> gpu + start_time = time.time() swap_cache_layout( self.gpu_cache_k_tensors, self.storage_key_read_buffer, @@ -498,6 +502,10 @@ class CacheTransferManager: self.device, mode, ) + swap_cost_time = time.time() - start_time + logger.debug( + f"_run_read_storage, swap_cost_time: {swap_cost_time:.6f}s, read_cost_time: {read_cost_time:.6f}s" + ) return valid_gpu_block_ids except Exception as e: @@ -511,8 +519,8 @@ class CacheTransferManager: """Read cache from the storage backend to the GPU memory.""" try: logger.debug( - f"read_storage_task, task id: {task_id}, hash_keys: {keys}, " - f"gpu_block_ids: {gpu_block_ids}, timeout: {timeout}" + f"read_storage_task, task id: {task_id}, hash_keys_num: {len(keys)}, " + f"gpu_block_ids_num: {len(gpu_block_ids)}, timeout: {timeout}" ) k_cache_keys = [f"{key}_key_{self.rank}" for key in keys] v_cache_keys = [f"{key}_value_{self.rank}" for key in keys] @@ -565,7 +573,9 @@ class CacheTransferManager: self.key_cache_shape[2], self.key_cache_shape[3], ] + mode = 0 # gpu ==> cpu + start_time = time.time() swap_cache_layout( self.gpu_cache_k_tensors, self.storage_key_write_buffer, @@ -584,6 +594,7 @@ class CacheTransferManager: self.device, mode, ) + swap_cost_time = time.time() - start_time block_num = len(gpu_block_ids) keys = k_cache_keys + v_cache_keys @@ -595,7 +606,13 @@ class CacheTransferManager: ] kv_cache_ptrs = k_cache_ptrs + v_cache_ptrs kv_block_sizes = [self.storage_buffer_stride_bytes] * block_num * 2 # key and value + start_time = time.time() self.storage_backend.batch_set(keys, target_locations=kv_cache_ptrs, target_sizes=kv_block_sizes) + write_cost_time = time.time() - start_time 
+ + logger.debug( + f"_run_write_back_storage, swap_cost_time: {swap_cost_time:.6f}s, write_cost_time: {write_cost_time:.6f}s" + ) except Exception as e: logger.error( f"[rank {self.rank}/{self.n_ranks}] An error occurred in _run_write_back_storage: " diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index 60dd088942..91b23a2971 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -112,7 +112,7 @@ class PrefixCacheManager: self.req_leaf_map = {} # {request_id: leaf node} self.leaf_req_map = defaultdict(set) self.unfilled_req_block_map = defaultdict(list) - self.cache_info = {} # {request_id: (last_match_node, num_cached_tokens)} + self.req_to_radix_tree_info = {} # {request_id: (last_match_node, num_cached_tokens_in_radix_tree)} self.executor_pool = ThreadPoolExecutor(max_workers=1) self.free_gpu_executor_pool = ThreadPoolExecutor(max_workers=1) @@ -634,7 +634,7 @@ class PrefixCacheManager: """ try: req_id = task.request_id - last_node, num_cached_tokens = self.cache_info[req_id] + last_node, num_cached_tokens = self.req_to_radix_tree_info[req_id] can_cache_computed_tokens = num_computed_tokens - num_computed_tokens % block_size if req_id in self.leaf_req_map[last_node]: # delete old leaf record, update later self.leaf_req_map[last_node].remove(req_id) @@ -653,8 +653,8 @@ class PrefixCacheManager: ) self.req_leaf_map[req_id] = leaf_node self.leaf_req_map[leaf_node].add(req_id) - self.cache_info[req_id] = [leaf_node, can_cache_computed_tokens] - task.cached_block_num = can_cache_computed_tokens // block_size + self.req_to_radix_tree_info[req_id] = [leaf_node, can_cache_computed_tokens] + task.num_cached_blocks = can_cache_computed_tokens // block_size except Exception as e: logger.error(f"update_cache_blocks, error: {type(e)} {e}, {str(traceback.format_exc())}") raise e @@ -674,14 +674,14 @@ class PrefixCacheManager: break return False, 0 
- def request_match_blocks(self, task, block_size, *args): + def request_match_blocks(self, task: Request, block_size, *args): """ - get match blocks info for a task. + Match and fetch cache for a task. This is a synchronous interface. If CPU-to-GPU data transfer occurs, it will block until synchronization completes. Callers requiring asynchronous behavior should invoke this via a thread pool. - Note: This function may allocate GPU blocks for matched CPU Cache + Note: This function may allocate GPU blocks for matched CPU Cache and Storage Cache Parameters: - task: Task dictionary @@ -689,15 +689,17 @@ class PrefixCacheManager: Returns: - common_block_ids: List of matched shared blocks - - unique_block_ids: List of exclusively allocated blocks + - match_token_num: Number of matched tokens + - metrics: Dictionary of metrics """ with self.request_release_lock: try: - hit_info = { - "gpu_cache_blocks": 0, - "cpu_cache_blocks": 0, + metrics = { "gpu_match_token_num": 0, "cpu_match_token_num": 0, + "storage_match_token_num": 0, + "cpu_cache_prepare_time": 0, + "storage_cache_prepare_time": 0, } self.metrics.req_count += 1 if isinstance(task.prompt_token_ids, np.ndarray): @@ -706,7 +708,8 @@ class PrefixCacheManager: prompt_token_ids = task.prompt_token_ids req_id = task.request_id logger.info(f"request_match_blocks: start to process req {req_id}") - input_token_num = len(prompt_token_ids + task.output_token_ids) + input_token_ids = prompt_token_ids + task.output_token_ids + input_token_num = len(input_token_ids) common_block_ids = [] # 1. match block ( @@ -721,7 +724,7 @@ class PrefixCacheManager: # update matched node info self._update_matched_node_info(req_id, match_block_node, current_time=time.time()) - # 2. prepare cache: allocate gpu cache for matched cpu blocks, wait for data transfer to complete + # 2. 
prepare cpu cache: allocate gpu cache for matched cpu blocks, wait for data transfer to complete gpu_recv_block_ids = [] match_cpu_blocks_num = len(match_cpu_block_ids) if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num): @@ -731,6 +734,7 @@ class PrefixCacheManager: ) gpu_recv_block_ids = self.allocate_gpu_blocks(match_cpu_blocks_num) if len(gpu_recv_block_ids) > 0: + start_time = time.time() self._prepare_cpu_cache( req_id=req_id, swap_node_ids=swap_node_ids, @@ -738,81 +742,94 @@ class PrefixCacheManager: match_cpu_block_ids=match_cpu_block_ids, cpu_recv_block_ids=[], ) + cost_time = time.time() - start_time + metrics["cpu_cache_prepare_time"] = cost_time else: raise Exception( "request_match_blocks: Not enough GPU memory to allocate cache for matched CPU Cache" ) - # 3. update metrics - matched_token_num = gpu_match_token_num + cpu_match_token_num - common_block_ids = match_gpu_block_ids + gpu_recv_block_ids - if matched_token_num > 0: + # 3. match and prefetch cache from storage + match_token_num = gpu_match_token_num + cpu_match_token_num + no_match_token_num = input_token_num - match_token_num + no_match_block_num = (no_match_token_num + block_size - 1) // block_size + gpu_recv_storage_block_ids = [] + storage_match_token_num = 0 + match_storage_block_ids = [] + + if self.kvcache_storage_backend and no_match_token_num >= block_size: + if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num): + raise Exception( + "request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache" + ) + + logger.debug( + f"request_match_blocks: req_id {req_id}, allocate {no_match_block_num} block to receive storage cache" + ) + gpu_recv_storage_block_ids = self.allocate_gpu_blocks(no_match_block_num) + + prefix_block_key = [] if match_block_node.hash_value is None else [match_block_node.hash_value] + cur_token_idx = match_token_num + no_match_block_keys = [] + while cur_token_idx <= input_token_num - block_size: + 
cur_block_token_ids = input_token_ids[cur_token_idx : cur_token_idx + block_size] + cur_block_key = get_hash_str(cur_block_token_ids, prefix_block_key) + no_match_block_keys.append(cur_block_key) + cur_token_idx += block_size + prefix_block_key = [cur_block_key] + + logger.info( + f"start prefetch cache from storage, req_id: {req_id}, block num: {len(no_match_block_keys)}" + ) + start_time = time.time() + storage_matched_block_ids = self.issue_prefetch_storage_task( + req_id, no_match_block_keys, gpu_recv_storage_block_ids + ) + storage_matched_block_num = len(storage_matched_block_ids) + storage_match_token_num = storage_matched_block_num * block_size + cost_time = time.time() - start_time + metrics["storage_cache_prepare_time"] = cost_time + logger.info( + f"finish prefetch cache from storage, req_id: {req_id}, " + f"matched block num: {storage_matched_block_num}, cost_time:{cost_time:.6f}s" + ) + + match_storage_block_ids = gpu_recv_storage_block_ids[:storage_matched_block_num] + self.recycle_gpu_blocks(gpu_recv_storage_block_ids[storage_matched_block_num:]) + + # 4. 
update metrics + match_token_num = gpu_match_token_num + cpu_match_token_num + storage_match_token_num + common_block_ids = match_gpu_block_ids + gpu_recv_block_ids + match_storage_block_ids + if match_token_num > 0: self.metrics.hit_req_count += 1 self.metrics.calculate_hit_metrics( req_id, cpu_match_token_num, gpu_match_token_num, + storage_match_token_num, input_token_num, ) - hit_info["gpu_cache_blocks"] = len(match_gpu_block_ids) - hit_info["cpu_cache_blocks"] = len(match_cpu_block_ids) - hit_info["gpu_match_token_num"] = gpu_match_token_num - hit_info["cpu_match_token_num"] = cpu_match_token_num + metrics["gpu_match_token_num"] = gpu_match_token_num + metrics["cpu_match_token_num"] = cpu_match_token_num + metrics["storage_match_token_num"] = storage_match_token_num self.metrics._update_history_hit_metrics() if self.metrics.req_count % 10000 == 0: self.metrics.reset_metrics() - logger.info(f"request_match_blocks: req_id {req_id}, matched_block_ids {common_block_ids}") + logger.debug(f"request_match_blocks: req_id {req_id}, matched_block_ids_num {len(common_block_ids)}") + logger.debug(f"request_match_blocks: req_id {req_id}, matched_block_ids {common_block_ids}") + # set leaf node temporarily, then update it in update_cache_blocks self.req_leaf_map[req_id] = match_block_node self.leaf_req_map[match_block_node].add(req_id) - # record request cache info - self.cache_info[req_id] = [match_block_node, len(common_block_ids) * block_size] - task.cached_block_num = len(common_block_ids) - return common_block_ids, matched_token_num, hit_info + # record request cache info in radix tree, note that the block ids for receiving storage cache + # are recorded into radix tree in update_cache_blocks + self.req_to_radix_tree_info[req_id] = [match_block_node, gpu_match_token_num + cpu_match_token_num] + task.num_cached_blocks = len(common_block_ids) + return common_block_ids, match_token_num, metrics except Exception as e: logger.error(f"request_match_blocks: request_block_ids: 
error: {type(e)} {e}") raise e - def request_match_storage_blocks(self, request, extra_gpu_block_ids): - """ - Match and fetch the cached blocks from the storage backend for the given request. - # TODO: merge this function into request_match_blocks - - args: - request: The request to be processed - extra_gpu_block_ids: A list of GPU block IDs to be used for fetching the cache - returns: - matched_block_ids: A list of block IDs that prefetched cache from storage - """ - if self.kvcache_storage_backend is None: - return [] - - req_id = request.request_id - input_ids = request.prompt_token_ids - block_size = self.cache_config.block_size - - prefix_block_key = [] - num_cached_tokens = 0 - if req_id in self.cache_info: - last_node, num_cached_tokens = self.cache_info[req_id] - prefix_block_key = [] if last_node.hash_value is None else [last_node.hash_value] - - block_keys = [] - current_tokens = num_cached_tokens - while current_tokens <= len(input_ids) - block_size: - cur_block_key = get_hash_str(input_ids[current_tokens : current_tokens + block_size], prefix_block_key) - block_keys.append(cur_block_key) - current_tokens += block_size - prefix_block_key = [cur_block_key] - - logger.info(f"start prefetch cache from storage, req_id: {req_id}, block num: {len(block_keys)}") - matched_block_ids = self.issue_prefetch_storage_task(req_id, block_keys, extra_gpu_block_ids) - logger.info( - f"finish prefetch cache from storage, req_id: {req_id}, matched block num: {len(matched_block_ids)}" - ) - - return matched_block_ids - def request_block_ids(self, task, block_size, dec_token_num, *args): """ Allocate blocks for a task. 
@@ -898,6 +915,7 @@ class PrefixCacheManager: req_id, cpu_match_token_num, gpu_match_token_num, + 0, input_token_num, ) hit_info["gpu_cache_blocks"] = gpu_match_token_num // block_size @@ -946,8 +964,8 @@ class PrefixCacheManager: keys.append(node.hash_value) node = node.parent - if req_id in self.cache_info: - del self.cache_info[req_id] + if req_id in self.req_to_radix_tree_info: + del self.req_to_radix_tree_info[req_id] logger.info(f"release_block_ids: req_id {req_id} leaf_node {leaf_node}") @@ -1187,6 +1205,7 @@ class PrefixCacheManager: while True: if len(self.gpu_lru_leaf_heap) == 0: + logger.info("free_block_ids_async: no more gpu leaf node available.") break if total_gpu_free_count >= need_block_num: break @@ -1239,6 +1258,9 @@ class PrefixCacheManager: ): heapq.heappush(self.gpu_lru_leaf_heap, node) self.gpu_lru_leaf_set.add(node) + logger.info( + f"free_block_ids_async: need_block_num {need_block_num}, free_block_num {total_gpu_free_count}." + ) # swap cache to cpu if hash_value_gpu_block_ids_map: @@ -1628,7 +1650,7 @@ class PrefixCacheManager: match_token_num = match_token_num + block_size current_match_node = child # record request cache info - self.cache_info[req_id] = [child, match_token_num] + self.req_to_radix_tree_info[req_id] = [child, match_token_num] else: break @@ -1952,7 +1974,7 @@ class PrefixCacheManager: self.req_leaf_map.clear() self.leaf_req_map.clear() self.unfilled_req_block_map.clear() - self.cache_info.clear() + self.req_to_radix_tree_info.clear() # reset gpu cache data structure self.gpu_lru_leaf_heap.clear() diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 6a7b665de5..2c2f0f6c20 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -115,6 +115,7 @@ class Request: # model specific token ids: end of sentence token ids self.eos_token_ids = eos_token_ids self.num_cached_tokens = 0 + self.num_cached_blocks = 0 self.disable_chat_template = disable_chat_template self.disaggregate_info 
= disaggregate_info @@ -528,7 +529,7 @@ class RequestMetrics: gpu_cache_token_num: Optional[int] = 0 cpu_cache_token_num: Optional[int] = 0 storage_cache_token_num: Optional[int] = 0 - gpu_cpu_cache_prepare_time: Optional[float] = None + cpu_cache_prepare_time: Optional[float] = None storage_cache_prepare_time: Optional[float] = None def __post_init__(self): diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 4a69455904..4388e01dd8 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -375,7 +375,7 @@ class ResourceManager: info = ( f"ResourceManager info, " f"total_block_number: {total_block_number}, total_batch_number: {total_batch_number}, " - f"available_block_num: {available_block_num}, available_batch: {available_batch_num}\n" + f"available_block_num: {available_block_num}, available_batch: {available_batch_num}," f"running_reqs: {used_batch_num}, block_usage: {block_usage:.2f}%, batch_usage: {batch_usage:.2f}%" ) return info diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 9b34d9df30..3d6157b538 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -16,7 +16,6 @@ import copy import threading -import time import traceback from collections import deque from collections.abc import Iterable @@ -276,7 +275,7 @@ class ResourceManagerV1(ResourceManager): llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}") else: self._free_blocks(preempted_req) - preempted_req.cached_block_num = 0 + preempted_req.num_cached_blocks = 0 self.to_be_rescheduled_request_id_set.add(preempted_req.request_id) llm_logger.info(f"Preemption is triggered! 
Preempted request id: {preempted_req.request_id}") preempted_reqs.append(preempted_req) @@ -650,7 +649,9 @@ class ResourceManagerV1(ResourceManager): break else: # need to prefill llm_logger.debug( - f"scheduler prefill task: {request} request.need_prefill_tokens {request.need_prefill_tokens} request.num_computed_tokens {request.num_computed_tokens}" + f"scheduler prefill task in running queue: {request.request_id}, " + f"request.need_prefill_tokens {request.need_prefill_tokens}," + f"request.num_computed_tokens {request.num_computed_tokens}" ) num_new_tokens = self._get_num_new_tokens(request, token_budget) num_new_block = self.get_new_block_nums(request, num_new_tokens) @@ -703,7 +704,10 @@ class ResourceManagerV1(ResourceManager): self._update_mm_hashes(request) # Enable prefix caching if self.config.cache_config.enable_prefix_caching: - if self.cache_manager.num_cpu_blocks > 0: + if ( + self.cache_manager.num_cpu_blocks > 0 + or self.config.cache_config.kvcache_storage_backend + ): if not self.cache_manager.can_allocate_gpu_blocks( (request.need_prefill_tokens + self.config.cache_config.block_size - 1) // self.config.cache_config.block_size @@ -721,14 +725,6 @@ class ResourceManagerV1(ResourceManager): if not request.get("skip_allocate", False): extra_gpu_block_ids = self.cache_manager.allocate_gpu_blocks(num_new_block) request.block_tables.extend(extra_gpu_block_ids) - if ( - self.config.cache_config.enable_prefix_caching - and self.config.cache_config.kvcache_storage_backend - and num_new_tokens >= self.config.cache_config.block_size - ): - matched_block_ids = self.get_storage_cached_blocks(request, extra_gpu_block_ids) - num_new_tokens -= len(matched_block_ids) * self.config.cache_config.block_size - self.waiting.popleft() self.running.append(request) scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens)) @@ -754,7 +750,10 @@ class ResourceManagerV1(ResourceManager): request.num_total_tokens ) # Before preempted task rescheduled, 
preempted task has been sent to engine, no more tokens are output, here num_total_tokens should be static and correct if self.config.cache_config.enable_prefix_caching: - if self.cache_manager.num_cpu_blocks > 0: + if ( + self.cache_manager.num_cpu_blocks > 0 + or self.config.cache_config.kvcache_storage_backend + ): if not self.cache_manager.can_allocate_gpu_blocks( (request.need_prefill_tokens + self.config.cache_config.block_size - 1) // self.config.cache_config.block_size @@ -772,14 +771,6 @@ class ResourceManagerV1(ResourceManager): if not request.get("skip_allocate", False): extra_gpu_block_ids = self.cache_manager.allocate_gpu_blocks(num_new_block) request.block_tables.extend(extra_gpu_block_ids) - if ( - self.config.cache_config.enable_prefix_caching - and self.config.cache_config.kvcache_storage_backend - and num_new_tokens >= self.config.cache_config.block_size - ): - matched_block_ids = self.get_storage_cached_blocks(request, extra_gpu_block_ids) - num_new_tokens -= len(matched_block_ids) * self.config.cache_config.block_size - self.waiting.popleft() self.running.append(request) scheduled_reqs.append(self._prepare_prefill_task(request, num_new_tokens)) @@ -918,11 +909,10 @@ class ResourceManagerV1(ResourceManager): def get_prefix_cached_blocks(self, request: Request): """ - set prefix cached information for the given request + Match and fetch cache for a task. 
""" try: - cache_prepare_time = time.time() - (common_block_ids, matched_token_num, hit_info) = self.cache_manager.request_match_blocks( + (common_block_ids, matched_token_num, metrics) = self.cache_manager.request_match_blocks( request, self.config.cache_config.block_size ) @@ -933,8 +923,11 @@ class ResourceManagerV1(ResourceManager): ) request.num_cached_tokens = matched_token_num - request.metrics.gpu_cache_token_num = hit_info["gpu_match_token_num"] - request.metrics.cpu_cache_token_num = hit_info["cpu_match_token_num"] + request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"] + request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"] + request.metrics.storage_cache_token_num = metrics["storage_match_token_num"] + request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"] + request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"] request.cache_info = [matched_block_num, no_cache_block_num] request.block_tables = common_block_ids request.skip_allocate = False @@ -949,45 +942,11 @@ class ResourceManagerV1(ResourceManager): request.skip_allocate = True else: request.num_computed_tokens = matched_token_num - request.metrics.gpu_cpu_cache_prepare_time = time.time() - cache_prepare_time return True except Exception as e: llm_logger.error(f"prefix match blocks error: {e}, {str(traceback.format_exc())} waiting reschedule...") return False - def get_storage_cached_blocks(self, request: Request, extra_gpu_block_ids: list = []): - """ - Match and prefetch the cached blocks from the storage backend. 
- TODO: merge this function into get_prefix_cached_blocks - """ - try: - tic = time.time() - req_id = request.request_id - llm_logger.debug(f"get_storage_cached_blocks start process req {req_id}") - matched_block_ids = self.cache_manager.request_match_storage_blocks(request, extra_gpu_block_ids) - llm_logger.debug( - f"matched {len(matched_block_ids)} blocks from storage for req_id:{req_id}, " - f"cost_time: {time.time() - tic:.6f}s" - ) - - matched_token_num = len(matched_block_ids) * self.config.cache_config.block_size - request.metrics.storage_cache_token_num = matched_token_num - request.num_computed_tokens += matched_token_num - if request.num_computed_tokens == request.need_prefill_tokens: - request.num_computed_tokens = request.num_computed_tokens - self.config.cache_config.block_size - request.metrics.storage_cache_prepare_time = time.time() - tic - request.cache_info[0] += len(matched_block_ids) # matched_block_num - request.cache_info[1] -= len(matched_block_ids) # no_cache_block_num - - main_process_metrics.prefix_cache_token_num.inc(matched_token_num) - # TODO: main_process_metrics.prefix_storage_cache_token_num.inc(matched_token_num) - return matched_block_ids - except Exception as e: - llm_logger.error( - f"get_storage_cached_blocks process req {req_id}, error: {e}, {str(traceback.format_exc())} " - ) - return [] - def add_request(self, request: Request) -> None: with self.lock: self.apply_async_preprocess(request) @@ -1043,8 +1002,6 @@ class ResourceManagerV1(ResourceManager): need_extra_prefill_blocks = need_prealloc_prefill_blocks - request.cache_info[0] if self.cache_manager.can_allocate_gpu_blocks(need_extra_prefill_blocks): extra_gpu_block_ids = self.cache_manager.allocate_gpu_blocks(need_extra_prefill_blocks) - if self.config.cache_config.enable_prefix_caching: - self.get_storage_cached_blocks(request, extra_gpu_block_ids) request.block_tables.extend(extra_gpu_block_ids) allocated_position = self.get_available_position() request.idx = 
allocated_position @@ -1143,7 +1100,7 @@ class ResourceManagerV1(ResourceManager): def _free_blocks(self, request: Request): if self.config.cache_config.enable_prefix_caching: self.cache_manager.release_block_ids(request) - self.cache_manager.recycle_gpu_blocks(request.block_tables[request.cached_block_num :]) + self.cache_manager.recycle_gpu_blocks(request.block_tables[request.num_cached_blocks :]) else: self.cache_manager.recycle_gpu_blocks(request.block_tables) request.block_tables = [] diff --git a/tests/cache_manager/test_prefix_cache_manager.py b/tests/cache_manager/test_prefix_cache_manager.py index 30532962b2..a834ff7584 100644 --- a/tests/cache_manager/test_prefix_cache_manager.py +++ b/tests/cache_manager/test_prefix_cache_manager.py @@ -376,7 +376,7 @@ class PrefixCacheManagerTest(unittest.TestCase): self.assertEqual(common, []) self.assertEqual(matched_tokens, 0) - self.assertEqual(hit_info["gpu_cache_blocks"], 0) + self.assertEqual(hit_info["gpu_match_token_num"], 0) manager.metrics.reset_metrics.assert_called_once() def test_get_required_block_num_rounds_up(self): @@ -809,7 +809,7 @@ class PrefixCacheManagerTest(unittest.TestCase): manager = _create_manager(num_gpu_blocks=2) req_id = "update-req" last_node = BlockNode(1, [], 0, 1, 0, 2, 0, 0, parent=manager.radix_tree_root) - manager.cache_info[req_id] = (last_node, 0) + manager.req_to_radix_tree_info[req_id] = (last_node, 0) manager.leaf_req_map[last_node].add(req_id) new_leaf = BlockNode(2, [], 0, 1, 0, 2, 1, 0, parent=last_node) @@ -819,7 +819,7 @@ class PrefixCacheManagerTest(unittest.TestCase): self.assertIs(manager.req_leaf_map[req_id], new_leaf) self.assertIn(req_id, manager.leaf_req_map[new_leaf]) - self.assertEqual(task.cached_block_num, 2) + self.assertEqual(task.num_cached_blocks, 2) def test_is_chunked_mm_input_detects_overlap(self): manager = _create_manager() @@ -1044,8 +1044,8 @@ class PrefixCacheManagerTest(unittest.TestCase): self.assertEqual(common_blocks[0], 0) 
self.assertGreaterEqual(matched_tokens, 4) mock_prepare_cpu.assert_called() - self.assertEqual(hit_info["gpu_cache_blocks"], 1) - self.assertEqual(hit_info["cpu_cache_blocks"], 1) + self.assertEqual(hit_info["gpu_match_token_num"], block_size) + self.assertEqual(hit_info["cpu_match_token_num"], block_size) def test_release_block_ids_cleans_request_state(self): manager = _create_manager(num_gpu_blocks=4)