diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 2244fea060..6398b03ea1 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -509,17 +509,22 @@ class EngineService:
         if not is_decode:
             self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
             for task in tasks:
-                task.metrics.inference_start_time = time.time()
-                tracing.trace_report_span(
-                    tracing.TraceSpanName.SCHEDULE,
-                    task.request_id.split("_")[0],
-                    int(task.metrics.scheduler_recv_req_time * 1e9),
-                    int(task.metrics.inference_start_time * 1e9),
-                    thread_finish_flag=True,
-                )
-                trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
-                trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
-                trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                if not getattr(task, "has_been_preempted_before", False):
+                    task.metrics.inference_start_time = time.time()
+                    tracing.trace_report_span(
+                        tracing.TraceSpanName.SCHEDULE,
+                        task.request_id.split("_")[0],
+                        int(task.metrics.scheduler_recv_req_time * 1e9),
+                        int(task.metrics.inference_start_time * 1e9),
+                        thread_finish_flag=True,
+                    )
+                    trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
+                    trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
+                    trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                else:
+                    trace_print(
+                        LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "")
+                    )
         if not is_prefill:
             if not self.cfg.model_config.enable_mm:
                 self.update_requests_chunk_size(tasks)
@@ -1022,28 +1027,37 @@ class EngineService:
                     for task in tasks:
                         if task.task_type == RequestType.PREFILL:
                             rid = task.request_id.split("_")[0]
-                            trace_carrier = task.trace_carrier
-                            tracing.trace_set_proc_propagate_context(rid, trace_carrier)
-                            trace_carrier = tracing.trace_get_proc_propagate_context(rid)
-                            task.trace_carrier = trace_carrier
-                            tracing.trace_report_span(
-                                tracing.TraceSpanName.SCHEDULE,
-                                rid,
-                                int(task.metrics.scheduler_recv_req_time * 1e9),
-                                int(time.time() * 1e9),
-                                thread_finish_flag=True,
-                            )
-                            trace_print(
-                                LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
-                            )
-                            trace_print(
-                                LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
-                            )
-                            trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                            if isinstance(task, Request) and task.has_been_preempted_before:
+                                trace_print(
+                                    LoggingEventName.RESCHEDULED_INFERENCE_START,
+                                    task.request_id,
+                                    getattr(task, "user", ""),
+                                )
+                            else:
+                                trace_carrier = task.trace_carrier
+                                tracing.trace_set_proc_propagate_context(rid, trace_carrier)
+                                trace_carrier = tracing.trace_get_proc_propagate_context(rid)
+                                task.trace_carrier = trace_carrier
+                                tracing.trace_report_span(
+                                    tracing.TraceSpanName.SCHEDULE,
+                                    rid,
+                                    int(task.metrics.scheduler_recv_req_time * 1e9),
+                                    int(time.time() * 1e9),
+                                    thread_finish_flag=True,
+                                )
+                                trace_print(
+                                    LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
+                                )
+                                trace_print(
+                                    LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
+                                )
+                                trace_print(
+                                    LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", "")
+                                )
                         if isinstance(task, Request):
                             if self.cfg.scheduler_config.splitwise_role == "decode":
                                 task.metrics.decode_inference_start_time = time.time()
-                            else:
+                            elif not task.has_been_preempted_before:
                                 task.metrics.inference_start_time = time.time()

                     self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index f6fc9d5162..d84338e559 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -184,6 +184,7 @@ class Request:
         # status
         self.status = RequestStatus.WAITING
         self.task_type = RequestType.PREFILL
+        self.has_been_preempted_before = False
         self.idx = None
         self.need_prefill_tokens = self.prompt_token_ids_len
         self.audio_output_token_ids = []
@@ -873,6 +874,7 @@ class RequestMetrics:
     storage_cache_token_num: Optional[int] = 0
     cpu_cache_prepare_time: Optional[float] = None
     storage_cache_prepare_time: Optional[float] = None
+    preempted_count: int = 0

     def __post_init__(self):
         if self.arrival_time is None:
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index aea09262c4..27d5308f1b 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -46,6 +46,8 @@ from fastdeploy.inter_communicator import IPCSignal
 from fastdeploy.metrics.metrics import main_process_metrics
 from fastdeploy.multimodal.hasher import MultimodalHasher
 from fastdeploy.platforms import current_platform
+from fastdeploy.trace.constants import LoggingEventName
+from fastdeploy.trace.trace_logger import print as trace_print
 from fastdeploy.utils import download_from_bos, init_bos_client, llm_logger


@@ -246,6 +248,8 @@ class ResourceManagerV1(ResourceManager):
             llm_logger.debug(f"reschedule {request_id} into waiting queue")
             if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests:
                 request = self.requests[request_id]
+                request.has_been_preempted_before = True
+                request.metrics.preempted_count += 1
                 if process_func is not None:
                     process_func(request)
                 llm_logger.debug(f"self.waiting append request:{request.request_id},req.type:{request.status}")
@@ -284,6 +288,7 @@ class ResourceManagerV1(ResourceManager):
                 self._free_blocks(req)
                 req.cached_block_num = 0
                 self.to_be_rescheduled_request_id_set.add(req.request_id)
+                trace_print(LoggingEventName.PREEMPTED, req.request_id, getattr(req, "user", ""))
                 preempted_reqs.append(self._prepare_preempt_task(req))
         return preempted_reqs

@@ -329,6 +334,9 @@ class ResourceManagerV1(ResourceManager):
                     self._free_blocks(preempted_req)
                     preempted_req.num_cached_blocks = 0
                     self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
+                    trace_print(
+                        LoggingEventName.PREEMPTED, preempted_req.request_id, getattr(preempted_req, "user", "")
+                    )
                 llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
                 preempted_reqs.append(preempted_req)
                 scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
@@ -1189,16 +1197,17 @@ class ResourceManagerV1(ResourceManager):
                     elif common_block_ids[block_idx] in metrics["match_storage_block_ids"]:
                         metrics["storage_match_token_num"] -= self.config.cache_config.block_size

-            request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
-            request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
-            request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
-            request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
-            request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
+            if not request.has_been_preempted_before:
+                # NOTE: Do not log or report metrics for cache hit rate when request is being rescheduled
+                request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
+                request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
+                request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
+                request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
+                request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]

-            # Report the number of cached tokens to Prometheus metrics
-            main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
-            main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
-            main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
+                main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
+                main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
+                main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)

             return True
         except Exception as e:
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 86e697bebd..39b86c2e44 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -252,7 +252,8 @@ class TokenProcessor:
                 f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
                 f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
                 f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
-                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
+                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
+                f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
             )

             main_process_metrics.request_token_ratio.observe(token_ratio)
@@ -943,11 +944,13 @@ class TokenProcessor:

             # Print combined log with all required information
             ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
+            ttft_s = ttft + task.metrics.time_in_queue
             llm_logger.info(
                 f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
                 f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
-                f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
-                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
+                f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, "
+                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
+                f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
             )

             main_process_metrics.request_token_ratio.observe(token_ratio)
diff --git a/fastdeploy/trace/constants.py b/fastdeploy/trace/constants.py
index a503dfc47a..eaf54d6808 100644
--- a/fastdeploy/trace/constants.py
+++ b/fastdeploy/trace/constants.py
@@ -34,6 +34,8 @@ class LoggingEventName(Enum):
     INFERENCE_END = "INFERENCE_END"
     POSTPROCESSING_START = "POSTPROCESSING_START"
     POSTPROCESSING_END = "POSTPROCESSING_END"
+    PREEMPTED = "PREEMPTED"
+    RESCHEDULED_INFERENCE_START = "RESCHEDULED_INFERENCE_START"


 class StageName(Enum):
@@ -60,6 +62,8 @@ EVENT_TO_STAGE_MAP = {
     LoggingEventName.INFERENCE_START: StageName.PREFILL,
     LoggingEventName.FIRST_TOKEN_GENERATED: StageName.PREFILL,
     LoggingEventName.DECODE_START: StageName.DECODE,
+    LoggingEventName.PREEMPTED: StageName.DECODE,
+    LoggingEventName.RESCHEDULED_INFERENCE_START: StageName.DECODE,
     LoggingEventName.INFERENCE_END: StageName.DECODE,
     LoggingEventName.POSTPROCESSING_START: StageName.POSTPROCESSING,
     LoggingEventName.POSTPROCESSING_END: StageName.POSTPROCESSING,
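
Taken together, the patch threads one sticky flag and one counter through scheduling: preemption marks the request (`has_been_preempted_before = True`, `preempted_count += 1`) and emits a `PREEMPTED` trace event, and a later re-dispatch emits `RESCHEDULED_INFERENCE_START` instead of restarting the `SCHEDULE` span or overwriting `inference_start_time`. Below is a minimal, self-contained Python sketch of that bookkeeping pattern. The `Event`, `Metrics`, `Req`, `trace_print`, `preempt`, and `dispatch` names are stand-ins of my own, not FastDeploy's classes, and the sketch collapses into two helpers what the real patch spreads across `ResourceManagerV1` (flag/counter/`PREEMPTED`) and `EngineService` (the dispatch branch):

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class Event(Enum):
    # Stand-ins for the LoggingEventName members the patch adds/uses.
    PREEMPTED = "PREEMPTED"
    INFERENCE_START = "INFERENCE_START"
    RESCHEDULED_INFERENCE_START = "RESCHEDULED_INFERENCE_START"


def trace_print(event: Event, request_id: str) -> None:
    # Stub standing in for fastdeploy.trace.trace_logger.print.
    print(f"{event.value} request_id={request_id}")


@dataclass
class Metrics:
    # Mirrors the two RequestMetrics fields the patch touches.
    inference_start_time: Optional[float] = None
    preempted_count: int = 0


@dataclass
class Req:
    request_id: str
    has_been_preempted_before: bool = False
    metrics: Metrics = field(default_factory=Metrics)


def preempt(req: Req) -> None:
    # On preemption: set the sticky flag, bump the counter, emit PREEMPTED
    # (the patch does this in ResourceManagerV1 on reschedule/preempt).
    req.has_been_preempted_before = True
    req.metrics.preempted_count += 1
    trace_print(Event.PREEMPTED, req.request_id)


def dispatch(req: Req) -> None:
    # First dispatch records inference_start_time and emits INFERENCE_START;
    # a post-preemption re-dispatch emits RESCHEDULED_INFERENCE_START instead,
    # so first-schedule spans and timings are not overwritten.
    if not req.has_been_preempted_before:
        req.metrics.inference_start_time = time.time()
        trace_print(Event.INFERENCE_START, req.request_id)
    else:
        trace_print(Event.RESCHEDULED_INFERENCE_START, req.request_id)


if __name__ == "__main__":
    req = Req("req-0")
    dispatch(req)  # -> INFERENCE_START
    preempt(req)   # -> PREEMPTED
    dispatch(req)  # -> RESCHEDULED_INFERENCE_START
    print(f"preempted_count={req.metrics.preempted_count}")  # -> 1
```

The same flag also explains the `resource_manager_v1.py` hunk at line 1189: cache-hit metrics are only recorded on the first schedule, so a rescheduled request does not double-count its prefix-cache hits in the Prometheus counters, while `preempted_count` still surfaces in the final per-request log line in `token_processor.py`.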