diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 4850fdfe06..ad2f6bf845 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -49,6 +49,7 @@ from fastdeploy.engine.request import (
 )
 from fastdeploy.engine.resource_manager import ResourceManager
 from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
+from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger
 from fastdeploy.eplb.utils import init_eplb_signals
 from fastdeploy.input.preprocess import InputPreprocessor
 from fastdeploy.inter_communicator import (
@@ -146,6 +147,13 @@ class EngineService:
         )
         self.token_processor.set_resource_manager(self.resource_manager)
 
+        self.scheduler_metrics_logger = SchedulerMetricsLogger(
+            enabled=True,
+            dp_rank=self.cfg.parallel_config.local_data_parallel_id,
+        )
+        self.resource_manager.scheduler_metrics_logger = self.scheduler_metrics_logger
+        self.token_processor.set_scheduler_metrics_logger(self.scheduler_metrics_logger)
+
         self.partial_chunked_tokens = [0] * (self.cfg.max_num_partial_prefills + 1)
         for idx in range(1, self.cfg.max_num_partial_prefills + 1):
             self.partial_chunked_tokens[idx] = (
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index d7bb878eb4..5ad6854f4d 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -957,6 +957,36 @@ class ResourceManagerV1(ResourceManager):
                 if self.current_reserve_output_block_num == 0:
                     self.can_relax_prefill_strategy = True
 
+        if getattr(self, "scheduler_metrics_logger", None) is not None:
+            total_blocks = self.total_block_number()
+            free_blocks = self.available_block_num()
+            used_blocks = max(total_blocks - free_blocks, 0)
+            tokens_used = used_blocks * self.config.cache_config.block_size
+            token_usage = used_blocks / total_blocks if total_blocks > 0 else 0.0
+            running_cnt = len(self.running)
+            queue_cnt = len(self.waiting)
+
+            prefill_reqs = [
+                r for r in scheduled_reqs if isinstance(r, Request) and r.task_type == RequestType.PREFILL
+            ]
+            has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in scheduled_reqs)
+
+            self.scheduler_metrics_logger.log_prefill_batch(
+                prefill_reqs=prefill_reqs,
+                running_cnt=running_cnt,
+                queue_cnt=queue_cnt,
+                tokens_used=tokens_used,
+                token_usage=token_usage,
+            )
+            if has_decode:
+                self.scheduler_metrics_logger.log_decode_batch(
+                    running_cnt=running_cnt,
+                    queue_cnt=queue_cnt,
+                    tokens_used=tokens_used,
+                    token_usage=token_usage,
+                    use_cudagraph=self.config.graph_opt_config.use_cudagraph,
+                )
+
         self.update_metrics()
         return scheduled_reqs, error_reqs
