Mirror of https://github.com/PaddlePaddle/FastDeploy.git, synced 2026-04-23 00:17:25 +08:00
[BugFix] Fix inaccurate cache hit rate and TTFT after request preemption (#6620)
* [chore] add has_been_rescheduled flag for requests
* [refactor] rename reschedule to preempted for accuracy and fix cache hit metrics
* [chore] add ttft_s
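In short: a request is tagged the first time it is preempted, and that tag is consulted wherever start-time anchors or prefix-cache metrics are reported, so a rescheduled request neither resets its inference start time nor re-increments the cache-hit counters. Below is a minimal standalone sketch of that idea only; SimpleRequest, preempt(), schedule() and the counter dict are illustrative stand-ins, not the FastDeploy classes touched by this commit.

import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class SimpleRequest:
    request_id: str
    has_been_preempted_before: bool = False   # set once, on the first preemption
    preempted_count: int = 0
    inference_start_time: Optional[float] = None
    gpu_cache_token_num: int = 0

# stand-in for the monotonically increasing Prometheus counters
prefix_cache_counters = {"gpu_cache_token_num": 0}

def preempt(req: SimpleRequest) -> None:
    # mark the request so later scheduling passes know it was already counted
    req.has_been_preempted_before = True
    req.preempted_count += 1

def schedule(req: SimpleRequest, gpu_match_token_num: int) -> None:
    # only the first scheduling pass records the TTFT anchor and cache-hit metrics
    if not req.has_been_preempted_before:
        req.inference_start_time = time.time()
        req.gpu_cache_token_num = gpu_match_token_num
        prefix_cache_counters["gpu_cache_token_num"] += gpu_match_token_num

req = SimpleRequest("req-0")
schedule(req, gpu_match_token_num=128)   # first pass: metrics recorded
preempt(req)                             # preempted under memory pressure
schedule(req, gpu_match_token_num=512)   # rescheduled: counters and start time stay put
print(req.preempted_count, prefix_cache_counters["gpu_cache_token_num"])  # 1 128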
@@ -509,17 +509,22 @@ class EngineService:
         if not is_decode:
             self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
             for task in tasks:
-                task.metrics.inference_start_time = time.time()
-                tracing.trace_report_span(
-                    tracing.TraceSpanName.SCHEDULE,
-                    task.request_id.split("_")[0],
-                    int(task.metrics.scheduler_recv_req_time * 1e9),
-                    int(task.metrics.inference_start_time * 1e9),
-                    thread_finish_flag=True,
-                )
-                trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
-                trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
-                trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                if not getattr(task, "has_been_preempted_before", False):
+                    task.metrics.inference_start_time = time.time()
+                    tracing.trace_report_span(
+                        tracing.TraceSpanName.SCHEDULE,
+                        task.request_id.split("_")[0],
+                        int(task.metrics.scheduler_recv_req_time * 1e9),
+                        int(task.metrics.inference_start_time * 1e9),
+                        thread_finish_flag=True,
+                    )
+                    trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
+                    trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
+                    trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                else:
+                    trace_print(
+                        LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "")
+                    )
         if not is_prefill:
             if not self.cfg.model_config.enable_mm:
                 self.update_requests_chunk_size(tasks)
@@ -1022,28 +1027,37 @@ class EngineService:
             for task in tasks:
                 if task.task_type == RequestType.PREFILL:
                     rid = task.request_id.split("_")[0]
-                    trace_carrier = task.trace_carrier
-                    tracing.trace_set_proc_propagate_context(rid, trace_carrier)
-                    trace_carrier = tracing.trace_get_proc_propagate_context(rid)
-                    task.trace_carrier = trace_carrier
-                    tracing.trace_report_span(
-                        tracing.TraceSpanName.SCHEDULE,
-                        rid,
-                        int(task.metrics.scheduler_recv_req_time * 1e9),
-                        int(time.time() * 1e9),
-                        thread_finish_flag=True,
-                    )
-                    trace_print(
-                        LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
-                    )
-                    trace_print(
-                        LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
-                    )
-                    trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
+                    if isinstance(task, Request) and task.has_been_preempted_before:
+                        trace_print(
+                            LoggingEventName.RESCHEDULED_INFERENCE_START,
+                            task.request_id,
+                            getattr(task, "user", ""),
+                        )
+                    else:
+                        trace_carrier = task.trace_carrier
+                        tracing.trace_set_proc_propagate_context(rid, trace_carrier)
+                        trace_carrier = tracing.trace_get_proc_propagate_context(rid)
+                        task.trace_carrier = trace_carrier
+                        tracing.trace_report_span(
+                            tracing.TraceSpanName.SCHEDULE,
+                            rid,
+                            int(task.metrics.scheduler_recv_req_time * 1e9),
+                            int(time.time() * 1e9),
+                            thread_finish_flag=True,
+                        )
+                        trace_print(
+                            LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
+                        )
+                        trace_print(
+                            LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", "")
+                        )
+                        trace_print(
+                            LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", "")
+                        )
                 if isinstance(task, Request):
                     if self.cfg.scheduler_config.splitwise_role == "decode":
                         task.metrics.decode_inference_start_time = time.time()
-                    else:
+                    elif not task.has_been_preempted_before:
                         task.metrics.inference_start_time = time.time()
             self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
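Both EngineService hunks apply the same rule: the SCHEDULE tracing span and the INFERENCE_START event are emitted only on a request's first pass through scheduling, while a request that comes back after preemption only gets a RESCHEDULED_INFERENCE_START marker and keeps its original start-time anchor. A condensed sketch of that branching, where emit() is a hypothetical callback standing in for tracing.trace_report_span and trace_print (not a FastDeploy API):

import time

def on_scheduled(task, emit):
    # emit(name, payload) is an assumed sink, only here to illustrate the branching
    if not getattr(task, "has_been_preempted_before", False):
        # first pass: anchor the span on the original scheduler receive time
        task.metrics.inference_start_time = time.time()
        emit("SCHEDULE", (task.metrics.scheduler_recv_req_time,
                          task.metrics.inference_start_time))
        emit("INFERENCE_START", task.request_id)
    else:
        # rescheduled after preemption: do not restart the span or reset the TTFT anchor
        emit("RESCHEDULED_INFERENCE_START", task.request_id)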
@@ -184,6 +184,7 @@ class Request:
         # status
         self.status = RequestStatus.WAITING
         self.task_type = RequestType.PREFILL
+        self.has_been_preempted_before = False
         self.idx = None
         self.need_prefill_tokens = self.prompt_token_ids_len
         self.audio_output_token_ids = []
@@ -873,6 +874,7 @@ class RequestMetrics:
     storage_cache_token_num: Optional[int] = 0
     cpu_cache_prepare_time: Optional[float] = None
     storage_cache_prepare_time: Optional[float] = None
+    preempted_count: int = 0
 
     def __post_init__(self):
         if self.arrival_time is None:
@@ -46,6 +46,8 @@ from fastdeploy.inter_communicator import IPCSignal
 from fastdeploy.metrics.metrics import main_process_metrics
 from fastdeploy.multimodal.hasher import MultimodalHasher
 from fastdeploy.platforms import current_platform
+from fastdeploy.trace.constants import LoggingEventName
+from fastdeploy.trace.trace_logger import print as trace_print
 from fastdeploy.utils import download_from_bos, init_bos_client, llm_logger
@@ -246,6 +248,8 @@ class ResourceManagerV1(ResourceManager):
         llm_logger.debug(f"reschedule {request_id} into waiting queue")
         if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests:
             request = self.requests[request_id]
+            request.has_been_preempted_before = True
+            request.metrics.preempted_count += 1
             if process_func is not None:
                 process_func(request)
             llm_logger.debug(f"self.waiting append request:{request.request_id},req.type:{request.status}")
@@ -284,6 +288,7 @@ class ResourceManagerV1(ResourceManager):
                 self._free_blocks(req)
                 req.cached_block_num = 0
                 self.to_be_rescheduled_request_id_set.add(req.request_id)
+                trace_print(LoggingEventName.PREEMPTED, req.request_id, getattr(req, "user", ""))
                 preempted_reqs.append(self._prepare_preempt_task(req))
         return preempted_reqs
 
@@ -329,6 +334,9 @@ class ResourceManagerV1(ResourceManager):
                     self._free_blocks(preempted_req)
                     preempted_req.num_cached_blocks = 0
                     self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
+                    trace_print(
+                        LoggingEventName.PREEMPTED, preempted_req.request_id, getattr(preempted_req, "user", "")
+                    )
                     llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
                     preempted_reqs.append(preempted_req)
                     scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
@@ -1189,16 +1197,17 @@ class ResourceManagerV1(ResourceManager):
                 elif common_block_ids[block_idx] in metrics["match_storage_block_ids"]:
                     metrics["storage_match_token_num"] -= self.config.cache_config.block_size
 
-            request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
-            request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
-            request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
-            request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
-            request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
+            if not request.has_been_preempted_before:
+                # NOTE: Do not log or report metrics for cache hit rate when request is being rescheduled
+                request.metrics.gpu_cache_token_num = metrics["gpu_match_token_num"]
+                request.metrics.cpu_cache_token_num = metrics["cpu_match_token_num"]
+                request.metrics.storage_cache_token_num = metrics["storage_match_token_num"]
+                request.metrics.cpu_cache_prepare_time = metrics["cpu_cache_prepare_time"]
+                request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
 
-            # Report the number of cached tokens to Prometheus metrics
-            main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
-            main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
-            main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
+                # Report the number of cached tokens to Prometheus metrics
+                main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
+                main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
+                main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
 
             return True
         except Exception as e:
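Because the main_process_metrics counters only ever increase, reporting the same matched prefix tokens again after a preemption inflates whatever hit rate is derived from them downstream. A rough illustration with made-up numbers, assuming the rate is computed as matched prefix tokens over total prompt tokens (that definition is an assumption, not something this diff states):

prompt_tokens = 1000      # hypothetical prompt length, counted once per request
gpu_match_tokens = 600    # hypothetical prefix tokens matched in GPU cache

# Without the has_been_preempted_before guard, one preemption means the request
# passes through scheduling twice and its match is added to the counter twice.
inflated_rate = (gpu_match_tokens * 2) / prompt_tokens   # 1.2 -> an impossible 120% hit rate
fixed_rate = gpu_match_tokens / prompt_tokens            # 0.6 with the guard in place
print(inflated_rate, fixed_rate)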
@@ -252,7 +252,8 @@ class TokenProcessor:
                 f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
                 f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
                 f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
-                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
+                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
+                f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
             )
 
             main_process_metrics.request_token_ratio.observe(token_ratio)
@@ -943,11 +944,13 @@ class TokenProcessor:
 
             # Print combined log with all required information
             ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
+            ttft_s = ttft + task.metrics.time_in_queue
             llm_logger.info(
                 f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
                 f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
-                f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
-                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}"
+                f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, "
+                f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
+                f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
             )
 
             main_process_metrics.request_token_ratio.observe(token_ratio)
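The new TTFT_S field folds the queueing delay back into the first-token latency (ttft_s = ttft + time_in_queue per the diff), so a request that sat in the waiting queue after a preemption shows the delay end to end even when its inference-side TTFT stays small. A tiny numeric sketch with made-up values; the real numbers come from RequestMetrics, and the reading of first_token_time as starting at inference is an interpretation, not stated by the diff:

time_in_queue = 1.50      # hypothetical seconds spent waiting to be (re)scheduled
first_token_time = 0.40   # hypothetical TTFT once inference actually ran

ttft = first_token_time
ttft_s = ttft + time_in_queue   # 1.90 -> latency closer to what the caller observed
print(f"TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}")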
@@ -34,6 +34,8 @@ class LoggingEventName(Enum):
     INFERENCE_END = "INFERENCE_END"
     POSTPROCESSING_START = "POSTPROCESSING_START"
     POSTPROCESSING_END = "POSTPROCESSING_END"
+    PREEMPTED = "PREEMPTED"
+    RESCHEDULED_INFERENCE_START = "RESCHEDULED_INFERENCE_START"
 
 
 class StageName(Enum):
@@ -60,6 +62,8 @@ EVENT_TO_STAGE_MAP = {
     LoggingEventName.INFERENCE_START: StageName.PREFILL,
     LoggingEventName.FIRST_TOKEN_GENERATED: StageName.PREFILL,
     LoggingEventName.DECODE_START: StageName.DECODE,
+    LoggingEventName.PREEMPTED: StageName.DECODE,
+    LoggingEventName.RESCHEDULED_INFERENCE_START: StageName.DECODE,
     LoggingEventName.INFERENCE_END: StageName.DECODE,
     LoggingEventName.POSTPROCESSING_START: StageName.POSTPROCESSING,
     LoggingEventName.POSTPROCESSING_END: StageName.POSTPROCESSING,
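With the two new events mapped to StageName.DECODE, a consumer that buckets trace events by stage attributes both the preemption and the later re-entry to the decode phase. A minimal lookup, assuming EVENT_TO_STAGE_MAP and the enums are importable from fastdeploy.trace.constants as the diff suggests:

from fastdeploy.trace.constants import EVENT_TO_STAGE_MAP, LoggingEventName

print(EVENT_TO_STAGE_MAP[LoggingEventName.PREEMPTED])                    # StageName.DECODE
print(EVENT_TO_STAGE_MAP[LoggingEventName.RESCHEDULED_INFERENCE_START])  # StageName.DECODE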