From faade7d0abe1a9d7dfe666ad67be169df017f2b2 Mon Sep 17 00:00:00 2001 From: ddchenhao66 <165133255+ddchenhao66@users.noreply.github.com> Date: Tue, 3 Feb 2026 19:49:01 +0800 Subject: [PATCH] [BugFix] Fix port-related errors in mix mode when FD_ENABLE_INTERNAL_ADAPTER is enabled (#6309) --- fastdeploy/engine/engine.py | 17 ++++++++++++++--- fastdeploy/engine/expert_service.py | 15 ++++++++++++--- fastdeploy/envs.py | 8 ++++---- .../EB_Lite_with_adapter/test_eblite_serving.py | 6 ++++-- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index f6de120421..7a18d67669 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -181,9 +181,20 @@ class LLMEngine: device_ids = self.cfg.parallel_config.device_ids.split(",") self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix) - if self.cfg.scheduler_config.splitwise_role != "mixed" and envs.FD_ENABLE_INTERNAL_ADAPTER: - envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0] - envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0] + if envs.FD_ENABLE_INTERNAL_ADAPTER: + assert ( + envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None or envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT is not None + ), "Please set FD_ZMQ_RECV_REQUEST_SERVER_PORTS or FD_ZMQ_RECV_REQUEST_SERVER_PORT when enabling internal adapter." + assert ( + envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None or envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT is not None + ), "Please set FD_ZMQ_SEND_RESPONSE_SERVER_PORTS or FD_ZMQ_SEND_RESPONSE_SERVER_PORT when enabling internal adapter." 
+ if envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None: + envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0] + if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None: + envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0] + llm_logger.info( + f"envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" + ) if api_server_pid is not None: llm_logger.info(f"Start zmq server, api_server_pid: {api_server_pid}") diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index cab831ab98..8a2f121d3f 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -54,15 +54,24 @@ class ExpertService: else: self.llm_logger = llm_logger - if cfg.scheduler_config.splitwise_role != "mixed": - if envs.FD_ENABLE_INTERNAL_ADAPTER: + if envs.FD_ENABLE_INTERNAL_ADAPTER: + assert ( + envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None or envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT is not None + ), "Please set FD_ZMQ_RECV_REQUEST_SERVER_PORTS or FD_ZMQ_RECV_REQUEST_SERVER_PORT when enabling internal adapter." + assert ( + envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None or envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT is not None + ), "Please set FD_ZMQ_SEND_RESPONSE_SERVER_PORTS or FD_ZMQ_SEND_RESPONSE_SERVER_PORT when enabling internal adapter." 
+ if envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None: envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[ local_data_parallel_id ] + if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None: envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[ local_data_parallel_id ] - self.llm_logger.info(f"local_data_parallel_id: {local_data_parallel_id}") + self.llm_logger.info( + f"local_data_parallel_id: {local_data_parallel_id},envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" + ) if self.cfg.cache_config.num_gpu_blocks_override is None: self.do_profile = True diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 6b06263d18..3c87296ca4 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -110,13 +110,13 @@ environment_variables: dict[str, Callable[[], Any]] = { # enable internal module to access LLMEngine. "FD_ENABLE_INTERNAL_ADAPTER": lambda: int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "0")), # LLMEngine receive requests port, used when FD_ENABLE_INTERNAL_ADAPTER=1 - "FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8200"), + "FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", None), # LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1 - "FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8201"), + "FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", None), # LLMEngine receive requests port, used when FD_ENABLE_INTERNAL_ADAPTER=1 - "FD_ZMQ_RECV_REQUEST_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", "8200"), + "FD_ZMQ_RECV_REQUEST_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", None), # LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1 - 
"FD_ZMQ_SEND_RESPONSE_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", "8201"), + "FD_ZMQ_SEND_RESPONSE_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", None), # LLMEngine receive control command port, used when FD_ENABLE_INTERNAL_ADAPTER=1 "FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"), # Whether to enable the decode caches requests for preallocating resource diff --git a/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py b/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py index a13b749da4..6d8dfac53f 100644 --- a/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py +++ b/tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py @@ -42,8 +42,10 @@ FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234)) FD_ENABLE_INTERNAL_ADAPTER = int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "1")) FD_ZMQ_RECV_REQUEST_SERVER_PORT = int(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8204")) FD_ZMQ_SEND_RESPONSE_SERVER_PORT = int(os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8205")) -FD_ZMQ_RECV_REQUEST_SERVER_PORTS = str(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", "8204")) -FD_ZMQ_SEND_RESPONSE_SERVER_PORTS = str(os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", "8205")) +FD_ZMQ_RECV_REQUEST_SERVER_PORTS = str(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", FD_ZMQ_RECV_REQUEST_SERVER_PORT)) +FD_ZMQ_SEND_RESPONSE_SERVER_PORTS = str( + os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", FD_ZMQ_SEND_RESPONSE_SERVER_PORT) +) FD_ZMQ_CONTROL_CMD_SERVER_PORTS = int(os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8206")) FD_ZMQ_CONTROL_CMD_SERVER_PORT = FD_ZMQ_CONTROL_CMD_SERVER_PORTS