mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
[Feature]Log Format Normalization and Trace Log Optimization (#6370)
* log refactor * log refactor 2 * log refactor 3
This commit is contained in:
@@ -24,7 +24,7 @@ import numpy as np
|
||||
from fastdeploy.engine.args_utils import EngineArgs
|
||||
from fastdeploy.engine.common_engine import EngineService
|
||||
|
||||
MODEL_NAME = os.getenv("MODEL_PATH", "/path/to/models") + "/ERNIE-4.5-0.3B-Paddle"
|
||||
MODEL_NAME = os.getenv("MODEL_PATH", "/workspace/wenlei/models") + "/ERNIE-4.5-0.3B-Paddle"
|
||||
|
||||
|
||||
class TestCommonEngine(unittest.TestCase):
|
||||
@@ -872,3 +872,98 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase):
|
||||
)()
|
||||
self.assertEqual(eng._get_scheduler_unhandled_request_num(), 0)
|
||||
eng.llm_logger.debug.assert_called()
|
||||
|
||||
def test_insert_zmq_task_trace_carrier_handling(self):
|
||||
"""Cover lines 1164-1167: trace_carrier handling in _insert_zmq_task_to_scheduler."""
|
||||
cfg = self._make_cfg(splitwise_role="mixed")
|
||||
|
||||
class DummyQ:
|
||||
def __init__(self, *a, **k):
|
||||
self.available_prefill_instances = type("X", (), {"put": lambda *_: None})()
|
||||
|
||||
def get_server_port(self):
|
||||
return 0
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
with patch("fastdeploy.engine.common_engine.EngineWorkerQueue", DummyQ):
|
||||
eng = EngineService(cfg, start_queue=False, use_async_llm=False)
|
||||
eng.running = True
|
||||
|
||||
# Mock data with trace_carrier to trigger lines 1164-1167
|
||||
test_request_id = "test_req_123"
|
||||
trace_carrier_data = {"trace_id": "abc123", "span_id": "def456"}
|
||||
mock_data_with_trace = {
|
||||
"request_id": test_request_id,
|
||||
"trace_carrier": trace_carrier_data,
|
||||
"status": None,
|
||||
"user": "test_user",
|
||||
}
|
||||
|
||||
class DummyRecv:
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
self.call_count = 0
|
||||
|
||||
def receive_json_once(self, block):
|
||||
self.call_count += 1
|
||||
if self.call_count == 1:
|
||||
return None, self.data
|
||||
else:
|
||||
eng.running = False
|
||||
return None, None
|
||||
|
||||
def receive_pyobj_once(self, block):
|
||||
return self.receive_json_once(block)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
eng.recv_request_server = DummyRecv(mock_data_with_trace)
|
||||
|
||||
# Mock tracing.trace_set_proc_propagate_context to verify it's called
|
||||
with patch("fastdeploy.engine.common_engine.tracing.trace_set_proc_propagate_context") as mock_trace_set:
|
||||
with patch.object(eng, "llm_logger"):
|
||||
with patch("fastdeploy.engine.common_engine.Request") as MockRequest:
|
||||
mock_request = Mock()
|
||||
mock_request.metrics.scheduler_recv_req_time = 0
|
||||
MockRequest.from_dict.return_value = mock_request
|
||||
|
||||
with patch("fastdeploy.engine.common_engine.trace_print"):
|
||||
eng._insert_zmq_task_to_scheduler()
|
||||
|
||||
# Verify trace_set_proc_propagate_context was called with correct args (lines 1165-1167)
|
||||
mock_trace_set.assert_called_once()
|
||||
call_args = mock_trace_set.call_args
|
||||
# request_id should be "test" (first part after split on "_") and trace_carrier
|
||||
self.assertEqual(call_args[0][0], "test")
|
||||
self.assertEqual(call_args[0][1], trace_carrier_data)
|
||||
|
||||
# Reset and test without trace_carrier - should not call trace_set_proc_propagate_context
|
||||
eng.running = True
|
||||
mock_data_without_trace = {
|
||||
"request_id": "test_req_456",
|
||||
"status": None,
|
||||
"user": "test_user",
|
||||
}
|
||||
eng.recv_request_server = DummyRecv(mock_data_without_trace)
|
||||
|
||||
with patch("fastdeploy.engine.common_engine.tracing.trace_set_proc_propagate_context") as mock_trace_set:
|
||||
with patch.object(eng, "llm_logger"):
|
||||
with patch("fastdeploy.engine.common_engine.Request") as MockRequest:
|
||||
mock_request = Mock()
|
||||
mock_request.metrics.scheduler_recv_req_time = 0
|
||||
MockRequest.from_dict.return_value = mock_request
|
||||
|
||||
with patch("fastdeploy.engine.common_engine.trace_print"):
|
||||
eng._insert_zmq_task_to_scheduler()
|
||||
|
||||
# Verify trace_set_proc_propagate_context was NOT called when no trace_carrier
|
||||
mock_trace_set.assert_not_called()
|
||||
|
||||
if hasattr(eng, "_finalizer"):
|
||||
try:
|
||||
eng._finalizer.detach()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user