mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
[Feature] [KVCache] support file_store kv cache backend (#6188)
* fix(examples): comment out stop.sh to avoid error when script is missing * feat: add file_store support for cache manager * [fix] fix multi gpu transfer * [fix] fix global kvcache transfer * [Feature] [KVCache] support file_store kv cache backend * chore: update FileStore according to PR comments * fix: remove comments * fix: add swap_cache_layout for file store * fix: remove rank key * fix: Switch KV cache storage to pure file mode * Temporarily disable support for Tensor types * fix: remove args --kvcache_file_path & add envs FILE_BACKEND_STORAGE_DIR * fixx: Simplify cache_transfer_manager.py * fix: fix syntax bug * fix: Simplify file_store.py * fix: Use the key directly as the filename * fix: Simplify set() * fix: Simplify cache_transfer_manager.py & file_store.py * fix: Only support load to cpu buffer * feat: add FileStore backend for cache transfer * fix: guard zmq import
This commit is contained in:
@@ -43,7 +43,11 @@ from fastdeploy.cache_manager.ops import (
|
||||
swap_cache_layout,
|
||||
unset_data_ipc,
|
||||
)
|
||||
from fastdeploy.cache_manager.transfer_factory import AttentionStore, MooncakeStore
|
||||
from fastdeploy.cache_manager.transfer_factory import (
|
||||
AttentionStore,
|
||||
FileStore,
|
||||
MooncakeStore,
|
||||
)
|
||||
from fastdeploy.config import SpeculativeConfig
|
||||
from fastdeploy.inter_communicator import EngineCacheQueue, IPCSignal, KVCacheStatus
|
||||
from fastdeploy.platforms import current_platform
|
||||
@@ -112,7 +116,7 @@ def parse_args():
|
||||
"--kvcache_storage_backend",
|
||||
type=str,
|
||||
default=None,
|
||||
choices=["mooncake", "attention_store"],
|
||||
choices=["mooncake", "attention_store", "file"],
|
||||
help="The storage backend for kvcache storage. If not set, storage backend is disabled.",
|
||||
)
|
||||
parser.add_argument(
|
||||
@@ -289,6 +293,15 @@ class CacheTransferManager:
|
||||
dp_id=self.local_data_parallel_id,
|
||||
)
|
||||
logger.info("Initialized attention store successfully!")
|
||||
elif args.kvcache_storage_backend == "file":
|
||||
logger.info("Start initialize file store...")
|
||||
self.storage_backend = FileStore(
|
||||
namespace=self.model_id,
|
||||
tp_rank=self.rank,
|
||||
tp_size=self.n_ranks,
|
||||
)
|
||||
self._init_storage_buffer(args)
|
||||
logger.info("Initialized file store successfully")
|
||||
else:
|
||||
raise NotImplementedError(f"Unsupported storage backend: {self.storage_backend_type}")
|
||||
except Exception as e:
|
||||
@@ -518,7 +531,7 @@ class CacheTransferManager:
|
||||
Read storage data from the given blocks to the corresponding cache tensors on the current rank's GPU.
|
||||
"""
|
||||
try:
|
||||
if self.storage_backend_type == "mooncake":
|
||||
if self.storage_backend_type in ("mooncake", "file"):
|
||||
block_num = len(gpu_block_ids)
|
||||
keys = k_cache_keys + v_cache_keys
|
||||
k_cache_ptrs = [
|
||||
@@ -600,7 +613,7 @@ class CacheTransferManager:
|
||||
k_cache_keys = [f"prefix{self.key_prefix}_{key}_{self.rank}_key" for key in task.keys]
|
||||
v_cache_keys = [f"prefix{self.key_prefix}_{key}_{self.rank}_value" for key in task.keys]
|
||||
match_block_num = 0
|
||||
if self.storage_backend_type == "mooncake":
|
||||
if self.storage_backend_type in ("mooncake", "file"):
|
||||
match_block_num = self.storage_backend.query(k_cache_keys, v_cache_keys)
|
||||
elif self.storage_backend_type == "attention_store":
|
||||
match_block_num = self.storage_backend.query(
|
||||
@@ -666,7 +679,7 @@ class CacheTransferManager:
|
||||
timeout,
|
||||
):
|
||||
try:
|
||||
if self.storage_backend_type == "mooncake":
|
||||
if self.storage_backend_type in ("mooncake", "file"):
|
||||
key_cache_size = [
|
||||
self.key_cache_shape[0],
|
||||
self.key_cache_shape[1],
|
||||
@@ -747,7 +760,7 @@ class CacheTransferManager:
|
||||
v_cache_keys = [f"prefix{self.key_prefix}_{key}_{self.rank}_value" for key in task.keys]
|
||||
|
||||
match_block_num = 0
|
||||
if self.storage_backend_type == "mooncake":
|
||||
if self.storage_backend_type == ("mooncake", "file"):
|
||||
match_block_num = self.storage_backend.query(k_cache_keys, v_cache_keys, task.timeout)
|
||||
elif self.storage_backend_type == "attention_store":
|
||||
match_block_num = self.storage_backend.query(task.task_id, task.token_ids, 0, task.timeout)
|
||||
|
||||
Reference in New Issue
Block a user