diff --git a/fastdeploy/engine/sched/scheduler_metrics_logger.py b/fastdeploy/engine/sched/scheduler_metrics_logger.py
new file mode 100644
index 0000000000..c896e6bf16
--- /dev/null
+++ b/fastdeploy/engine/sched/scheduler_metrics_logger.py
@@ -0,0 +1,126 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
+import logging
+import threading
+import time
+from typing import Iterable
+
+
+class SchedulerMetricsLogger:
+    """
+    Lightweight console logger for scheduler-level prefill/decode metrics.
+    """
+
+    def __init__(self, enabled: bool = True, dp_rank: int = 0) -> None:
+        self.enabled = enabled
+        self.dp_rank = dp_rank
+        self._lock = threading.Lock()
+        self._last_decode_tic = time.perf_counter()
+        self._decode_tokens_since_last = 0
+        self._logger = self._get_logger()
+
+    def _get_logger(self) -> logging.Logger:
+        logger = logging.getLogger("fastdeploy.scheduler_metrics")
+        if not getattr(logger, "_fd_scheduler_metrics_configured", False):
+            logger.setLevel(logging.INFO)
+            logger.propagate = False
+            handler = logging.StreamHandler()
+            formatter = logging.Formatter(
+                "[%(asctime)s] [%(process)d] [%(levelname)s] %(message)s",
+                "%Y-%m-%d %H:%M:%S",
+            )
+            handler.setFormatter(formatter)
+            logger.addHandler(handler)
+            logger._fd_scheduler_metrics_configured = True
+        return logger
+
+    def on_decode_tokens(self, num_tokens: int) -> None:
+        """Accumulate decode tokens produced since the last decode-batch log line."""
+        if not self.enabled or num_tokens <= 0:
+            return
+        with self._lock:
+            self._decode_tokens_since_last += num_tokens
+
+    def log_prefill_batch(
+        self,
+        prefill_reqs: Iterable,
+        running_cnt: int,
+        queue_cnt: int,
+        tokens_used: int,
+        token_usage: float,
+    ) -> None:
+        """Log one line summarizing the prefill requests scheduled in this step."""
+        if not self.enabled:
+            return
+        prefill_reqs = list(prefill_reqs)
+        if not prefill_reqs:
+            return
+
+        new_tokens = 0
+        cached_tokens = 0
+        for req in prefill_reqs:
+            start = getattr(req, "prefill_start_index", 0) or 0
+            end = getattr(req, "prefill_end_index", 0) or 0
+            if end > start:
+                new_tokens += end - start
+            cached_tokens += getattr(req, "num_cached_tokens", 0) or 0
+
+        msg = (
+            "Prefill batch, "
+            f"dp_rank: {self.dp_rank}, "
+            f"#new-seq: {len(prefill_reqs)}, "
+            f"#new-token: {new_tokens}, "
+            f"#cached-token: {cached_tokens}, "
+            f"#token: {tokens_used}, "
+            f"token usage: {token_usage:.2f}, "
+            f"#running-req: {running_cnt}, "
+            f"#queue-req: {queue_cnt}"
+        )
+        self._logger.info(msg)
+
+    def log_decode_batch(
+        self,
+        running_cnt: int,
+        queue_cnt: int,
+        tokens_used: int,
+        token_usage: float,
+        use_cudagraph: bool,
+    ) -> None:
+        """Log one line with decode throughput accumulated since the previous decode log."""
+        if not self.enabled:
+            return
+        with self._lock:
+            now = time.perf_counter()
+            elapsed = now - self._last_decode_tic
+            if elapsed > 0:
+                throughput = self._decode_tokens_since_last / elapsed
+            else:
+                throughput = 0.0
+            self._decode_tokens_since_last = 0
+            self._last_decode_tic = now
+
+        msg = (
+            "Decode batch, "
+            f"dp_rank: {self.dp_rank}, "
+            f"#running-req: {running_cnt}, "
+            f"#token: {tokens_used}, "
+            f"token usage: {token_usage:.2f}, "
+            f"cuda graph: {use_cudagraph}, "
+            f"gen throughput (token/s): {throughput:.2f}, "
+            f"#queue-req: {queue_cnt}"
+        )
+        self._logger.info(msg)
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 73c20dd457..e499e7b54a 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -66,6 +66,7 @@ class TokenProcessor:
         self.cfg = cfg
         self.cached_generated_tokens = cached_generated_tokens
         self.resource_manager = None
+        self.scheduler_metrics_logger = None
         self.engine_worker_queue = engine_worker_queue
         self.tokens_counter = Counter()
         self.split_connector = split_connector
@@ -160,6 +161,16 @@ class TokenProcessor:
         assert self.resource_manager is None, "The resource manager is not None, cannot set again."
         self.resource_manager = resource_manager
 
