From 9c91ecb1ec81ff4dc69e4de596ec68e5a8cc49c4 Mon Sep 17 00:00:00 2001 From: qwes5s5 <45442318+qwes5s5@users.noreply.github.com> Date: Wed, 22 Apr 2026 15:49:51 +0800 Subject: [PATCH] [Cherry-Pick][BugFix] Fix bugs in /v1/abort_requests interface from PR(#6992) (#7176) (#7551) * abort api bug fix * bug fix * bug fix --- fastdeploy/engine/common_engine.py | 12 +++++++++++- fastdeploy/engine/sched/resource_manager_v1.py | 6 ++++-- fastdeploy/entrypoints/openai/protocol.py | 8 ++++---- tests/engine/test_common_engine.py | 4 ++++ 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index dabed9e434..5f91a31855 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1587,13 +1587,14 @@ class EngineService: engine_recv_first_token_time=request.metrics.engine_recv_first_token_time if request.metrics else now, request_start_time=request.metrics.arrival_time if request.metrics else now, ) + eos_token_ids = getattr(request, "eos_token_ids", [0]) result = RequestOutput( request_id=req_id, finished=True, outputs=CompletionOutput( index=0, send_idx=len(partial_token_ids), - token_ids=[self.data_processor.eos_token_ids[0]], + token_ids=[eos_token_ids[0]], ), metrics=abort_metrics, error_code=200, @@ -1637,10 +1638,19 @@ class EngineService: reset progress state if any, then continue monitoring """ target_set = set(target_req_ids) + target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())) prev_remaining_count = len(target_set) last_progress_time = time.time() remaining = target_set & self.resource_manager.get_reqs_in_aborting() while remaining: + alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()) + finished_reqs = target_set - alive_reqs + if finished_reqs: + self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}") + for req_id in finished_reqs: + self.resource_manager.waiting_abort_req_id_set.discard(req_id) + self.resource_manager.to_be_aborted_req_id_set.discard(req_id) + target_set -= finished_reqs remaining = target_set & self.resource_manager.get_reqs_in_aborting() if not remaining: self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned") diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index ffc9c0bacf..f3704a533b 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -281,7 +281,7 @@ class ResourceManagerV1(ResourceManager): self.stop_flags[request.idx] = True # 设置停止标志 del self.requests[request_id] del self.req_dict[request_id] - self.to_be_aborted_req_id_set.remove(request_id) + self.to_be_aborted_req_id_set.discard(request_id) self.update_metrics() def _trigger_abort(self, request_id, scheduled_reqs): @@ -293,7 +293,7 @@ class ResourceManagerV1(ResourceManager): abort_request.cached_block_num = 0 scheduled_reqs.append(self._prepare_abort_task(abort_request)) self.to_be_aborted_req_id_set.add(request_id) - self.waiting_abort_req_id_set.remove(request_id) + self.waiting_abort_req_id_set.discard(request_id) def _info_each_block(self): """ @@ -1544,6 +1544,8 @@ class ResourceManagerV1(ResourceManager): del self.requests[req_id] if req_id in self.req_dict: del self.req_dict[req_id] + self.waiting_abort_req_id_set.discard(req_id) + self.to_be_aborted_req_id_set.discard(req_id) # Do not block the main thread here # Write cache to storage if kvcache_storage_backend is enabled diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 3560f3a8ae..b4e87e7a20 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -268,7 +268,7 @@ class ChatCompletionResponseChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] speculate_metrics: Optional[SpeculateMetrics] = None @@ -333,7 +333,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None arrival_time: Optional[float] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -369,7 +369,7 @@ class CompletionResponseChoice(BaseModel): draft_logprobs: Optional[CompletionLogprobs] = None prompt_logprobs: Optional[PromptLogprobs] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -415,7 +415,7 @@ class CompletionResponseStreamChoice(BaseModel): prompt_tokens: Optional[str] = None completion_tokens: Optional[str] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py index 8778e2013e..53bd8462d8 100644 --- a/tests/engine/test_common_engine.py +++ b/tests/engine/test_common_engine.py @@ -3700,6 +3700,8 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase): """_wait_abort_complete exits when background thread cleans up.""" eng = self._make_abort_engine() eng.resource_manager.waiting_abort_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} call_count = [0] @@ -3718,6 +3720,8 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase): """Stall timeout triggers force cleanup for requests in to_be_aborted_req_id_set.""" eng = self._make_abort_engine() eng.resource_manager.to_be_aborted_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} def mock_recycle(req_id): eng.resource_manager.to_be_aborted_req_id_set.discard(req_id)