[Perf] Support tensor transmission between worker and engine with zero-copy to improve efficiency (#4839)

* feat(zmq): support tensor transmission with zero-copy for improved efficiency

* perf: disable copying in zmq.send (copy=False)

* zmq recv data for debug

* convert logprobs tensor to CPU
This commit is contained in:
SunLei
2025-11-11 15:43:11 +08:00
committed by GitHub
parent 8b61f01c68
commit 3098aee05f
8 changed files with 23 additions and 18 deletions
+5 -4
View File
@@ -19,6 +19,7 @@ import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from multiprocessing.reduction import ForkingPickler
import msgpack
import zmq
@@ -44,7 +45,7 @@ class ZmqServerBase(ABC):
def _ensure_socket(self):
"""Ensure the socket is created before use."""
if self.socket is None:
self.socket = self._create_socket()
self.socket: zmq.Socket = self._create_socket()
def send_json(self, data):
"""
@@ -65,14 +66,14 @@ class ZmqServerBase(ABC):
Send a Pickle-serializable object over the socket.
"""
self._ensure_socket()
self.socket.send_pyobj(data)
self.socket.send(ForkingPickler.dumps(data), copy=False)
def recv_pyobj(self):
"""
Receive a Pickle-serializable object from the socket.
"""
self._ensure_socket()
return self.socket.recv_pyobj()
return ForkingPickler.loads(self.socket.recv())
def pack_aggregated_data(self, data):
"""
@@ -111,7 +112,7 @@ class ZmqServerBase(ABC):
return "zmp socket has closed", None
try:
flags = zmq.NOBLOCK if not block else 0
return None, self.socket.recv_pyobj(flags=flags)
return None, ForkingPickler.loads(self.socket.recv(flags=flags))
except zmq.Again:
return None, None
except Exception as e: