This commit is contained in:
luukunn
2026-03-09 19:35:23 +08:00
parent 186822e450
commit 0234a25b47
4 changed files with 15 additions and 23 deletions
+1 -1
View File
@@ -1553,7 +1553,7 @@ class EngineService:
else:
new_contents.append(content)
if new_contents:
batch_data.append([request_id, new_contents])
batch_data.extend(new_contents)
# Send all request results together in one batch
if batch_data:
@@ -281,12 +281,11 @@ class ZmqOpenAIServing(OpenAIServing):
for pr in ctx.preprocess_requests:
dealer.write([b"", pr["request_id"].encode("utf-8")])
while num_choices > 0:
request_output_dicts = await asyncio.wait_for(request_output_queue.get(), timeout=60)
for request_output_dict in request_output_dicts:
api_server_logger.debug(f"Received RequestOutput: {request_output_dict}")
if request_output_dict["finished"] is True:
num_choices -= 1
yield request_output_dict
request_output_dict = await asyncio.wait_for(request_output_queue.get(), timeout=60)
api_server_logger.debug(f"Received RequestOutput: {request_output_dict}")
if request_output_dict["finished"] is True:
num_choices -= 1
yield request_output_dict
except Exception as e:
raise ValueError(f"Error processing response: {str(e)}")
+7 -12
View File
@@ -202,7 +202,7 @@ class DealerConnectionManager:
async def _dispatch_batch_responses(self):
"""
Receive batch responses and dispatch to corresponding request queues.
batch_data format: [[req_id, [outputs]], [req_id, [outputs]], ...]
batch_data format: [output, output, ...] where each output contains request_id
"""
consecutive_errors = 0
max_consecutive_errors = 5
@@ -219,20 +219,15 @@ class DealerConnectionManager:
address = f"ipc:///dev/shm/response_{self.pid}.push"
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
# Parse request_ids (outside lock)
parsed_items = []
for req_id, outputs in batch_data:
if req_id.startswith(("cmpl", "embd", "reward", "chatcmpl")):
req_id = req_id.rsplit("_", 1)[0]
parsed_items.append((req_id, outputs))
# Dispatch: dict lookup + put_nowait are both non-blocking,
# safe to do in a single pass under lock
# Dispatch directly: extract request_id from output and dispatch in one pass
async with self.lock:
for req_id, outputs in parsed_items:
for output in batch_data:
req_id = output.request_id
if req_id.startswith(("cmpl", "embd", "reward", "chatcmpl")):
req_id = req_id.rsplit("_", 1)[0]
queue = self.request_map.get(req_id)
if queue is not None:
queue.put_nowait(outputs)
queue.put_nowait(output)
consecutive_errors = 0
+2 -4
View File
@@ -316,7 +316,7 @@ class ZmqServerBase(ABC):
def _send_batch_response(self, batch_data):
"""
Batch send responses for multiple requests.
batch_data: List[[req_id, [output, ...]], ...]
batch_data: List[output, ...] where each output contains request_id
"""
self._ensure_socket()
if self.socket is None:
@@ -325,9 +325,7 @@ class ZmqServerBase(ABC):
try:
# Convert outputs to dict if needed (CPU work, no lock needed)
if not envs.ENABLE_V1_DATA_PROCESSOR:
for req_id, outputs in batch_data:
for i, output in enumerate(outputs):
outputs[i] = output.to_dict()
batch_data = [output.to_dict() for output in batch_data]
result = ForkingPickler.dumps(batch_data)
result_len = len(result)