+    def set_scheduler_metrics_logger(self, scheduler_metrics_logger):
+        """Attach the scheduler metrics logger used for decode throughput reporting."""
+        self.scheduler_metrics_logger = scheduler_metrics_logger
+
+    def _is_decode_stage(self, task):
+        """Whether the task has finished its prefill phase and is now decoding."""
+        if task is None or task.need_prefill_tokens is None:
+            return False
+        return task.num_computed_tokens >= task.need_prefill_tokens
+
     def run(self):
         """
         start thread to get tokens
@@ -790,6 +801,9 @@ class TokenProcessor:
                     self.resource_manager.reschedule_preempt_task(task_id)
                     continue
 
+                if self.scheduler_metrics_logger and self._is_decode_stage(task):
+                    self.scheduler_metrics_logger.on_decode_tokens(len(token_ids))
+
                 if task.get("prefill_chunk_info", None) is not None:
                     prefill_chunk_num = task.get("prefill_chunk_num", 0)
                     task.prefill_chunk_num = prefill_chunk_num + 1
diff --git a/tests/engine/test_scheduler_metrics_logger.py b/tests/engine/test_scheduler_metrics_logger.py
new file mode 100644
index 0000000000..35daeb291c
--- /dev/null
+++ b/tests/engine/test_scheduler_metrics_logger.py
@@ -0,0 +1,75 @@
+"""
+Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import types
+from unittest import mock
+
+from fastdeploy.engine.sched.scheduler_metrics_logger import SchedulerMetricsLogger
+
+
+def test_on_decode_tokens_accumulates():
+    logger = SchedulerMetricsLogger(enabled=True, dp_rank=0)
+    logger._decode_tokens_since_last = 0
+
+    logger.on_decode_tokens(3)
+    logger.on_decode_tokens(0)
+    logger.on_decode_tokens(-1)
+
+    assert logger._decode_tokens_since_last == 3
+
+
+def test_log_prefill_batch_logs_expected_message():
+    logger = SchedulerMetricsLogger(enabled=True, dp_rank=2)
+    logger._logger = mock.Mock()
+
+    reqs = [
+        types.SimpleNamespace(prefill_start_index=0, prefill_end_index=4, num_cached_tokens=2),
+        types.SimpleNamespace(prefill_start_index=3, prefill_end_index=3, num_cached_tokens=1),
+    ]
+
+    logger.log_prefill_batch(prefill_reqs=reqs, running_cnt=5, queue_cnt=6, tokens_used=10, token_usage=0.75)
+
+    logger._logger.info.assert_called_once()
+    message = logger._logger.info.call_args[0][0]
+    assert "Prefill batch" in message
+    assert "dp_rank: 2" in message
+    assert "#new-seq: 2" in message
+    assert "#new-token: 4" in message
+    assert "#cached-token: 3" in message
+    assert "#token: 10" in message
+    assert "token usage: 0.75" in message
+    assert "#running-req: 5" in message
+    assert "#queue-req: 6" in message
+
+
+def test_log_decode_batch_computes_throughput(monkeypatch):
+    logger = SchedulerMetricsLogger(enabled=True, dp_rank=1)
+    logger._logger = mock.Mock()
+    logger._decode_tokens_since_last = 10
+    logger._last_decode_tic = 1.0
+
+    monkeypatch.setattr("fastdeploy.engine.sched.scheduler_metrics_logger.time.perf_counter", lambda: 3.0)
+
+    logger.log_decode_batch(running_cnt=4, queue_cnt=7, tokens_used=8, token_usage=0.5, use_cudagraph=True)
+
+    logger._logger.info.assert_called_once()
+    message = logger._logger.info.call_args[0][0]
+    assert "Decode batch" in message
+    assert "dp_rank: 1" in message
+    assert "gen throughput (token/s): 5.00" in message
+    assert "#queue-req: 7" in message
assert "#queue-req: 7" in message + assert logger._decode_tokens_since_last == 0 + assert logger._last_decode_tic == 3.0 diff --git a/tests/output/test_process_batch_output.py b/tests/output/test_process_batch_output.py index 96e58bd521..6859259387 100644 --- a/tests/output/test_process_batch_output.py +++ b/tests/output/test_process_batch_output.py @@ -146,6 +146,7 @@ class TestTokenProcessorProcessBatchOutput(unittest.TestCase): processor.engine_worker_queue = Mock() processor.split_connector = Mock() processor.resource_manager = MockResourceManager() + processor.scheduler_metrics_logger = None task1 = MockTask() task2 = MockTask() processor.resource_manager.tasks_list = [task1, task2] diff --git a/tests/output/test_token_processor.py b/tests/output/test_token_processor.py index c30c39392e..e8ff821a26 100644 --- a/tests/output/test_token_processor.py +++ b/tests/output/test_token_processor.py @@ -199,6 +199,21 @@ def test_init_allocates_expected_buffers(): processor_logprob, _, _, _ = _make_processor(enable_logprob=True) assert list(processor_logprob.output_scores.shape) == [MAX_BSZ * (K + 1), 1] + +def test_is_decode_stage(): + processor, _, _, _ = _make_processor() + + assert processor._is_decode_stage(None) is False + + task = types.SimpleNamespace(need_prefill_tokens=None, num_computed_tokens=0) + assert processor._is_decode_stage(task) is False + + task = types.SimpleNamespace(need_prefill_tokens=4, num_computed_tokens=3) + assert processor._is_decode_stage(task) is False + + task = types.SimpleNamespace(need_prefill_tokens=4, num_computed_tokens=4) + assert processor._is_decode_stage(task) is True + processor_spec, _, _, _ = _make_processor(speculative_method="mtp", enable_logprob=False) assert processor_spec.output_tokens.shape[0] == SPECULATE_MAX_BSZ * MAX_DRAFT_TOKENS + SPECULATE_MAX_BSZ + 2