[Feature] EngineWorkerQueue anonymous port (#4597)

* EngineWorkerQueue 支持匿名端口设置

* EngineWorkerQueue 支持匿名端口设置

* EngineWorkerQueue 支持匿名端口设置

* EngineWorkerQueue 支持匿名端口设置

* EngineWorkerQueue 支持匿名端口设置
This commit is contained in:
Daci
2025-10-28 10:22:37 +08:00
committed by GitHub
parent 7681375a19
commit 6426414a0f
5 changed files with 46 additions and 3 deletions
+2 -2
View File
@@ -216,7 +216,7 @@ class EngineArgs:
The amount of CPU memory to offload to.
"""
cache_queue_port: str = "8003"
cache_queue_port: str = "0"
"""
Port for cache queue.
"""
@@ -236,7 +236,7 @@ class EngineArgs:
Flag to enable the custom all-reduce kernel.
"""
engine_worker_queue_port: str = "8002"
engine_worker_queue_port: str = "0"
"""
Port for worker queue communication.
"""
+12
View File
@@ -268,6 +268,16 @@ class EngineService:
num_client=self.cfg.parallel_config.tensor_parallel_size,
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
)
# Dynamically updates the port value if an anonymous port is used
self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id] = str(
self.engine_worker_queue_server.get_server_port()
)
address = (
self.cfg.master_ip,
int(
self.cfg.parallel_config.engine_worker_queue_port[self.cfg.parallel_config.local_data_parallel_id]
),
)
if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
self.cache_task_queue = EngineCacheQueue(
@@ -281,6 +291,8 @@ class EngineService:
client_id=-1,
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
)
self.cfg.cache_config.cache_queue_port = self.cache_task_queue.get_server_port()
self.llm_logger.info(
f"local {min(self.cfg.worker_num_per_node * self.cfg.node_rank + self.cfg.parallel_config.local_data_parallel_id,self.cfg.parallel_config.data_parallel_size - 1)}"
)
+1 -1
View File
@@ -183,7 +183,7 @@ async def lifespan(app: FastAPI):
max_model_len=args.max_model_len,
tensor_parallel_size=args.tensor_parallel_size,
pid=pid,
port=int(args.engine_worker_queue_port[args.local_data_parallel_id]),
port=int(os.environ.get("INFERENCE_MSG_QUEUE_ID", "0")),
limit_mm_per_prompt=args.limit_mm_per_prompt,
mm_processor_kwargs=args.mm_processor_kwargs,
reasoning_parser=args.reasoning_parser,
@@ -61,6 +61,7 @@ class EngineCacheQueue:
"""
self.address: Tuple[str, int] = address
self.authkey: bytes = authkey
self.is_server: bool = is_server
self.num_client: int = num_client
self.client_id: int = client_id
self.local_data_parallel_size = local_data_parallel_size
@@ -150,6 +151,12 @@ class EngineCacheQueue:
self.manager: BaseManager = QueueManager(address=self.address, authkey=self.authkey)
self.manager.start()
# If the port is 0, an anonymous port will be automatically assigned. The port range can be queried from system configuration,
# e.g., by running 'cat /proc/sys/net/ipv4/ip_local_port_range'; typically in the range of 10000-60999.
# After manager.start(), its address attribute will be updated to the actual listening address.
# We update self.address here so that the real address can be queried later.
self.address = self.manager.address
logger.info(f"EngineCacheQueue server started at {self.address}")
else:
# Client-side connection setup
@@ -194,6 +201,15 @@ class EngineCacheQueue:
self.position: int = 1 << self.client_id
logger.info(f"Connected EngineCacheQueue client_id: {self.client_id}")
def get_server_port(self) -> int:
"""
Returns the actual port that the server instance is listening on.
Calling this method only makes sense on instances where is_server=True.
"""
if not self.is_server:
raise RuntimeError("Only the server instance can provide the port.")
return self.address[1]
def _connect_with_retry(self, max_retries: int = 5, interval: int = 3) -> None:
"""
Connect to the server with retry mechanism.
@@ -206,6 +206,12 @@ class EngineWorkerQueue:
)
self.manager: BaseManager = QueueManager(address=self.address, authkey=self.authkey)
self.manager.start()
# If the port is 0, an anonymous port will be automatically assigned. The port range can be queried from system configuration,
# e.g., by running 'cat /proc/sys/net/ipv4/ip_local_port_range'; typically in the range of 10000-60999.
# After manager.start(), its address attribute will be updated to the actual listening address.
# We update self.address here so that the real address can be queried later.
self.address = self.manager.address
else:
# Client-side connection setup
assert (
@@ -277,6 +283,15 @@ class EngineWorkerQueue:
f"of connected clients: {self.connected_client_counter.get()}"
)
def get_server_port(self) -> int:
"""
Returns the actual port that the server instance is listening on.
Calling this method only makes sense on instances where is_server=True.
"""
if not self.is_server:
raise RuntimeError("Only the server instance can provide the port.")
return self.address[1]
def _connect_with_retry(self, max_retries: int = 5, interval: int = 3) -> None:
"""
Connect to the server with retry mechanism.