diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index cde2ad3e34..1b02f6a369 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1614,13 +1614,14 @@ class EngineService: engine_recv_first_token_time=request.metrics.engine_recv_first_token_time if request.metrics else now, request_start_time=request.metrics.arrival_time if request.metrics else now, ) + eos_token_ids = getattr(request, "eos_token_ids", [0]) result = RequestOutput( request_id=req_id, finished=True, outputs=CompletionOutput( index=0, send_idx=len(partial_token_ids), - token_ids=[self.data_processor.eos_token_ids[0]], + token_ids=[eos_token_ids[0]], ), metrics=abort_metrics, error_code=200, @@ -1664,10 +1665,19 @@ class EngineService: reset progress state if any, then continue monitoring """ target_set = set(target_req_ids) + target_set = target_set & (set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())) prev_remaining_count = len(target_set) last_progress_time = time.time() remaining = target_set & self.resource_manager.get_reqs_in_aborting() while remaining: + alive_reqs = set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()) + finished_reqs = target_set - alive_reqs + if finished_reqs: + self.llm_logger.info(f"abort targets already finished, skip: {finished_reqs}") + for req_id in finished_reqs: + self.resource_manager.waiting_abort_req_id_set.discard(req_id) + self.resource_manager.to_be_aborted_req_id_set.discard(req_id) + target_set -= finished_reqs remaining = target_set & self.resource_manager.get_reqs_in_aborting() if not remaining: self.llm_logger.info(f"all {len(target_set)} abort reqs cleaned") diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 2048b29d6e..38bd25bed0 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -315,7 +315,7 @@ class ResourceManagerV1(ResourceManager): self.stop_flags[request.idx] = True # 设置停止标志 del self.requests[request_id] del self.req_dict[request_id] - self.to_be_aborted_req_id_set.remove(request_id) + self.to_be_aborted_req_id_set.discard(request_id) self.update_metrics() def _trigger_abort(self, request_id, batch_request): @@ -327,7 +327,7 @@ class ResourceManagerV1(ResourceManager): abort_request.cached_block_num = 0 batch_request.add_request(self._prepare_abort_task(abort_request)) self.to_be_aborted_req_id_set.add(request_id) - self.waiting_abort_req_id_set.remove(request_id) + self.waiting_abort_req_id_set.discard(request_id) def _info_each_block(self): """ @@ -1622,6 +1622,8 @@ class ResourceManagerV1(ResourceManager): del self.requests[req_id] if req_id in self.req_dict: del self.req_dict[req_id] + self.waiting_abort_req_id_set.discard(req_id) + self.to_be_aborted_req_id_set.discard(req_id) # Do not block the main thread here # Write cache to storage if kvcache_storage_backend is enabled diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 9c21b05182..524d3cd997 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -270,7 +270,7 @@ class ChatCompletionResponseChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] speculate_metrics: Optional[SpeculateMetrics] = None @@ -335,7 +335,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): logprobs: Optional[LogProbs] = None draft_logprobs: Optional[LogProbs] = None prompt_logprobs: Optional[PromptLogprobs] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None arrival_time: Optional[float] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -371,7 +371,7 @@ class CompletionResponseChoice(BaseModel): draft_logprobs: Optional[CompletionLogprobs] = None prompt_logprobs: Optional[PromptLogprobs] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None @@ -417,7 +417,7 @@ class CompletionResponseStreamChoice(BaseModel): prompt_tokens: Optional[str] = None completion_tokens: Optional[str] = None reasoning_content: Optional[str] = None - finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] = None + finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop", "abort"]] = None tool_calls: Optional[List[DeltaToolCall | ToolCall]] = None speculate_metrics: Optional[SpeculateMetrics] = None diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py index 8c1bfb50c9..ac30f26d9a 100644 --- a/tests/engine/test_common_engine.py +++ b/tests/engine/test_common_engine.py @@ -3700,6 +3700,8 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase): """_wait_abort_complete exits when background thread cleans up.""" eng = self._make_abort_engine() eng.resource_manager.waiting_abort_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} call_count = [0] @@ -3718,6 +3720,8 @@ class TestCommonEngineAdditionalCoverage(unittest.TestCase): """Stall timeout triggers force cleanup for requests in to_be_aborted_req_id_set.""" eng = self._make_abort_engine() eng.resource_manager.to_be_aborted_req_id_set = {"req-1_0"} + # Add the request to requests dict so it won't be filtered out + eng.resource_manager.requests = {"req-1_0": self._make_fake_request()} def mock_recycle(req_id): eng.resource_manager.to_be_aborted_req_id_set.discard(req_id)