[BugFix] Fix port-releated errors in mix mode when FD_ENABLE_INTERNAL_ADAPTER is enabled (#6309)

This commit is contained in:
ddchenhao66
2026-02-03 19:49:01 +08:00
committed by GitHub
parent c745a22420
commit faade7d0ab
4 changed files with 34 additions and 12 deletions
+14 -3
View File
@@ -181,9 +181,20 @@ class LLMEngine:
device_ids = self.cfg.parallel_config.device_ids.split(",")
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)
if self.cfg.scheduler_config.splitwise_role != "mixed" and envs.FD_ENABLE_INTERNAL_ADAPTER:
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0]
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0]
if envs.FD_ENABLE_INTERNAL_ADAPTER:
assert (
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None or envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT is not None
), "Please set FD_ZMQ_RECV_REQUEST_SERVER_PORTS or FD_ZMQ_RECV_REQUEST_SERVER_PORT when enabling internal adapter."
assert (
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None or envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT is not None
), "Please set FD_ZMQ_SEND_RESPONSE_SERVER_PORTS or FD_ZMQ_SEND_RESPONSE_SERVER_PORT when enabling internal adapter."
if envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None:
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0]
if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None:
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0]
llm_logger.info(
f"envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
)
if api_server_pid is not None:
llm_logger.info(f"Start zmq server, api_server_pid: {api_server_pid}")
+12 -3
View File
@@ -54,15 +54,24 @@ class ExpertService:
else:
self.llm_logger = llm_logger
if cfg.scheduler_config.splitwise_role != "mixed":
if envs.FD_ENABLE_INTERNAL_ADAPTER:
if envs.FD_ENABLE_INTERNAL_ADAPTER:
assert (
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None or envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT is not None
), "Please set FD_ZMQ_RECV_REQUEST_SERVER_PORTS or FD_ZMQ_RECV_REQUEST_SERVER_PORT when enabling internal adapter."
assert (
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None or envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT is not None
), "Please set FD_ZMQ_SEND_RESPONSE_SERVER_PORTS or FD_ZMQ_SEND_RESPONSE_SERVER_PORT when enabling internal adapter."
if envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS is not None:
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[
local_data_parallel_id
]
if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None:
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[
local_data_parallel_id
]
self.llm_logger.info(f"local_data_parallel_id: {local_data_parallel_id}")
self.llm_logger.info(
f"local_data_parallel_id: {local_data_parallel_id},envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
)
if self.cfg.cache_config.num_gpu_blocks_override is None:
self.do_profile = True
+4 -4
View File
@@ -110,13 +110,13 @@ environment_variables: dict[str, Callable[[], Any]] = {
# enable internal module to access LLMEngine.
"FD_ENABLE_INTERNAL_ADAPTER": lambda: int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "0")),
# LLMEngine receive requests port, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8200"),
"FD_ZMQ_RECV_REQUEST_SERVER_PORT": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", None),
# LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8201"),
"FD_ZMQ_SEND_RESPONSE_SERVER_PORT": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", None),
# LLMEngine receive requests port, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ZMQ_RECV_REQUEST_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", "8200"),
"FD_ZMQ_RECV_REQUEST_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", None),
# LLMEngine send response port, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ZMQ_SEND_RESPONSE_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", "8201"),
"FD_ZMQ_SEND_RESPONSE_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", None),
# LLMEngine receive control command port, used when FD_ENABLE_INTERNAL_ADAPTER=1
"FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"),
# Whether to enable the decode caches requests for preallocating resource
@@ -42,8 +42,10 @@ FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8234))
FD_ENABLE_INTERNAL_ADAPTER = int(os.getenv("FD_ENABLE_INTERNAL_ADAPTER", "1"))
FD_ZMQ_RECV_REQUEST_SERVER_PORT = int(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORT", "8204"))
FD_ZMQ_SEND_RESPONSE_SERVER_PORT = int(os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORT", "8205"))
FD_ZMQ_RECV_REQUEST_SERVER_PORTS = str(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", "8204"))
FD_ZMQ_SEND_RESPONSE_SERVER_PORTS = str(os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", "8205"))
FD_ZMQ_RECV_REQUEST_SERVER_PORTS = str(os.getenv("FD_ZMQ_RECV_REQUEST_SERVER_PORTS", FD_ZMQ_RECV_REQUEST_SERVER_PORT))
FD_ZMQ_SEND_RESPONSE_SERVER_PORTS = str(
os.getenv("FD_ZMQ_SEND_RESPONSE_SERVER_PORTS", FD_ZMQ_SEND_RESPONSE_SERVER_PORT)
)
FD_ZMQ_CONTROL_CMD_SERVER_PORTS = int(os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8206"))
FD_ZMQ_CONTROL_CMD_SERVER_PORT = FD_ZMQ_CONTROL_CMD_SERVER_PORTS