mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-22 16:07:51 +08:00
Mooncake storage register local buffer by chunk (#7416)
This commit is contained in:
@@ -90,7 +90,7 @@ Create a `mooncake_config.json` file:
|
||||
"metadata_server": "http://0.0.0.0:15002/metadata",
|
||||
"master_server_addr": "0.0.0.0:15001",
|
||||
"global_segment_size": 1000000000,
|
||||
"local_buffer_size": 134217728,
|
||||
"local_buffer_size": 1048576,
|
||||
"protocol": "rdma",
|
||||
"rdma_devices": ""
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ pip install ./dist/fastdeploy*.whl
|
||||
"metadata_server": "http://0.0.0.0:15002/metadata",
|
||||
"master_server_addr": "0.0.0.0:15001",
|
||||
"global_segment_size": 1000000000,
|
||||
"local_buffer_size": 134217728,
|
||||
"local_buffer_size": 1048576,
|
||||
"protocol": "rdma",
|
||||
"rdma_devices": ""
|
||||
}
|
||||
|
||||
@@ -31,7 +31,14 @@ from fastdeploy.platforms import current_platform
|
||||
from fastdeploy.utils import get_host_ip
|
||||
|
||||
# Default sizes, in bytes.
DEFAULT_GLOBAL_SEGMENT_SIZE = 1024 * 1024 * 1024  # 1 GiB
# Only the 1MB value is kept: the earlier 128MB assignment was immediately
# overridden by this one and therefore had no effect.
DEFAULT_LOCAL_BUFFER_SIZE = 1024 * 1024  # 1MB
# Default and clamping bounds applied to the MC_MAX_MR_SIZE environment variable.
DEFAULT_MC_MAX_MR_SIZE = 4 * 1024 * 1024 * 1024  # 4GB
MIN_MC_MAX_MR_SIZE = 1024 * 1024 * 1024  # 1GB
MAX_MC_MAX_MR_SIZE = 6 * 1024 * 1024 * 1024  # 6GB
|
||||
|
||||
|
||||
def byte_to_gb(byte):
    """Convert a size in bytes to gigabytes, using 1 GB = 1024 * 1024 * 1024 bytes.

    Args:
        byte: Size in bytes.

    Returns:
        The size divided by 1024 * 1024 * 1024 (true division, so a float
        for integer input).
    """
    return byte / (1024 * 1024 * 1024)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -111,9 +118,25 @@ class MooncakeStore(KVCacheStorage):
|
||||
host_ip = get_host_ip()
|
||||
os.environ["MC_TCP_BIND_ADDRESS"] = host_ip
|
||||
logger.info(f"Set MC_TCP_BIND_ADDRESS to {host_ip}")
|
||||
if os.environ.get("MC_MAX_MR_SIZE") is None:
|
||||
os.environ["MC_MAX_MR_SIZE"] = "4294967296" # 4GB
|
||||
logger.info("MC_MAX_MR_SIZE is not set, default to 4GB.")
|
||||
|
||||
# Set MC_MAX_MR_SIZE for mooncake store to control the maximum mr size
|
||||
self.mc_max_mr_size = int(os.environ.get("MC_MAX_MR_SIZE", 0))
|
||||
if self.mc_max_mr_size == 0:
|
||||
self.mc_max_mr_size = DEFAULT_MC_MAX_MR_SIZE
|
||||
logger.info(f"MC_MAX_MR_SIZE is not set, default to {byte_to_gb(DEFAULT_MC_MAX_MR_SIZE)} GB.")
|
||||
elif self.mc_max_mr_size < MIN_MC_MAX_MR_SIZE:
|
||||
self.mc_max_mr_size = MIN_MC_MAX_MR_SIZE
|
||||
logger.info(
|
||||
f"MC_MAX_MR_SIZE is smaller than {byte_to_gb(MIN_MC_MAX_MR_SIZE)} GB, set to {byte_to_gb(MIN_MC_MAX_MR_SIZE)} GB."
|
||||
)
|
||||
elif self.mc_max_mr_size > MAX_MC_MAX_MR_SIZE:
|
||||
self.mc_max_mr_size = MAX_MC_MAX_MR_SIZE
|
||||
logger.info(
|
||||
f"MC_MAX_MR_SIZE is larger than {byte_to_gb(MAX_MC_MAX_MR_SIZE)} GB, set to {byte_to_gb(MAX_MC_MAX_MR_SIZE)} GB."
|
||||
)
|
||||
else:
|
||||
logger.info(f"MC_MAX_MR_SIZE is set to {self.mc_max_mr_size} bytes.")
|
||||
os.environ["MC_MAX_MR_SIZE"] = str(self.mc_max_mr_size)
|
||||
|
||||
try:
|
||||
from mooncake.store import MooncakeDistributedStore
|
||||
@@ -129,6 +152,11 @@ class MooncakeStore(KVCacheStorage):
|
||||
self.config = MooncakeStoreConfig.create()
|
||||
if self.tp_rank is not None:
|
||||
self.config.select_rdma_device(self.tp_rank)
|
||||
if self.config.local_buffer_size > self.mc_max_mr_size:
|
||||
raise ValueError(
|
||||
f"local_buffer_size {self.config.local_buffer_size} must be "
|
||||
f"smaller than mc_max_mr_size {self.mc_max_mr_size}"
|
||||
)
|
||||
logger.info(f"Mooncake Configuration loaded, {self.config}.")
|
||||
|
||||
ret_code = self.store.setup(
|
||||
@@ -162,13 +190,38 @@ class MooncakeStore(KVCacheStorage):
|
||||
self.store.remove(warmup_key)
|
||||
|
||||
def register_buffer(self, buffer_ptr, buffer_size) -> None:
    """Register a buffer with Mooncake Store, splitting it into chunks if needed.

    If ``buffer_size`` exceeds ``self.mc_max_mr_size``, the buffer is split
    into consecutive chunks of at most ``mc_max_mr_size`` bytes and each
    chunk is registered separately.

    Chunk addresses are computed as ``buffer_ptr + offset``, which assumes
    the buffer is one contiguous region (NOTE(review): the original author
    states it comes from cuda_host_alloc pinned memory -- confirm at the
    call sites).

    Args:
        buffer_ptr: Integer start address of the buffer.
        buffer_size: Size of the buffer in bytes.

    Raises:
        TypeError: If the underlying store call raises ``TypeError``.
        AssertionError: If the store returns a non-zero error code. Raised
            explicitly (not via ``assert``) so the check is not stripped
            when Python runs with ``-O``.
    """
    max_mr_size = self.mc_max_mr_size

    # Fast path: the whole buffer fits in a single registration.
    if buffer_size <= max_mr_size:
        try:
            ret_code = self.store.register_buffer(buffer_ptr, buffer_size)
        except TypeError as err:
            logger.error("Failed to register buffer to Mooncake Store: %s", err)
            raise TypeError("Mooncake Store Register Buffer Error.") from err
        if ret_code != 0:
            raise AssertionError(f"failed to register buffer, error code: {ret_code}")
        return

    # Ceiling division: the last chunk may be smaller than max_mr_size.
    num_chunks = (buffer_size + max_mr_size - 1) // max_mr_size
    logger.info(
        f"Registering buffer of {byte_to_gb(buffer_size):.2f}GB in {num_chunks} chunks "
        f"(max_mr_size={byte_to_gb(max_mr_size):.2f}GB per chunk)"
    )
    for i in range(num_chunks):
        offset = i * max_mr_size
        chunk_ptr = buffer_ptr + offset
        chunk_size = min(max_mr_size, buffer_size - offset)
        try:
            ret_code = self.store.register_buffer(chunk_ptr, chunk_size)
        except TypeError as err:
            logger.error("Failed to register chunk %d/%d to Mooncake Store: %s", i, num_chunks, err)
            raise TypeError("Mooncake Store Register Buffer Error.") from err
        if ret_code != 0:
            raise AssertionError(
                f"failed to register chunk {i}/{num_chunks}, "
                f"size={byte_to_gb(chunk_size):.2f}GB, error code: {ret_code}"
            )
|
||||
|
||||
def set(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user