[BugFix][Cherry-Pick] Fix race condition in async RL control request (#7433)

This commit is contained in:
jackyYang6
2026-04-17 16:45:50 +08:00
committed by GitHub
parent 6119a07f74
commit be28f7c4f4
+3 -3
View File
@@ -552,14 +552,14 @@ class EngineClient:
async def run_control_method(self, request: ControlRequest): async def run_control_method(self, request: ControlRequest):
api_server_logger.info(f"Received control request: {request}") api_server_logger.info(f"Received control request: {request}")
request_id = request.request_id
dealer, response_queue = await self.connection_manager.get_connection(request_id)
dealer.write([b"", request_id.encode("utf-8")])
req_dict = request.to_dict() req_dict = request.to_dict()
if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR: if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
self.zmq_client.send_json(req_dict) self.zmq_client.send_json(req_dict)
else: else:
self.zmq_client.send_pyobj(req_dict) self.zmq_client.send_pyobj(req_dict)
request_id = request.request_id
dealer, response_queue = await self.connection_manager.get_connection(request_id)
dealer.write([b"", request_id.encode("utf-8")])
try: try:
# todo: support user specified timeout. default 600s is enough for most control cases # todo: support user specified timeout. default 600s is enough for most control cases
response = await asyncio.wait_for(response_queue.get(), timeout=600) response = await asyncio.wait_for(response_queue.get(), timeout=600)