mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
[Cherry-Pick][APIServer][Feature] Add configurable worker health check timeout via FD_WORKER_ALIVE_TIMEOUT(#5865) (#5867)
* Initial plan * Cherry-pick PR #5865: Add configurable worker health check timeout via FD_WORKER_ALIVE_TIMEOUT Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
This commit is contained in:
@@ -88,5 +88,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # Count for cache_transfer_manager process error
     "FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
+
+    # Worker process health check timeout when waiting for responses in seconds (default: 30)
+    "FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),
 }
 ```
|
||||
@@ -87,5 +87,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_CACHE_PROC_EXIT_TIMEOUT": lambda: int(os.getenv("FD_CACHE_PROC_EXIT_TIMEOUT", "600")),

     # cache_transfer_manager 进程残留时连续错误阈值
     "FD_CACHE_PROC_ERROR_COUNT": lambda: int(os.getenv("FD_CACHE_PROC_ERROR_COUNT", "10")),
+
+    # Worker 进程响应等待时的健康检查超时时间(秒),默认 30 秒
+    "FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),
 }
 ```
@@ -24,6 +24,7 @@ from typing import List, Optional

 import numpy as np

+import fastdeploy.envs as envs
 from fastdeploy.entrypoints.openai.protocol import (
     ChatCompletionRequest,
     ChatCompletionResponse,
@@ -264,7 +265,7 @@ class OpenAIServingChat:
             except asyncio.TimeoutError:
                 current_waiting_time += 10
                 if current_waiting_time == 300:
-                    status, msg = self.engine_client.check_health()
+                    status, msg = self.engine_client.check_health(time_interval_threashold=envs.FD_WORKER_ALIVE_TIMEOUT)
                     if not status:
                         if choices:
                             chunk.choices = choices
@@ -557,7 +558,7 @@ class OpenAIServingChat:
             except asyncio.TimeoutError:
                 current_waiting_time += 10
                 if current_waiting_time == 300:
-                    status, msg = self.engine_client.check_health()
+                    status, msg = self.engine_client.check_health(time_interval_threashold=envs.FD_WORKER_ALIVE_TIMEOUT)
                     if not status:
                         raise ValueError(f"Engine is not healthy: {msg}")
                 else:
@@ -25,6 +25,7 @@ from typing import List, Optional

 import numpy as np

+import fastdeploy.envs as envs
 from fastdeploy.engine.request import RequestOutput
 from fastdeploy.entrypoints.openai.protocol import (
     CompletionLogprobs,
@@ -280,7 +281,7 @@ class OpenAIServingCompletion:
             except asyncio.TimeoutError:
                 current_waiting_time += 10
                 if current_waiting_time == 300:
-                    status, msg = self.engine_client.check_health()
+                    status, msg = self.engine_client.check_health(time_interval_threashold=envs.FD_WORKER_ALIVE_TIMEOUT)
                     if not status:
                         raise ValueError(f"Engine is not healthy: {msg}")
                 else:
@@ -436,7 +437,7 @@ class OpenAIServingCompletion:
             except asyncio.TimeoutError:
                 current_waiting_time += 10
                 if current_waiting_time == 300:
-                    status, msg = self.engine_client.check_health()
+                    status, msg = self.engine_client.check_health(time_interval_threashold=envs.FD_WORKER_ALIVE_TIMEOUT)
                     if not status:
                         raise ValueError(f"Engine is not healthy: {msg}")
                 else:
@@ -151,6 +151,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # "Number of tokens in the group for Mixture of Experts (MoE) computation processing on HPU"
     "FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")),
     "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
+    # Timeout for worker process health check in seconds
+    "FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),
 }
Reference in New Issue
Block a user