[Feature] console print statistical metrics (#6339)

* [Feature] console print statistical data

* [Feature] console print statistical data v2 dp_rank

* [Feature] console print statistical data v2 unittest

* [Feature] console print statistical data v3 unittest
This commit is contained in:
CSWYF3634076
2026-02-05 19:20:36 +08:00
committed by GitHub
parent de02a909c8
commit 1c0a2b055f
7 changed files with 266 additions and 0 deletions
+8
View File
@@ -49,6 +49,7 @@ from fastdeploy.engine.request import (
)
from fastdeploy.engine.resource_manager import ResourceManager
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger
from fastdeploy.eplb.utils import init_eplb_signals
from fastdeploy.input.preprocess import InputPreprocessor
from fastdeploy.inter_communicator import (
@@ -146,6 +147,13 @@ class EngineService:
)
self.token_processor.set_resource_manager(self.resource_manager)
self.scheduler_metrics_logger = SchedulerMetricsLogger(
enabled=True,
dp_rank=self.cfg.parallel_config.local_data_parallel_id,
)
self.resource_manager.scheduler_metrics_logger = self.scheduler_metrics_logger
self.token_processor.set_scheduler_metrics_logger(self.scheduler_metrics_logger)
self.partial_chunked_tokens = [0] * (self.cfg.max_num_partial_prefills + 1)
for idx in range(1, self.cfg.max_num_partial_prefills + 1):
self.partial_chunked_tokens[idx] = (
@@ -957,6 +957,36 @@ class ResourceManagerV1(ResourceManager):
if self.current_reserve_output_block_num == 0:
self.can_relax_prefill_strategy = True
if hasattr(self, "scheduler_metrics_logger") and self.scheduler_metrics_logger is not None:
total_blocks = self.total_block_number()
free_blocks = self.available_block_num()
used_blocks = max(total_blocks - free_blocks, 0)
tokens_used = used_blocks * self.config.cache_config.block_size
token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
running_cnt = len(self.running)
queue_cnt = len(self.waiting)
prefill_reqs = [
r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL
]
has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)
self.scheduler_metrics_logger.log_prefill_batch(
prefill_reqs=prefill_reqs,
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
)
if has_decode:
self.scheduler_metrics_logger.log_decode_batch(
running_cnt=running_cnt,
queue_cnt=queue_cnt,
tokens_used=tokens_used,
token_usage=token_usage,
use_cudagraph=self.config.graph_opt_config.use_cudagraph,
)
self.update_metrics()
return scheduled_reqs, error_reqs
@@ -0,0 +1,124 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import logging
import threading
import time
from typing import Iterable
class SchedulerMetricsLogger:
    """
    Lightweight console logger for scheduler-level prefill/decode metrics.

    A single shared ``logging`` logger named ``fastdeploy.scheduler_metrics``
    is configured once per process; every instance tags its messages with a
    data-parallel rank so multi-rank output remains distinguishable.
    """

    def __init__(self, enabled: bool = True, dp_rank: int = 0) -> None:
        # Master switch: when False every public method is a no-op.
        self.enabled = enabled
        # Data-parallel rank echoed in every log line.
        self.dp_rank = dp_rank
        # Guards the decode-token accumulator, which is written by the token
        # processor thread and drained by the scheduler thread.
        self._lock = threading.Lock()
        self._last_decode_tic = time.perf_counter()
        self._decode_tokens_since_last = 0
        self._logger = self._get_logger()

    def _get_logger(self) -> logging.Logger:
        """Return the shared console logger, attaching its handler only once."""
        logger = logging.getLogger("fastdeploy.scheduler_metrics")
        # A marker attribute on the (process-wide) logger prevents duplicate
        # handlers when several instances are constructed.
        if getattr(logger, "_fd_scheduler_metrics_configured", False):
            return logger
        logger.setLevel(logging.INFO)
        logger.propagate = False
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter(
                "[%(asctime)s] [%(process)d] [%(levelname)s] %(message)s",
                "%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(stream_handler)
        logger._fd_scheduler_metrics_configured = True
        return logger

    def on_decode_tokens(self, num_tokens: int) -> None:
        """Accumulate freshly generated decode tokens for throughput reporting.

        Non-positive counts are ignored; the total is drained and reset by
        :meth:`log_decode_batch`.
        """
        if not self.enabled or num_tokens <= 0:
            return
        with self._lock:
            self._decode_tokens_since_last += num_tokens

    def log_prefill_batch(
        self,
        prefill_reqs: Iterable,
        running_cnt: int,
        queue_cnt: int,
        tokens_used: int,
        token_usage: float,
    ) -> None:
        """Emit one console line summarizing a scheduled prefill batch.

        Nothing is logged when disabled or when the batch is empty. New-token
        counts come from each request's ``prefill_start_index`` /
        ``prefill_end_index`` span; cached counts from ``num_cached_tokens``.
        """
        if not self.enabled:
            return
        batch = list(prefill_reqs)
        if not batch:
            return
        new_tokens = 0
        cached_tokens = 0
        for request in batch:
            begin = getattr(request, "prefill_start_index", 0) or 0
            stop = getattr(request, "prefill_end_index", 0) or 0
            # Only count forward spans; a degenerate span contributes nothing.
            new_tokens += max(stop - begin, 0)
            cached_tokens += getattr(request, "num_cached_tokens", 0) or 0
        self._logger.info(
            "Prefill batch, "
            f"dp_rank: {self.dp_rank}, "
            f"#new-seq: {len(batch)}, "
            f"#new-token: {new_tokens}, "
            f"#cached-token: {cached_tokens}, "
            f"token usage: {token_usage:.2f}, "
            f"#running-req: {running_cnt}, "
            f"#queue-req: {queue_cnt}, "
        )

    def log_decode_batch(
        self,
        running_cnt: int,
        queue_cnt: int,
        tokens_used: int,
        token_usage: float,
        use_cudagraph: bool,
    ) -> None:
        """Emit one console line summarizing a decode step.

        Generation throughput is the number of decode tokens accumulated via
        :meth:`on_decode_tokens` since the previous call, divided by the wall
        time elapsed; both the counter and the timestamp are reset afterwards.
        """
        if not self.enabled:
            return
        with self._lock:
            now = time.perf_counter()
            window = now - self._last_decode_tic
            # Guard against a zero/negative window from clock granularity.
            throughput = self._decode_tokens_since_last / window if window > 0 else 0.0
            self._decode_tokens_since_last = 0
            self._last_decode_tic = now
            self._logger.info(
                "Decode batch, "
                f"dp_rank: {self.dp_rank}, "
                f"#running-req: {running_cnt}, "
                f"#token: {tokens_used}, "
                f"token usage: {token_usage:.2f}, "
                f"cuda graph: {use_cudagraph}, "
                f"gen throughput (token/s): {throughput:.2f}, "
                f"#queue-req: {queue_cnt}, "
            )
+14
View File
@@ -66,6 +66,7 @@ class TokenProcessor:
self.cfg = cfg
self.cached_generated_tokens = cached_generated_tokens
self.resource_manager = None
self.scheduler_metrics_logger = None
self.engine_worker_queue = engine_worker_queue
self.tokens_counter = Counter()
self.split_connector = split_connector
@@ -160,6 +161,16 @@ class TokenProcessor:
assert self.resource_manager is None, "The resource manager is not None, cannot set again."
self.resource_manager = resource_manager
def set_scheduler_metrics_logger(self, scheduler_metrics_logger):
    """Attach the SchedulerMetricsLogger used to report decode throughput."""
    self.scheduler_metrics_logger = scheduler_metrics_logger
def _is_decode_stage(self, task):
if task is None:
return False
if task.need_prefill_tokens is None:
return False
return task.num_computed_tokens >= task.need_prefill_tokens
def run(self):
"""
start thread to get tokens
@@ -790,6 +801,9 @@ class TokenProcessor:
self.resource_manager.reschedule_preempt_task(task_id)
continue
if self.scheduler_metrics_logger and self._is_decode_stage(task):
self.scheduler_metrics_logger.on_decode_tokens(len(token_ids))
if task.get("prefill_chunk_info", None) is not None:
prefill_chunk_num = task.get("prefill_chunk_num", 0)
task.prefill_chunk_num = prefill_chunk_num + 1
@@ -0,0 +1,74 @@
"""
Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import types
from unittest import mock
from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger
def test_on_decode_tokens_accumulates():
    """Only positive token counts are added to the decode accumulator."""
    metrics = SchedulerMetricsLogger(enabled=True, dp_rank=0)
    metrics._decode_tokens_since_last = 0
    for count in (3, 0, -1):
        metrics.on_decode_tokens(count)
    assert metrics._decode_tokens_since_last == 3
def test_log_prefill_batch_logs_expected_message():
    """A non-empty prefill batch emits one line containing every counter."""
    metrics = SchedulerMetricsLogger(enabled=True, dp_rank=2)
    metrics._logger = mock.Mock()
    requests = [
        types.SimpleNamespace(prefill_start_index=0, prefill_end_index=4, num_cached_tokens=2),
        types.SimpleNamespace(prefill_start_index=3, prefill_end_index=3, num_cached_tokens=1),
    ]
    metrics.log_prefill_batch(
        prefill_reqs=requests, running_cnt=5, queue_cnt=6, tokens_used=10, token_usage=0.75
    )
    metrics._logger.info.assert_called_once()
    message = metrics._logger.info.call_args[0][0]
    expected_fragments = (
        "Prefill batch",
        "dp_rank: 2",
        "#new-seq: 2",
        "#new-token: 4",
        "#cached-token: 3",
        "token usage: 0.75",
        "#running-req: 5",
        "#queue-req: 6",
    )
    for fragment in expected_fragments:
        assert fragment in message
def test_log_decode_batch_computes_throughput(monkeypatch):
    """Throughput = tokens-since-last / elapsed time; state resets after logging."""
    metrics = SchedulerMetricsLogger(enabled=True, dp_rank=1)
    metrics._logger = mock.Mock()
    metrics._decode_tokens_since_last = 10
    metrics._last_decode_tic = 1.0
    # Freeze the clock two seconds after the last tick: 10 tokens / 2 s = 5/s.
    monkeypatch.setattr(
        "fastdeploy.engine.sched.scheduler_metrics_logger.time.perf_counter",
        lambda: 3.0,
    )
    metrics.log_decode_batch(
        running_cnt=4, queue_cnt=7, tokens_used=8, token_usage=0.5, use_cudagraph=True
    )
    metrics._logger.info.assert_called_once()
    message = metrics._logger.info.call_args[0][0]
    for fragment in ("Decode batch", "dp_rank: 1", "gen throughput (token/s): 5.00", "#queue-req: 7"):
        assert fragment in message
    assert metrics._decode_tokens_since_last == 0
    assert metrics._last_decode_tic == 3.0
@@ -146,6 +146,7 @@ class TestTokenProcessorProcessBatchOutput(unittest.TestCase):
processor.engine_worker_queue = Mock()
processor.split_connector = Mock()
processor.resource_manager = MockResourceManager()
processor.scheduler_metrics_logger = None
task1 = MockTask()
task2 = MockTask()
processor.resource_manager.tasks_list = [task1, task2]
+15
View File
@@ -199,6 +199,21 @@ def test_init_allocates_expected_buffers():
processor_logprob, _, _, _ = _make_processor(enable_logprob=True)
assert list(processor_logprob.output_scores.shape) == [MAX_BSZ * (K + 1), 1]
def test_is_decode_stage():
    """_is_decode_stage is True only once computed tokens cover the prefill."""
    processor, _, _, _ = _make_processor()
    assert processor._is_decode_stage(None) is False
    cases = (
        (None, 0, False),
        (4, 3, False),
        (4, 4, True),
    )
    for need_prefill, computed, expected in cases:
        task = types.SimpleNamespace(need_prefill_tokens=need_prefill, num_computed_tokens=computed)
        assert processor._is_decode_stage(task) is expected
processor_spec, _, _, _ = _make_processor(speculative_method="mtp", enable_logprob=False)
assert processor_spec.output_tokens.shape[0] == SPECULATE_MAX_BSZ * MAX_DRAFT_TOKENS + SPECULATE_MAX_BSZ + 2