mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
[BugFix][Metrics] Fix Prometheus Multiprocess Metrics Issues and Add ZMQ Communication Metrics (#5185)
* [Feature] add metrics for ZMQ and fix multiprocess metrics * fix test_metrics.py --------- Co-authored-by: Jiaxin Sui <95567040+plusNew001@users.noreply.github.com>
This commit is contained in:
@@ -23,8 +23,11 @@ from multiprocessing.reduction import ForkingPickler
|
||||
|
||||
import msgpack
|
||||
import zmq
|
||||
from zmq.utils import jsonapi
|
||||
|
||||
from fastdeploy import envs
|
||||
from fastdeploy.metrics.metrics import main_process_metrics
|
||||
from fastdeploy.metrics.stats import ZMQMetricsStats
|
||||
from fastdeploy.utils import llm_logger
|
||||
|
||||
|
||||
@@ -36,6 +39,7 @@ class ZmqServerBase(ABC):
|
||||
def __init__(self):
|
||||
self.cached_results = defaultdict(list)
|
||||
self.response_token_lock = threading.Lock()
|
||||
self.address = None
|
||||
self.response_handle_per_step = None
|
||||
self.response_handle_name_per_step = None
|
||||
self.batch_id_per_step = 0
|
||||
@@ -50,33 +54,96 @@ class ZmqServerBase(ABC):
|
||||
if self.socket is None:
|
||||
self.socket: zmq.Socket = self._create_socket()
|
||||
|
||||
def send_json(self, data):
|
||||
"""
|
||||
Send a JSON-serializable object over the socket.
|
||||
"""
|
||||
self._ensure_socket()
|
||||
self.socket.send_json(data)
|
||||
def send_json(self, data, flags: int = 0):
|
||||
"""Send a Python object as a message using json to serialize.
|
||||
|
||||
def recv_json(self):
|
||||
Keyword arguments are passed on to json.dumps
|
||||
|
||||
Parameters
|
||||
----------
|
||||
obj : Python object
|
||||
The Python object to send
|
||||
flags : int
|
||||
Any valid flags for :func:`Socket.send`
|
||||
"""
|
||||
_zmq_metrics_stats = ZMQMetricsStats()
|
||||
try:
|
||||
# package data with meta information
|
||||
envelope = {"__meta": {"send_ts": time.perf_counter()}, "data": data}
|
||||
msg = jsonapi.dumps(envelope)
|
||||
|
||||
# collect zmq send metrics
|
||||
_zmq_metrics_stats.msg_bytes_send_total += len(msg)
|
||||
|
||||
return self.socket.send(msg, flags=flags)
|
||||
except Exception as e:
|
||||
_zmq_metrics_stats.msg_send_failed_total += 1
|
||||
raise e
|
||||
finally:
|
||||
# collect zmq send metrics
|
||||
_zmq_metrics_stats.msg_send_total += 1
|
||||
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
|
||||
|
||||
def recv_json(self, flags: int = 0):
|
||||
"""
|
||||
Receive a JSON-serializable object from the socket.
|
||||
"""
|
||||
self._ensure_socket()
|
||||
return self.socket.recv_json()
|
||||
_zmq_metrics_stats = ZMQMetricsStats()
|
||||
try:
|
||||
# receive from socket
|
||||
msg = self.socket.recv(flags=flags)
|
||||
data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf))
|
||||
|
||||
def send_pyobj(self, data):
|
||||
# collect zmq recv metrics
|
||||
_zmq_metrics_stats.msg_bytes_recv_total += len(msg)
|
||||
_zmq_metrics_stats.msg_recv_total += 1
|
||||
|
||||
# first check if the received msg is a dict
|
||||
if isinstance(data_dict, dict):
|
||||
# then check if the dict has "__meta" key
|
||||
if "__meta" in data_dict and "send_ts" in data_dict["__meta"]:
|
||||
# if so, calculate the delay
|
||||
_zmq_metrics_stats.zmq_latency = time.perf_counter() - data_dict["__meta"]["send_ts"]
|
||||
return data_dict["data"]
|
||||
return data_dict
|
||||
finally:
|
||||
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
|
||||
|
||||
def send_pyobj(self, data, flags: int = 0):
|
||||
"""
|
||||
Send a Pickle-serializable object over the socket.
|
||||
"""
|
||||
self._ensure_socket()
|
||||
self.socket.send(ForkingPickler.dumps(data), copy=False)
|
||||
_zmq_metrics_stats = ZMQMetricsStats()
|
||||
try:
|
||||
envelope = {"__meta": {"send_ts": time.perf_counter()}, "data": data}
|
||||
data_bytes = ForkingPickler.dumps(envelope)
|
||||
_zmq_metrics_stats.msg_bytes_send_total += len(data_bytes)
|
||||
self.socket.send(data_bytes, copy=False, flags=flags)
|
||||
except Exception as e:
|
||||
_zmq_metrics_stats.msg_send_failed_total += 1
|
||||
raise e
|
||||
finally:
|
||||
_zmq_metrics_stats.msg_send_total += 1
|
||||
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
|
||||
|
||||
def recv_pyobj(self):
|
||||
def recv_pyobj(self, flags: int = 0):
|
||||
"""
|
||||
Receive a Pickle-serializable object from the socket.
|
||||
"""
|
||||
_zmq_metrics_stats = ZMQMetricsStats()
|
||||
self._ensure_socket()
|
||||
return ForkingPickler.loads(self.socket.recv())
|
||||
data_bytes = self.socket.recv(flags=flags)
|
||||
envelope = ForkingPickler.loads(data_bytes)
|
||||
if isinstance(envelope, dict):
|
||||
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
|
||||
_zmq_metrics_stats.msg_recv_total += 1
|
||||
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
|
||||
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
|
||||
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
|
||||
return envelope["data"]
|
||||
return envelope
|
||||
|
||||
def pack_aggregated_data(self, data):
|
||||
"""
|
||||
@@ -98,7 +165,7 @@ class ZmqServerBase(ABC):
|
||||
return "zmp socket has closed", None
|
||||
try:
|
||||
flags = zmq.NOBLOCK if not block else 0
|
||||
return None, self.socket.recv_json(flags=flags)
|
||||
return None, self.recv_json(flags=flags)
|
||||
except zmq.Again:
|
||||
return None, None
|
||||
except Exception as e:
|
||||
@@ -115,7 +182,7 @@ class ZmqServerBase(ABC):
|
||||
return "zmp socket has closed", None
|
||||
try:
|
||||
flags = zmq.NOBLOCK if not block else 0
|
||||
return None, ForkingPickler.loads(self.socket.recv(flags=flags))
|
||||
return None, self.recv_pyobj(flags=flags)
|
||||
except zmq.Again:
|
||||
return None, None
|
||||
except Exception as e:
|
||||
@@ -213,7 +280,17 @@ class ZmqServerBase(ABC):
|
||||
else:
|
||||
result = msgpack.packb([response.to_dict() for response in new_data])
|
||||
with self.response_token_lock:
|
||||
self.socket.send_multipart([self.req_dict[req_id], b"", result])
|
||||
|
||||
_zmq_metrics_stats = ZMQMetricsStats()
|
||||
try:
|
||||
self.socket.send_multipart([self.req_dict[req_id], b"", result])
|
||||
_zmq_metrics_stats.msg_bytes_send_total += len(result)
|
||||
except Exception as e:
|
||||
_zmq_metrics_stats.msg_send_failed_total += 1
|
||||
raise e
|
||||
finally:
|
||||
_zmq_metrics_stats.msg_send_total += 1
|
||||
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
|
||||
llm_logger.debug(
|
||||
f"send_multipart result: {req_id} len {len(new_data)} elapse: {time.time()-start_send}"
|
||||
)
|
||||
@@ -269,7 +346,8 @@ class ZmqIpcServer(ZmqServerBase):
|
||||
self.socket = self.context.socket(self.mode)
|
||||
self.socket.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
|
||||
self.socket.setsockopt(zmq.SNDTIMEO, -1)
|
||||
self.socket.bind(f"ipc://{self.file_name}")
|
||||
self.address = f"ipc://{self.file_name}"
|
||||
self.socket.bind(self.address)
|
||||
return self.socket
|
||||
|
||||
def _clear_ipc(self, name):
|
||||
@@ -327,7 +405,8 @@ class ZmqTcpServer(ZmqServerBase):
|
||||
self.socket = self.context.socket(self.mode)
|
||||
self.socket.setsockopt(zmq.SNDHWM, self.ZMQ_SNDHWM)
|
||||
self.socket.setsockopt(zmq.SNDTIMEO, -1)
|
||||
self.socket.bind(f"tcp://*:{self.port}")
|
||||
self.address = f"tcp://*:{self.port}"
|
||||
self.socket.bind(self.address)
|
||||
return self.socket
|
||||
|
||||
def recv_control_cmd(self):
|
||||
|
||||
Reference in New Issue
Block a user