[PD Disaggregation] Update usage of pd disaggregation and data parallel (#5742)

* Update usage of pd disaggregation

* up

* up

* up

* up

* up

* up

* up

* up

* up

* up dp docs

* up

* up

* up

* fix unittest
This commit is contained in:
jc
2026-01-05 17:51:29 +08:00
committed by GitHub
parent 690d4bcdb0
commit 8d384f9fd8
15 changed files with 441 additions and 385 deletions
+22 -11
View File
@@ -58,7 +58,7 @@ from fastdeploy.splitwise.internal_adapter_utils import InternalAdapter
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
from fastdeploy.trace.constants import LoggingEventName
from fastdeploy.trace.trace_logger import print as trace_print
from fastdeploy.utils import EngineError, envs, get_logger, llm_logger
from fastdeploy.utils import EngineError, console_logger, envs, get_logger, llm_logger
try:
TokenProcessor = load_token_processor_plugins()
@@ -157,6 +157,7 @@ class EngineService:
def start(self, async_llm_pid=None):
self.running = True
console_logger.debug("Start engineService...")
if self.use_async_llm:
self.start_worker_service(async_llm_pid)
@@ -807,7 +808,7 @@ class EngineService:
# start async preprocess
self.resource_manager.apply_async_preprocess(task)
need_delete_tasks = []
if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
if envs.PREFILL_CONTINUOUS_REQUEST_DECODE_RESOURCES:
for task in tasks:
# assure can allocate block ids in P
while not self.resource_manager.preallocate_resource_in_p(task):
@@ -1352,6 +1353,7 @@ class EngineService:
threading.Thread(target=decode_loop, daemon=True).start()
def start_cache_service(self, device_ids, ipc_signal_suffix):
console_logger.debug("Start cache manager...")
return self.resource_manager.cache_manager.launch_cache_manager(
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.parallel_config.tensor_parallel_size,
@@ -1379,17 +1381,24 @@ class EngineService:
return False
def _register_to_router(self):
"""If use router, register this server to router"""
timeout = 5
sleep_seconds = 10
"""
Periodically send server information to the router for registeration, and it is used
as a heartbeat signal.
"""
def _register():
timeout = 5
sleep_seconds = 5
is_registered = False
while True:
try:
api_server_host = self.cfg.router_config.api_server_host
api_server_port = self.cfg.router_config.api_server_port
api_server_url = f"http://{api_server_host}:{api_server_port}"
if not check_service_health(api_server_url):
time.sleep(sleep_seconds)
self.llm_logger.info("Wait for API service health and then register to router")
time.sleep(sleep_seconds)
continue
@@ -1401,20 +1410,22 @@ class EngineService:
)
if resp.ok:
self.llm_logger.info("Successfully registered to the router!")
break
if not is_registered:
is_registered = True
self.llm_logger.info("Register to router successfully")
else:
self.llm_logger.error(
f"Router registration failed: {resp.status_code}, "
f"Send server info to router failed: {resp.status_code}, "
f"{resp.text}, {self.cfg.register_info}"
)
time.sleep(sleep_seconds)
except requests.exceptions.RequestException as e:
self.llm_logger.error(f"Register to router request error: {e}")
except Exception as e:
self.llm_logger.exception(f"Unexpected error during router registration: {e}")
time.sleep(sleep_seconds)
if self.cfg.router_config.router is not None:
if self.cfg.router_config.router is None:
self.llm_logger.info("Router is not enabled, skip registering to router")
else:
register_thread = threading.Thread(target=_register, daemon=True)
register_thread.start()