mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
[XPU] abstract a hardware-agnostic operator wrapper for prefix cache and specify xpu device id definition (#4455)
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
Co-authored-by: ddchenhao66 <dhaochen163.com>
This commit is contained in:
@@ -28,27 +28,19 @@ import paddle
|
||||
|
||||
from fastdeploy import envs
|
||||
from fastdeploy.cache_manager.cache_data import CacheStatus
|
||||
from fastdeploy.cache_manager.ops import (
|
||||
cuda_host_alloc,
|
||||
cuda_host_free,
|
||||
memory_allocated,
|
||||
set_data_ipc,
|
||||
set_device,
|
||||
share_external_data_,
|
||||
swap_cache_all_layers,
|
||||
unset_data_ipc,
|
||||
)
|
||||
from fastdeploy.config import SpeculativeConfig
|
||||
from fastdeploy.inter_communicator import EngineCacheQueue, IPCSignal, KVCacheStatus
|
||||
from fastdeploy.platforms import current_platform
|
||||
|
||||
if current_platform.is_cuda():
|
||||
from fastdeploy.model_executor.ops.gpu import (
|
||||
cuda_host_alloc,
|
||||
cuda_host_free,
|
||||
set_data_ipc,
|
||||
share_external_data,
|
||||
swap_cache_all_layers,
|
||||
unset_data_ipc,
|
||||
)
|
||||
elif current_platform.is_xpu():
|
||||
from fastdeploy.model_executor.ops.xpu import (
|
||||
cuda_host_alloc,
|
||||
cuda_host_free,
|
||||
set_data_ipc,
|
||||
share_external_data,
|
||||
swap_cache_all_layers,
|
||||
)
|
||||
from fastdeploy.utils import get_logger
|
||||
|
||||
|
||||
@@ -194,10 +186,7 @@ class CacheTransferManager:
|
||||
suffix=args.engine_worker_queue_port,
|
||||
create=False,
|
||||
)
|
||||
|
||||
# TODO XPU support RL
|
||||
if not current_platform.is_xpu():
|
||||
threading.Thread(target=self.clear_or_update_caches, args=[args], daemon=True).start()
|
||||
threading.Thread(target=self.clear_or_update_caches, args=[args], daemon=True).start()
|
||||
|
||||
def _init_gpu_cache(self, args):
|
||||
|
||||
@@ -208,10 +197,7 @@ class CacheTransferManager:
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] OK! Stop waiting.")
|
||||
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] Initializing kv cache for all layers.")
|
||||
if current_platform.is_cuda():
|
||||
paddle.set_device(f"gpu:{self.device}")
|
||||
elif current_platform.is_xpu():
|
||||
paddle.set_device(f"xpu:{self.device}")
|
||||
set_device(self.device)
|
||||
for i in range(args.num_layers + self.num_extra_layers):
|
||||
num_gpu_blocks = args.num_gpu_blocks if i < args.num_layers else self.num_extra_layer_gpu_blocks
|
||||
cache_shape = [num_gpu_blocks, args.kv_num_head, args.block_size, args.head_dim]
|
||||
@@ -228,12 +214,8 @@ class CacheTransferManager:
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] ..attaching kv cache for layer {i}: {cache_shape}")
|
||||
key_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
|
||||
val_cache = paddle.empty(shape=[], dtype=args.cache_dtype)
|
||||
if current_platform.is_xpu():
|
||||
key_cache = share_external_data(key_cache, key_name, cache_shape, True)
|
||||
val_cache = share_external_data(val_cache, val_name, cache_shape, True)
|
||||
else:
|
||||
key_cache = share_external_data(key_cache, key_name, cache_shape)
|
||||
val_cache = share_external_data(val_cache, val_name, cache_shape)
|
||||
key_cache = share_external_data_(key_cache, key_name, cache_shape, True)
|
||||
val_cache = share_external_data_(val_cache, val_name, cache_shape, True)
|
||||
|
||||
self.gpu_cache_kvs[key_name] = key_cache
|
||||
self.gpu_cache_kvs[val_name] = val_cache
|
||||
@@ -247,10 +229,7 @@ class CacheTransferManager:
|
||||
cache_kv_size_byte = sum([tmp.numel() * 1 for key, tmp in self.gpu_cache_kvs.items()])
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] device :{self.device}")
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] cache_kv_size_byte : {cache_kv_size_byte}")
|
||||
if current_platform.is_cuda():
|
||||
logger.info(
|
||||
f"[rank {self.rank}/{self.n_ranks}] done init cache (full) gmem alloc : {paddle.device.cuda.memory_allocated()}"
|
||||
)
|
||||
logger.info(f"[rank {self.rank}/{self.n_ranks}] done init cache (full) gmem alloc : {memory_allocated()}")
|
||||
|
||||
def _init_cpu_cache(self, args):
|
||||
if args.num_cpu_blocks == 0:
|
||||
@@ -513,6 +492,9 @@ class CacheTransferManager:
|
||||
)
|
||||
|
||||
def clear_or_update_caches(self, args):
|
||||
# TODO XPU support RL
|
||||
if unset_data_ipc is None:
|
||||
return
|
||||
logger.info("Start a thread to clear/restore kv cache when model weights are cleared/updated.")
|
||||
logger.info(f"FD_ENABLE_SWAP_SPACE_CLEARING={envs.FD_ENABLE_SWAP_SPACE_CLEARING}")
|
||||
kv_cache_status = np.zeros([1], dtype=np.int32)
|
||||
@@ -544,10 +526,7 @@ class CacheTransferManager:
|
||||
time.sleep(0.1)
|
||||
|
||||
# clear gpu caches
|
||||
if current_platform.is_cuda():
|
||||
paddle.set_device(f"gpu:{self.device}")
|
||||
elif current_platform.is_xpu():
|
||||
paddle.set_device(f"xpu:{self.device}")
|
||||
set_device(self.device)
|
||||
for name, tensor in self.gpu_cache_kvs.items():
|
||||
unset_data_ipc(tensor, name, True, False)
|
||||
self.gpu_cache_kvs.clear()
|
||||
@@ -617,8 +596,5 @@ if __name__ == "__main__":
|
||||
args = parse_args()
|
||||
rank_id = args.rank + args.local_data_parallel_id * args.mp_num
|
||||
logger = get_logger("cache_transfer_manager", f"cache_transfer_manager_rank{rank_id}.log")
|
||||
if current_platform.is_cuda():
|
||||
paddle.set_device(f"gpu:{args.device_id}")
|
||||
elif current_platform.is_xpu():
|
||||
paddle.set_device(f"xpu:{args.device_id}")
|
||||
set_device(args.device_id)
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user