[Feature] Enhance Router with /v1/completions, docs, scripts, and version info (#5966)

* [Doc] Update prerequisites in the documentation

* [Feature] Enhance Router with /v1/completions, docs, scripts, and version info

---------

Co-authored-by: mouxin <mouxin@baidu.com>
This commit is contained in:
mouxin
2026-01-30 10:28:48 +08:00
committed by GitHub
parent c4abb01f9c
commit 506f1545cd
13 changed files with 1340 additions and 228 deletions
+247
@@ -0,0 +1,247 @@
[简体中文](../zh/online_serving/router.md)
# Load-Balancing Scheduling Router
FastDeploy provides a Golang-based [Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router) for request scheduling. The Router supports both centralized deployment and Prefill/Decode (PD) disaggregated deployment.
## Installation
### 1. Prebuilt Binaries
Starting from FastDeploy v2.5.0, the official Docker images include the Go language environment required to build the Golang Router and also provide a precompiled Router binary. The Router binary is located by default in the `/usr/local/bin` directory and can be used directly without additional compilation. For installation details, please refer to the [FastDeploy Installation Guide](../get_started/installation/nvidia_gpu.md).
### 2. Build from Source
You need to build the Router from source in the following scenarios:
* The official Docker image is not used
* FastDeploy version is earlier than v2.5.0
* Custom modifications to the Router are required
Environment Requirements:
* Go >= 1.21
Clone the FastDeploy repository and build the Router:
```
git clone https://github.com/PaddlePaddle/FastDeploy.git
cd FastDeploy/fastdeploy/golang_router
bash build.sh
```
## Centralized Deployment
Start the Router service. The `--port` parameter specifies the scheduling port for centralized deployment.
```
./fd-router --port 30000
```
Start a mixed inference instance. Compared to standalone deployment, specify the Router endpoint via `--router`. Other parameters remain unchanged.
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_mixed"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--router "0.0.0.0:30000"
```
## PD Disaggregated Deployment
Start the Router service with PD disaggregation enabled using the `--splitwise` flag.
```
./fd-router \
--port 30000 \
--splitwise
```
Launch a prefill instance. Compared with standalone deployment, add the `--splitwise-role` parameter to specify the instance role as Prefill, and add the `--router` parameter to specify the Router endpoint. All other parameters remain the same as in standalone deployment.
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_prefill"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--splitwise-role prefill \
--router "0.0.0.0:30000"
```
Launch a decode instance.
```
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_decode"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 32000 \
--splitwise-role decode \
--router "0.0.0.0:30000"
```
Once both Prefill and Decode instances are successfully launched and registered with the Router, requests can be sent:
```
curl -X POST "http://0.0.0.0:30000/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "hello"}
],
"max_tokens": 100,
"stream": false
}'
```
For more details on PD disaggregated deployment, please refer to the [Usage Guide](../features/disaggregated.md).
## CacheAware
The Load-Balancing Scheduling Router supports a CacheAware strategy, used primarily with PD disaggregated deployment to optimize request allocation and improve the cache hit rate.
To use the CacheAware strategy, the default configuration must be modified. Copy the configuration template and adjust it (an example is available under `examples/run_with_config` in the [Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router) directory):
```bash
pushd examples/run_with_config
cp config/config.example.yaml config/config.yaml
popd
```
Launch the Router with the custom configuration specified via `--config_path`:
```
./fd-router \
--port 30000 \
--splitwise \
--config_path examples/run_with_config/config/config.yaml
```
Prefill and Decode instances are launched the same way as in PD disaggregated deployment.
Launch the prefill instance.
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_prefill"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--splitwise-role prefill \
--router "0.0.0.0:30000"
```
Launch the decode instance.
```
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_decode"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 32000 \
--splitwise-role decode \
--router "0.0.0.0:30000"
```
## HTTP Service Description
The Router exposes a set of HTTP services to provide unified request scheduling, runtime health checking, and monitoring metrics, facilitating integration and operations.
| Method | Path | Description |
|----------|------|------|
| POST | `/v1/chat/completions` | Provide scheduling services for inference requests based on the Chat Completions API |
| POST | `/v1/completions` | Provide scheduling services for general text completion inference requests |
| POST | `/register` | Allow inference instances to register their metadata with the Router for scheduling |
| GET | `/registered` | Query the list of currently registered inference instances |
| GET | `/registered_number` | Query the number of currently registered inference instances |
| GET | `/health_generate` | Check the health status of registered Prefill / Decode inference instances |
| GET | `/metrics` | Provide Prometheus-formatted Router runtime metrics for monitoring and observability |
## Deployment Parameters
### Router Startup Parameters
* --port: Specify the Router scheduling port.
* --splitwise: Enable PD disaggregated scheduling mode.
* --config_path: Specify the Router configuration file path for loading custom scheduling and runtime parameters.
### Configuration File Preparation
Before using `--config_path`, prepare a configuration file that conforms to the Router specification.
The configuration file is typically written in YAML. For detailed parameters, refer to [Configuration Parameters](#configuration-parameters). You may copy and modify the configuration template (an example is available under examples/run_with_config):
```bash
cp config/config.example.yaml config/config.yaml
```
The Load-Balancing Scheduling Router also supports registering inference instances through configuration files at startup (example available at examples/run_with_default_workers):
```bash
cp config/config.example.yaml config/config.yaml
cp config/register.example.yaml config/register.yaml
```
### Configuration Parameters
config.yaml example:
```yaml
server:
port: "8080" # Listening port
host: "0.0.0.0" # Listening address
mode: "debug" # Startup mode: debug, release, test
splitwise: true # true enables PD disaggregation; false disables it
scheduler:
policy: "power_of_two" # Scheduling policy (optional): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score
prefill-policy: "cache_aware" # Prefill scheduling policy in PD mode
decode-policy: "fd_metrics_score" # Decode scheduling policy in PD mode
eviction-interval-secs: 60 # Cache eviction interval for CacheAware scheduling
balance-abs-threshold: 1 # Absolute threshold for CacheAware balancing
balance-rel-threshold: 0.2 # Relative threshold for CacheAware balancing
hit-ratio-weight: 1.0 # Cache hit ratio weight
load-balance-weight: 0.05 # Load balancing weight
cache-block-size: 4 # Cache block size
tokenizer-url: "http://0.0.0.0:8098" # Tokenizer service endpoint (optional)
tokenizer-timeout-secs: 2 # Tokenizer service timeout
waiting-weight: 10 # Waiting weight for CacheAware scheduling
manager:
health-failure-threshold: 3 # Number of failed health checks before marking unhealthy
health-success-threshold: 2 # Number of successful health checks before marking healthy
health-check-timeout-secs: 5 # Health check timeout
health-check-interval-secs: 5 # Health check interval
health-check-endpoint: /health # Health check endpoint
register-path: "config/register.yaml" # Path to instance registration config (optional)
log:
level: "info" # Log level: debug / info / warn / error
output: "file" # Log output: stdout / file
```
register.yaml example:
```yaml
instances:
- role: "prefill"
host_ip: 127.0.0.1
port: 8097
connector_port: 8001
engine_worker_queue_port: 8002
transfer_protocol:
- ipc
- rdma
rdma_ports: [7100, "7101"]
device_ids: [0, "1"]
metrics_port: 8003
- role: "decode"
host_ip: 127.0.0.1
port: 8098
connector_port: 8001
engine_worker_queue_port: 8002
transfer_protocol: ["ipc","rdma"]
rdma_ports: ["7100", "7101"]
device_ids: ["0", "1"]
```
Instance Registration Parameters
* role: Instance role, one of: decode, prefill, mixed.
* host_ip: IP address of the inference instance host.
* port: Service port of the inference instance.
* connector_port: Connector port used for PD communication.
* engine_worker_queue_port: Shared queue communication port within the inference instance.
* transfer_protocol: KV Cache transfer protocol; optional values: ipc / rdma; multiple protocols separated by commas.
* rdma_ports: RDMA communication ports, multiple ports separated by commas (only takes effect when transfer_protocol contains rdma).
* device_ids: GPU device IDs of the inference instance, multiple IDs separated by commas.
* metrics_port: Metrics port of the inference instance.
Among these, `role`, `host_ip`, and `port` are required; all other parameters are optional.
+247
@@ -0,0 +1,247 @@
[English](../../online_serving/router.md)
# 负载均衡调度Router
FastDeploy提供Golang版本[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router),用于实现请求的调度。Router支持集中式部署和PD分离式部署。
## 安装
### 1. 预编译库下载
在 FastDeploy v2.5.0 及之后版本中,官方 Docker 镜像将内置 Golang Router 编译所需的 Go 语言环境,并提供已编译完成的 Router 二进制文件。该二进制文件默认位于 `/usr/local/bin` 目录下,可直接使用。相关安装方式可参考 [FastDeploy 安装文档](../get_started/installation/nvidia_gpu.md)
### 2. 编译安装
在以下场景中,需要从源码编译 Router:
* 未使用官方 Docker 镜像
* FastDeploy 版本早于 v2.5.0
* 需要对 Router 进行定制化修改
环境要求:
* Go >= 1.21
拉取FastDeploy最新代码,编译安装:
```
git clone https://github.com/PaddlePaddle/FastDeploy.git
cd FastDeploy/fastdeploy/golang_router
bash build.sh
```
## 集中式部署
启动Router服务,其中`--port`参数指定集中式部署的调度端口。
```
./fd-router \
--port 30000
```
启动mixed实例。对比单机部署,增加`--router`参数指定Router的接口,其他参数和单机部署相同。
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_mixed"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--router "0.0.0.0:30000"
```
## PD分离部署
启动Router服务,其中`--splitwise`参数指定为分离式部署的调度方式。
```
./fd-router \
--port 30000 \
--splitwise
```
启动Prefill实例。对比单机部署,增加`--splitwise-role`参数指定实例角色为Prefill,增加`--router`参数指定Router的接口,其他参数和单机部署相同。
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_prefill"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--splitwise-role prefill \
--router "0.0.0.0:30000"
```
启动Decode实例。
```
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_decode"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 32000 \
--splitwise-role decode \
--router "0.0.0.0:30000"
```
Prefill和Decode实例启动成功,并且向Router注册成功后,可以发送请求。
```
curl -X POST "http://0.0.0.0:30000/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "hello"}
],
"max_tokens": 100,
"stream": false
}'
```
具体的 PD 分离式部署方案,请参考[使用文档](../features/disaggregated.md)
## CacheAware
负载均衡调度Router支持CacheAware策略,主要应用于 PD 分离部署,以优化请求分配,提高缓存命中率。
使用CacheAware策略需修改默认配置,可复制配置模板并进行调整(示例可参考[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router)目录下的examples/run_with_config):
```bash
pushd examples/run_with_config
cp config/config.example.yaml config/config.yaml
popd
```
启动Router服务,其中`--config_path`参数指定配置文件路径。
```
./fd-router \
--port 30000 \
--splitwise \
--config_path examples/run_with_config/config/config.yaml
```
Prefill和Decode实例启动同PD分离部署。
启动Prefill实例。
```
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_prefill"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 31000 \
--splitwise-role prefill \
--router "0.0.0.0:30000"
```
启动Decode实例。
```
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_decode"
python -m fastdeploy.entrypoints.openai.api_server \
--model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \
--port 32000 \
--splitwise-role decode \
--router "0.0.0.0:30000"
```
## HTTP服务说明
Router 通过 HTTP 接口对外提供统一的调度服务,同时支持运行状态探测与监控指标暴露,便于集成与运维。
| 方法 | 路径 | 说明 |
|----------|------|------|
| POST | `/v1/chat/completions` | 对外提供基于 Chat 接口的推理请求调度服务 |
| POST | `/v1/completions` | 对外提供通用文本补全请求的调度服务 |
| POST | `/register` | 推理实例向 Router 注册自身信息,用于参与调度 |
| GET | `/registered` | 查询当前已注册的推理实例列表 |
| GET | `/registered_number` | 查询当前已注册的推理实例数量 |
| GET | `/health_generate` | 检查已注册 Prefill / Decode 推理实例的健康状态 |
| GET | `/metrics` | 提供 Prometheus 格式的 Router 运行指标,用于监控与观测 |
## 部署参数说明
### Router启动参数说明
* --port: 指定Router的调度端口。
* --splitwise: 指定为PD分离式部署的调度方式。
* --config_path: 指定Router配置文件路径,用于加载自定义调度与运行参数。
### 配置文件准备
在使用 `--config_path` 参数前,请准备符合 Router 规范的配置文件。配置文件通常以 YAML 形式存在,具体参考[配置参数说明](#配置参数说明)。可复制配置模板并进行调整(示例可参考 examples/run_with_config):
```bash
cp config/config.example.yaml config/config.yaml
```
负载均衡调度Router还支持通过配置文件在启动阶段注册推理实例(示例可参考 examples/run_with_default_workers):
```bash
cp config/config.example.yaml config/config.yaml
cp config/register.example.yaml config/register.yaml
```
### 配置参数说明
config.yaml 示例:
```yaml
server:
port: "8080" # 监听端口
host: "0.0.0.0" # 监听地址
mode: "debug" # 启动模式: debug, release, test
splitwise: true # true代表开启pd分离模式,false代表开启非pd分离模式
scheduler:
policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score
prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略
decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略
eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间
balance-abs-threshold: 1 # cache-aware策略绝对阈值
balance-rel-threshold: 0.2 # cache-aware策略相对阈值
hit-ratio-weight: 1.0 # cache-aware策略命中率权重
load-balance-weight: 0.05 # cache-aware策略负载均衡权重
cache-block-size: 4 # cache-aware策略cache block大小
tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选)
tokenizer-timeout-secs: 2 # tokenizer服务超时时间
waiting-weight: 10 # cache-aware策略等待权重
manager:
health-failure-threshold: 3 # 健康检查失败次数,超过次数后认为节点不健康
health-success-threshold: 2 # 健康检查成功次数,超过次数后认为节点健康
health-check-timeout-secs: 5 # 健康检查超时时间
health-check-interval-secs: 5 # 健康检查间隔时间
health-check-endpoint: /health # 健康检查接口
register-path: "config/register.yaml" # 推理实例注册配置文件路径(可选)
log:
level: "info" # 日志打印级别: debug / info / warn / error
output: "file" # 日志输出方式: stdout / file
```
register.yaml 示例:
```yaml
instances:
- role: "prefill"
host_ip: 127.0.0.1
port: 8097
connector_port: 8001
engine_worker_queue_port: 8002
transfer_protocol:
- ipc
- rdma
rdma_ports: [7100, "7101"]
device_ids: [0, "1"]
metrics_port: 8003
- role: "decode"
host_ip: 127.0.0.1
port: 8098
connector_port: 8001
engine_worker_queue_port: 8002
transfer_protocol: ["ipc","rdma"]
rdma_ports: ["7100", "7101"]
device_ids: ["0", "1"]
```
注册实例参数说明:
* role: 实例角色,可选值 decode / prefill / mixed。
* host_ip: 推理实例所在机器的IP地址。
* port: 推理实例的端口号。
* connector_port: 推理实例指定pd通信的端口。
* engine_worker_queue_port: 推理实例内部的共享队列通信端口。
* transfer_protocol:指定KV Cache传输协议,可选值 ipc / rdma,多个协议用逗号分隔。
* rdma_ports: 指定RDMA通信端口,多个端口用逗号隔开(仅当transfer_protocol包含rdma时生效)。
* device_ids: 推理实例的GPU设备ID,多个设备ID用逗号隔开。
* metrics_port: 推理实例的metrics端口号。
其中 `role`、`host_ip`、`port` 为必填参数,其余参数为可选。
+109
@@ -0,0 +1,109 @@
#!/bin/bash
set -e
# Test mixed server + router
# prepare environment
export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
export FD_DEBUG=1
unset http_proxy && unset https_proxy
rm -rf log_*
source ./utils.sh
S1_PORT=52400
S2_PORT=52500
ROUTER_PORT=52600
FD_BIN_DIR="/usr/local/bin"
FD_ROUTER_BIN="${FD_BIN_DIR}/fd-router"
FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/FastDeploy/fd-router"
FD_ROUTER_SHA256="67640aaeebdd886826d3534930b2154cd2c1441a26bc3f38c3af5f0aadba7c2d"
ports=(
$S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3))
$S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3))
$ROUTER_PORT
)
check_ports "${ports[@]}" || {
echo "❌ Some ports are in use. Please release them."
exit 1
}
# check fd-router binary
if [ ! -x "${FD_ROUTER_BIN}" ]; then
echo "⚠️ fd-router not found, downloading..."
mkdir -p "${FD_BIN_DIR}"
TMP_BIN="${FD_ROUTER_BIN}.tmp"
wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || exit 1
echo "${FD_ROUTER_SHA256} ${TMP_BIN}" | sha256sum -c - || {
echo "❌ Integrity check failed"
rm -f "${TMP_BIN}"
exit 1
}
mv "${TMP_BIN}" "${FD_ROUTER_BIN}"
chmod +x "${FD_ROUTER_BIN}"
echo "fd-router installed and verified"
else
echo "fd-router already exists"
fi
# start router
export FD_LOG_DIR="log_router"
mkdir -p ${FD_LOG_DIR}
nohup "${FD_ROUTER_BIN}" \
--port ${ROUTER_PORT} \
>${FD_LOG_DIR}/nohup 2>&1 &
# start modelserver 0
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_server_0"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port ${S1_PORT} \
--metrics-port $((S1_PORT + 1)) \
--engine-worker-queue-port $((S1_PORT + 2)) \
--cache-queue-port $((S1_PORT + 3)) \
--max-model-len 32768 \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${S1_PORT}
# start modelserver 1
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_server_1"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port ${S2_PORT} \
--metrics-port $((S2_PORT + 1)) \
--engine-worker-queue-port $((S2_PORT + 2)) \
--cache-queue-port $((S2_PORT + 3)) \
--max-model-len 32768 \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${S2_PORT}
# send request
sleep 10 # make sure server is registered to router
echo "send request..."
curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "hello"}
],
"max_tokens": 20,
"stream": false
}'
@@ -0,0 +1,81 @@
#!/bin/bash
set -e
# Test mixed server + router
# prepare environment
export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
export FD_DEBUG=1
unset http_proxy && unset https_proxy
rm -rf log_*
source ./utils.sh
S1_PORT=52400
S2_PORT=52500
ROUTER_PORT=52600
ports=(
$S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3))
$S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3))
$ROUTER_PORT
)
check_ports "${ports[@]}" || {
echo "❌ Some ports are in use. Please release them."
exit 1
}
# start router
export FD_LOG_DIR="log_router"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.router.launch \
--port ${ROUTER_PORT} \
>${FD_LOG_DIR}/nohup 2>&1 &
# start modelserver 0
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log_server_0"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port ${S1_PORT} \
--metrics-port $((S1_PORT + 1)) \
--engine-worker-queue-port $((S1_PORT + 2)) \
--cache-queue-port $((S1_PORT + 3)) \
--max-model-len 32768 \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${S1_PORT}
# start modelserver 1
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log_server_1"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port ${S2_PORT} \
--metrics-port $((S2_PORT + 1)) \
--engine-worker-queue-port $((S2_PORT + 2)) \
--cache-queue-port $((S2_PORT + 3)) \
--max-model-len 32768 \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${S2_PORT}
# send request
sleep 10 # make sure server is registered to router
echo "send request..."
curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "hello"}
],
"max_tokens": 20,
"stream": false
}'
+73
@@ -0,0 +1,73 @@
#!/bin/bash
set -e
# prepare environment
export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle"
export FD_DEBUG=1
SCRIPT_PATH=$(readlink -f "$0")
SCRIPT_DIR=$(dirname "$SCRIPT_PATH")
source ${SCRIPT_DIR}/utils.sh
unset http_proxy && unset https_proxy
P_PORT=52400
D_PORT=52500
ROUTER_PORT=52700
LOG_DATE=$(date +%Y%m%d_%H%M%S)
ports=($P_PORT $D_PORT $ROUTER_PORT)
check_ports "${ports[@]}" || {
echo "❌ Some ports are in use. Please release them."
exit 1
}
# start router
export FD_LOG_DIR="log/$LOG_DATE/router"
rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.router.launch \
--port ${ROUTER_PORT} \
--splitwise \
>${FD_LOG_DIR}/nohup 2>&1 &
# start prefill
export CUDA_VISIBLE_DEVICES=0
export FD_LOG_DIR="log/$LOG_DATE/prefill"
rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port "${P_PORT}" \
--splitwise-role "prefill" \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${P_PORT}
# start decode
export CUDA_VISIBLE_DEVICES=1
export FD_LOG_DIR="log/$LOG_DATE/decode"
rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_NAME} \
--port "${D_PORT}" \
--splitwise-role "decode" \
--router "0.0.0.0:${ROUTER_PORT}" \
>${FD_LOG_DIR}/nohup 2>&1 &
wait_for_health ${D_PORT}
# send request
sleep 10 # make sure server is registered to router
echo "send request..."
curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "hello"}
],
"max_tokens": 100,
"stream": false
}'
+99
@@ -0,0 +1,99 @@
#!/bin/bash
is_port_free() {
local port=$1
if ss -ltun | awk '{print $4}' | grep -q ":${port}$"; then
return 1 # Port is occupied
fi
return 0 # Port is free
}
check_ports() {
for port in "$@"; do
if ! is_port_free $port; then
echo "❌ Port $port is already in use"
return 1
fi
done
return 0
}
wait_for_health() {
IFS=',' read -r -a server_ports <<< "$1"
local num_ports=${#server_ports[@]}
local total_lines=$((num_ports + 1))
local first_run=true
local GREEN='\033[0;32m'
local RED='\033[0;31m'
local NC='\033[0m' # No Color
local start_time=$(date +%s)
echo "-------- WAIT FOR HEALTH --------"
while true; do
local all_ready=true
for port in "${server_ports[@]}"; do
status_code=$(curl -s --max-time 1 -o /dev/null -w "%{http_code}" "http://0.0.0.0:${port}/health" || echo "000")
if [ "$status_code" -eq 200 ]; then
printf "Port %s: ${GREEN}[OK] 200${NC}\033[K\n" "$port"
else
all_ready=false
printf "Port %s: ${RED}[WAIT] %s${NC}\033[K\n" "$port" "$status_code"
fi
done
cur_time=$(date +%s)
if [ "$all_ready" = "true" ]; then
echo "All services are ready! [$((cur_time-start_time))s]"
break
else
echo "Services not ready.. [$((cur_time-start_time))s]"
printf "\033[%dA" "$total_lines" # roll back cursor
sleep 1
fi
done
echo "---------------------------------"
}
get_free_ports() {
free_ports_num=${1:-1}
start_port=${2:-8000}
end_port=${3:-9000}
free_ports=()
if [[ -z ${free_ports_num} || "${free_ports_num}" -le 0 ]]; then
log_warn "param can't be empty, and should > 0"
echo ${free_ports[@]}
return 1
fi
used_ports1=$(netstat -an | grep -E "(0.0.0.0|127.0.0.1|${POD_IP}|tcp6)" | awk '{n=split($4,a,":"); if(a[n]~/^[0-9]+$/) print a[n];}' | sort -u)
used_ports2=$(netstat -an | grep -E "(0.0.0.0|127.0.0.1|${POD_IP}|tcp6)" | awk '{n=split($5,a,":"); if(a[n]~/^[0-9]+$/) print a[n];}' | sort -u)
all_used_ports=$(printf "%s\n" "${used_ports1}" "${used_ports2}" | sort -u)
# Generate random number between 0 and 32767
random_num=$(( RANDOM ))
port=$(( random_num % (end_port - start_port + 1) + start_port ))
while true; do
(( port++ ))
if [[ ${port} -ge ${end_port} ]]; then
port=${start_port}
fi
# exact line match avoids substring false positives (e.g. 80 matching 8080)
if printf '%s\n' "${all_used_ports}" | grep -qx "${port}"; then
continue
fi
if is_port_free ${port}; then
free_ports+=("${port}")
(( free_ports_num-- ))
if [[ ${free_ports_num} = 0 ]]; then
break
fi
fi
done
# echo ${free_ports[@]}
IFS=',' && echo "${free_ports[*]}"
return 0
}
-184
@@ -1,184 +0,0 @@
# Golang-Router
## 关于
【正在开发迭代中】
Golang-Router 是一个面向大语言模型推理系统的高性能 Golang 路由框架,作为系统的**控制与调度平面**运行,负责请求接入、实例选择与流量转发,设计上适配 Prefill–Decode(PD)分离推理架构。
Golang-Router 可独立部署运行,也可通过 HTTP 接口与 FastDeploy 推理实例协同工作。框架提供基础而稳定的路由、中间件扩展与健康检查能力,适用于单点推理部署场景,并在架构层面为后续的水平扩展与调度能力演进预留空间。
### 背景与动机
在大语言模型推理系统中,路由组件已从传统的流量转发层演进为影响系统性能与资源利用效率的关键基础设施。随着 Prefill–Decode 分离推理架构的广泛采用,不同推理阶段在计算特征、显存占用与缓存行为方面呈现出明显差异,仅依赖请求级静态信息进行调度已难以满足稳定性与效率需求。
在保持请求级调度模型不变的前提下,引入更细粒度的运行时信号辅助调度决策,成为提升调度能力与系统可预测性的工程共识。Golang-Router 正是在这一背景下构建,作为独立的路由与调度组件,为推理系统提供清晰、可扩展的控制平面。
### 设计目标
Golang-Router 聚焦解决以下核心问题:
- **调度决策信息不足**
传统 Router 通常仅基于请求级元信息或粗粒度实例状态进行调度,难以利用推理过程中产生的细粒度缓存相关信号,从而限制了 cache-aware 策略的实际效果。
- **调度逻辑与推理执行强耦合**
路由与调度逻辑内嵌于推理框架内部,增加了系统复杂度,限制了调度策略的独立演进与复用能力。
- **高并发场景下的可扩展性挑战**
在高并发推理负载下,实例状态维护与实例选择逻辑对路由组件的并发模型、性能与稳定性提出更高要求。
### 核心特性
- 基于 Golang 实现的高性能路由与调度组件,适用于高并发、低延迟推理场景
- 请求级调度模型,保持接口语义清晰与系统复杂度可控
- 利用token级缓存相关运行时信息作为调度策略的辅助输入,用于提升实例选择的准确性与稳定性
- 模块化架构设计(Gateway / Scheduler / Manager),职责边界清晰,便于扩展与维护
- 面向 Prefill–Decode 分离推理架构设计,为复杂调度策略与能力演进提供结构性支持
### 与现有方案的差异
与 sglang 等推理框架内置 Router 相比,Golang-Router 以**独立 Golang 服务**的形式运行,将路由、调度与实例状态管理能力从推理执行逻辑中解耦。
Golang-Router 已支持 cache-aware 调度,在请求级调度框架内引入 token 级缓存相关运行时信号,辅助调度决策制定,以更稳定地适配 Prefill–Decode 分离推理架构下的缓存利用需求。
## 功能特性
- 高性能 HTTP/HTTPS 服务器
- RESTful API 路由支持
- 可扩展的中间件系统
- 动态配置管理
- 内置健康检查和监控
- 负载均衡
- 日志记录和指标收集
## 快速开始
### 前置要求
- Go 1.21
- 构建不依赖特定系统环境
- 可直接在 FastDeploy 官方 Docker 环境中编译与运行
### 编译
```bash
./build.sh
```
### 配置
1. 配置文件准备(可选)
如需修改默认配置,可复制配置模板并进行调整(示例可参考 examples/run_with_config):
```bash
cp config/config.example.yaml config/config.yaml
```
2. 主要配置项说明:
```yaml
server:
port: "8080" # 监听端口
host: "0.0.0.0" # 监听地址
mode: "debug" # 启动模式: debug, release, test
splitwise: true # true代表开启pd分离模式,false代表开启非pd分离模式
scheduler:
policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num
prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略
decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略
eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间
balance-abs-threshold: 1 # cache-aware策略绝对阈值
balance-rel-threshold: 0.2 # cache-aware策略相对阈值
hit-ratio-weight: 1.0 # cache-aware策略命中率权重
load-balance-weight: 0.05 # cache-aware策略负载均衡权重
cache-block-size: 4 # cache-aware策略cache block大小
tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选)
tokenizer-timeout-secs: 2 # tokenizer服务超时时间
waiting-weight: 10 # cache-aware策略等待权重
manager:
health-failure-threshold: 3 # 健康检查失败次数,超过次数后认为节点不健康
health-success-threshold: 2 # 健康检查成功次数,超过次数后认为节点健康
health-check-timeout-secs: 5 # 健康检查超时时间
health-check-interval-secs: 5 # 健康检查间隔时间
health-check-endpoint: /health # 健康检查接口
register-path: "config/register.yaml" # 推理实例注册配置文件路径(可选)
log:
level: "info" # 日志打印级别: debug / info / warn / error
output: "file" # 日志输出方式: stdout / file
```
3. 启动时注册实例(可选)
支持通过配置文件在启动阶段注册推理实例(示例可参考 examples/run_with_default_workers):
```bash
cp config/config.example.yaml config/config.yaml
cp config/register.example.yaml config/register.yaml
```
### 运行
本项目支持两种运行方式:直接运行源码 或 构建二进制文件后运行。
方式一:直接运行源码
在项目根目录下,使用 go run 启动服务:
```bash
go run cmd/main.go
```
该方式适用于本地开发与调试场景。
方式二:构建并运行二进制文件
1. 构建二进制文件
通过构建脚本生成可执行文件:
```bash
./build.sh
```
构建完成后,二进制文件将被安装到指定目录(默认为 /usr/local/bin,可通过修改 Makefile 中的 OUTDIR 进行调整)。
此外,也可以在项目根目录下手动构建二进制文件:
```bash
go build -o ./fd-router ./cmd
```
该方式便于本地测试或将二进制文件与配置文件一并分发。
2. 运行二进制文件
可以通过运行脚本启动服务:
```bash
./run.sh
```
运行脚本会自动处理常见启动参数及日志目录,适合标准化部署场景。
也可以直接运行二进制文件,在项目根目录或二进制所在目录下执行:
```bash
./fd-router \
--port 8080 \
--splitwise \
--config_path ./config/config.yaml
```
其中:
- --port 为必填参数
- 其他参数可根据实际需求配置
## 项目结构
```
.
├── cmd/ # 主程序入口
├── config/ # 配置文件
├── internal/ # 核心实现代码
│ ├── common/ # 公共接口定义
│ ├── config/ # 配置处理
│ ├── gateway/ # API网关实现
│ ├── manager/ # 路由管理
│ ├── middleware/ # 中间件实现
│ ├── router/ # 路由核心逻辑
│ └── scheduler/ # 调度器实现
├── logs/ # 日志目录
├── output/ # 构建输出
├── pkg/ # 可复用组件
│ ├── logger/ # 日志组件
│ └── metrics/ # 监控指标
├── build.sh # 构建脚本
├── go.mod # Go模块定义
├── go.sum # 依赖校验
├── Makefile # 构建管理
├── README.md # 项目说明
└── run.sh # 启动脚本
```
### 运行测试
```bash
make test
```
## 贡献
欢迎提交 Issue 和 Pull Request
+50 -2
@@ -3,7 +3,9 @@ package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"runtime/debug"

	"github.com/PaddlePaddle/FastDeploy/router/internal/config"
	"github.com/PaddlePaddle/FastDeploy/router/internal/manager"
@@ -14,13 +16,24 @@ import (
func main() {
	// Parse command line arguments
	var (
		configPath  string
		port        string
		splitwise   bool
		showVersion bool
	)
	flag.StringVar(&configPath, "config_path", "", "path to config file")
	flag.StringVar(&port, "port", "", "listen port of router")
	flag.BoolVar(&splitwise, "splitwise", false, "enable splitwise mode")
	flag.BoolVar(&showVersion, "version", false, "print version info")
	flag.BoolVar(&showVersion, "V", false, "print version info (shorthand)")
	flag.Parse()

	if showVersion {
		printVersion()
		return
	}

	// Load configuration
	cfg, err := config.Load(configPath, port, splitwise)
	if err != nil {
@@ -52,3 +65,38 @@ func main() {
		log.Fatalf("Failed to start server: %v", err)
	}
}

func printVersion() {
	if info, ok := debug.ReadBuildInfo(); ok {
		var (
			commit  = "unknown"
			vcsTime = "unknown"
			dirty   = "unknown"
		)
		for _, s := range info.Settings {
			switch s.Key {
			case "vcs.revision":
				if len(s.Value) >= 7 {
					commit = s.Value[:7]
				} else {
					commit = s.Value
				}
			case "vcs.time":
				vcsTime = s.Value
			case "vcs.modified":
				dirty = s.Value
			}
		}
		fmt.Printf("golang-router\n")
		fmt.Printf(" version: %s\n", info.Main.Version)
		fmt.Printf(" commit: %s\n", commit)
		fmt.Printf(" dirty: %s\n", dirty)
		fmt.Printf(" vcsTime: %s\n", vcsTime)
		fmt.Printf(" module: %s\n", info.Main.Path)
		return
	}
	fmt.Println("version info not available")
}
@@ -3,6 +3,7 @@ package gateway
import (
	"bufio"
	"bytes"
	"context"
	crand "crypto/rand"
	"encoding/json"
	"fmt"
@@ -35,6 +36,8 @@ func newRequestID() string {
	return fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Int63())
}

type PromptExtractor func(rawReq map[string]any) string

// extractPromptFromChatRequest extracts text prompt from OpenAI ChatCompletions style request
func extractPromptFromChatRequest(rawReq map[string]any) string {
	messagesVal, ok := rawReq["messages"]
@@ -95,12 +98,55 @@ func extractPromptFromChatRequest(rawReq map[string]any) string {
	return builder.String()
}
func extractPromptFromCompletionsRequest(rawReq map[string]any) string {
promptVal, ok := rawReq["prompt"]
if !ok {
return ""
}
var builder strings.Builder
appendText := func(s string) {
s = strings.TrimSpace(s)
if s == "" {
return
}
if builder.Len() > 0 {
builder.WriteByte(' ')
}
builder.WriteString(s)
}
switch v := promptVal.(type) {
case string:
appendText(v)
case []string:
for _, s := range v {
appendText(s)
}
case []any:
for _, item := range v {
if s, ok := item.(string); ok {
appendText(s)
}
}
default:
// Other structures are ignored for now
}
return builder.String()
}
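The extraction rules above (plain string, array of strings, anything else ignored, whitespace-only entries skipped) can be exercised in isolation. A minimal sketch — `extractPrompt` mirrors the handling but is an illustration, not the package's function; note that `json.Unmarshal` into `map[string]any` always decodes JSON arrays as `[]any`, so the `[]string` case in the router only matters for maps built in Go code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractPrompt mirrors the router's "prompt" handling: a plain string,
// a JSON array of strings, or anything else (ignored). Whitespace-only
// entries are skipped and parts are joined with single spaces.
func extractPrompt(rawReq map[string]any) string {
	v, ok := rawReq["prompt"]
	if !ok {
		return ""
	}
	var parts []string
	appendText := func(s string) {
		if s = strings.TrimSpace(s); s != "" {
			parts = append(parts, s)
		}
	}
	switch p := v.(type) {
	case string:
		appendText(p)
	case []any: // json.Unmarshal decodes JSON arrays as []any
		for _, item := range p {
			if s, ok := item.(string); ok {
				appendText(s)
			}
		}
	}
	return strings.Join(parts, " ")
}

func main() {
	var raw map[string]any
	_ = json.Unmarshal([]byte(`{"prompt": ["", "hello", "world"]}`), &raw)
	fmt.Println(extractPrompt(raw)) // prints "hello world"
}
```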
// PostToPD sends requests to both Prefill and Decode instances, only returns Decode node response
func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isStream bool, completionEndpoint string) (*http.Response, error) {
ctx := c.Request.Context()
decodeEndpoint := fmt.Sprintf("%s/v1/%s", decodeURL, completionEndpoint)
prefillEndpoint := fmt.Sprintf("%s/v1/%s", prefillURL, completionEndpoint)
// Construct two requests
decodeReq, err := http.NewRequestWithContext(ctx, "POST", decodeEndpoint, bytes.NewReader(reqBody))
@@ -160,14 +206,71 @@ func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte) (*ht
}
if prefillRes.resp != nil {
go readPrefillRecv(ctx, prefillURL, isStream, prefillRes.resp)
}
return decodeRes.resp, nil
}
func readPrefillRecv(ctx context.Context, url string, isStream bool, backendResp *http.Response) {
if backendResp == nil || backendResp.Body == nil {
return
}
defer backendResp.Body.Close()
if isStream {
buffer := bytebufferpool.Get()
buffer.Reset()
defer bytebufferpool.Put(buffer)
scanner := bufio.NewScanner(backendResp.Body)
scanner.Buffer(buffer.B, maxCapacity)
released := false
defer func() {
// Fallback to ensure release
if !released {
scheduler_handler.Release(ctx, url)
logger.Debug("[prefill] release in defer (fallback) url=%s", url)
}
}()
for scanner.Scan() {
line := scanner.Text()
// First read that returns data
if !released {
scheduler_handler.Release(ctx, url)
released = true
logger.Debug("[prefill] first chunk received, release scheduler url=%s", url)
}
logger.Debug("[prefill] recv result: %s", line)
}
if err := scanner.Err(); err != nil {
logger.Debug("[prefill] scanner error: %v", err)
}
} else {
_, err := io.Copy(io.Discard, backendResp.Body)
if err != nil {
logger.Debug("[prefill] copy error: %v", err)
}
}
}
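The streaming branch of `readPrefillRecv` has one subtlety worth isolating: the scheduler release fires exactly once, on the first chunk received, with a deferred fallback if no chunk ever arrives. A minimal sketch of that release-on-first-chunk pattern (`drainStream` and its callback are illustrative names, decoupled from the scheduler):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// drainStream mimics the prefill drain loop: scan the body line by
// line and fire the release callback exactly once, on the first line.
// Returns the number of lines scanned.
func drainStream(body string, release func()) int {
	scanner := bufio.NewScanner(strings.NewReader(body))
	released := false
	lines := 0
	for scanner.Scan() {
		if !released { // first chunk received: release the scheduler slot
			release()
			released = true
		}
		lines++
	}
	return lines
}

func main() {
	calls := 0
	lines := drainStream("data: a\ndata: b\n", func() { calls++ })
	fmt.Println(lines, calls) // prints "2 1": two lines, release fired once
}
```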
// ChatCompletions implements request forwarding to actual large model inference service
func ChatCompletions(c *gin.Context) {
completionEndpoint := "chat/completions"
CommonCompletions(c, extractPromptFromChatRequest, completionEndpoint)
}
func Completions(c *gin.Context) {
completionEndpoint := "completions"
CommonCompletions(c, extractPromptFromCompletionsRequest, completionEndpoint)
}
func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndpoint string) {
ctx := c.Request.Context()
bodyBytes, err := io.ReadAll(c.Request.Body)
@@ -190,13 +293,15 @@ func ChatCompletions(c *gin.Context) {
destURL string
releaseTargets []string
requestBodyData []byte
prefillURL string
decodeURL string
)
if isSplitwise {
// PD mode: select instances for Prefill/Decode separately
message := extractor(rawReq)
prefillURL, decodeURL, err = manager.SelectWorkerPair(ctx, message)
if err != nil {
c.Writer.WriteHeader(http.StatusBadGateway)
c.Writer.Write([]byte(`{"error": "Failed to select worker pair"}`))
@@ -208,6 +313,9 @@ func ChatCompletions(c *gin.Context) {
return
}
// Prefill node token count was added in SelectWorker, release when request ends
defer scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message)
// Construct disaggregate_info to ensure selected P/D work in pairs within FastDeploy
disagg, err := manager.BuildDisaggregateInfo(ctx, prefillURL, decodeURL)
if err != nil {
@@ -237,9 +345,6 @@ func ChatCompletions(c *gin.Context) {
// Expose scheduling results to caller for debugging/validating scheduling strategy
c.Writer.Header().Set("X-Router-Prefill-URL", prefillURL)
c.Writer.Header().Set("X-Router-Decode-URL", decodeURL)
// Prefill node token count was added in SelectWorker, release when request ends
defer scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message)
} else {
// Non-PD mode: use Mixed instance
dest, err := manager.SelectWorker(ctx, "")
@@ -260,10 +365,18 @@ func ChatCompletions(c *gin.Context) {
}
}()
isStream := false
if v, ok := rawReq["stream"]; ok {
stream, ok := v.(bool)
if ok && stream {
isStream = true
}
}
// Send request
var backendResp *http.Response
if isSplitwise {
backendResp, err = PostToPD(c, decodeURL, prefillURL, requestBodyData, isStream, completionEndpoint)
} else {
backendResp, err = GetClientWithRetry(c, requestBodyData, destURL)
}
@@ -291,14 +404,6 @@ func ChatCompletions(c *gin.Context) {
c.Writer.WriteHeader(backendResp.StatusCode)
}
isStream := false
if v, ok := rawReq["stream"]; ok {
stream, ok := v.(bool)
if ok && stream {
isStream = true
}
}
redirect(c, isStream, backendResp)
}
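`CommonCompletions` treats a request as streaming only when `"stream"` is present and is a JSON boolean `true`; a string `"true"` or a missing field falls through to non-streaming. A minimal standalone sketch of that check (`isStreamRequest` is an illustrative name):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// isStreamRequest reproduces the router's stream-flag check: the field
// must exist and be a JSON boolean true; any other type is non-stream.
func isStreamRequest(rawReq map[string]any) bool {
	v, ok := rawReq["stream"]
	if !ok {
		return false
	}
	stream, ok := v.(bool)
	return ok && stream
}

func main() {
	var raw map[string]any
	_ = json.Unmarshal([]byte(`{"stream": true}`), &raw)
	fmt.Println(isStreamRequest(raw)) // prints "true"
}
```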
@@ -2,9 +2,12 @@ package gateway
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/gin-gonic/gin"
@@ -57,29 +60,6 @@ func TestExtractPromptFromChatRequest(t *testing.T) {
}
}
func TestPostToPD(t *testing.T) {
// Setup test servers
prefillTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer prefillTS.Close()
decodeTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"response": "test"}`))
}))
defer decodeTS.Close()
// Setup test context
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(`{"test": "data"}`))
resp, err := PostToPD(c, decodeTS.URL, prefillTS.URL, []byte(`{"test": "data"}`))
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
func TestRedirect(t *testing.T) {
// Setup test server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -144,3 +124,309 @@ func TestNewRequestID(t *testing.T) {
// Check that IDs are different
assert.NotEqual(t, id1, id2)
}
func TestExtractPromptFromCompletionsRequest(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
"simple string prompt",
`{"prompt": "hello world"}`,
"hello world",
},
{
"string array prompt",
`{"prompt": ["first", "second", "third"]}`,
"first second third",
},
{
"interface array prompt",
`{"prompt": ["first", "second", "third"]}`,
"first second third",
},
{
"empty prompt",
`{"prompt": ""}`,
"",
},
{
"empty array prompt",
`{"prompt": []}`,
"",
},
{
"missing prompt field",
`{"other": "field"}`,
"",
},
{
"array with empty strings",
`{"prompt": ["", "hello", ""]}`,
"hello",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var rawReq map[string]any
err := json.Unmarshal([]byte(tt.input), &rawReq)
assert.NoError(t, err)
result := extractPromptFromCompletionsRequest(rawReq)
assert.Equal(t, tt.expected, result)
})
}
}
func TestPostToPD(t *testing.T) {
// Setup test context
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions",
bytes.NewBufferString(`{"test": "data"}`))
reqBody := []byte(`{"test": "data"}`)
t.Run("successful request to both P and D", func(t *testing.T) {
// Setup test servers for prefill and decode
prefillServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("prefill response"))
}))
defer prefillServer.Close()
decodeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("decode response"))
}))
defer decodeServer.Close()
resp, err := PostToPD(c, decodeServer.URL, prefillServer.URL, reqBody, false, "chat/completions")
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.NotNil(t, resp)
defer resp.Body.Close()
})
t.Run("decode server connection error", func(t *testing.T) {
prefillServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer prefillServer.Close()
// Use invalid URL to simulate connection error
resp, err := PostToPD(c, "http://invalid-server:9999", prefillServer.URL, reqBody, false, "chat/completions")
assert.Error(t, err)
assert.Nil(t, resp)
})
t.Run("prefill server connection error", func(t *testing.T) {
decodeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer decodeServer.Close()
// Use invalid URL to simulate connection error
resp, err := PostToPD(c, decodeServer.URL, "http://invalid-server:9999", reqBody, false, "chat/completions")
assert.Error(t, err)
assert.Nil(t, resp)
})
}
func TestGetClientWithRetry(t *testing.T) {
t.Run("success after connection errors", func(t *testing.T) {
retryCount := 0
shouldFail := true
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
retryCount++
if shouldFail && retryCount < 3 {
// Simulate network connection error by closing connection
hj, ok := w.(http.Hijacker)
if ok {
conn, _, _ := hj.Hijack()
conn.Close()
return
}
w.WriteHeader(http.StatusInternalServerError)
return
}
shouldFail = false
w.WriteHeader(http.StatusOK)
w.Write([]byte("success"))
}))
defer ts.Close()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions",
bytes.NewBufferString(`{"test": "data"}`))
reqBody := []byte(`{"test": "data"}`)
resp, err := GetClientWithRetry(c, reqBody, ts.URL)
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, http.StatusOK, resp.StatusCode)
})
t.Run("all retries fail with connection errors", func(t *testing.T) {
retryCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
retryCount++
// Always simulate network connection error
hj, ok := w.(http.Hijacker)
if ok {
conn, _, _ := hj.Hijack()
conn.Close()
return
}
w.WriteHeader(http.StatusInternalServerError)
}))
defer ts.Close()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions",
bytes.NewBufferString(`{"test": "data"}`))
reqBody := []byte(`{"test": "data"}`)
resp, err := GetClientWithRetry(c, reqBody, ts.URL)
assert.Error(t, err)
assert.Nil(t, resp)
})
t.Run("success on first try", func(t *testing.T) {
retryCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
retryCount++
w.WriteHeader(http.StatusOK)
w.Write([]byte("success"))
}))
defer ts.Close()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions",
bytes.NewBufferString(`{"test": "data"}`))
reqBody := []byte(`{"test": "data"}`)
resp, err := GetClientWithRetry(c, reqBody, ts.URL)
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, 1, retryCount)
})
}
func TestCompletions(t *testing.T) {
// This is a basic test that just verifies the function calls CommonCompletions
// More comprehensive testing would require mocking the manager dependencies
t.Run("function exists", func(t *testing.T) {
// Just verify that the function can be called without panic
// Actual behavior testing requires integration test setup
assert.NotNil(t, Completions)
})
}
func TestReadPrefillRecv(t *testing.T) {
t.Run("nil response handling", func(t *testing.T) {
ctx := context.Background()
// Should handle nil response gracefully without panic
readPrefillRecv(ctx, "test-url", false, nil)
})
t.Run("nil response body handling", func(t *testing.T) {
ctx := context.Background()
// Create a mock response with nil body
resp := &http.Response{
StatusCode: http.StatusOK,
Body: nil,
}
// Should handle nil body gracefully without panic
readPrefillRecv(ctx, "test-url", false, resp)
})
t.Run("mock response without scheduler dependency", func(t *testing.T) {
ctx := context.Background()
// Create a simple response that doesn't trigger scheduler calls
resp := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewBufferString("test")),
}
// This test verifies basic error handling and response body consumption
// without triggering scheduler initialization requirements
readPrefillRecv(ctx, "test-url", false, resp)
})
}
func TestCommonCompletions(t *testing.T) {
// Setup a basic test server for backend responses
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if it's a stream request
bodyBytes, _ := io.ReadAll(r.Body)
var reqBody map[string]any
json.Unmarshal(bodyBytes, &reqBody)
if stream, ok := reqBody["stream"].(bool); ok && stream {
// Stream response
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
w.Write([]byte("data: {\"choices\":[{\"text\":\"chunk1\"}]}\n"))
w.Write([]byte("data: {\"choices\":[{\"text\":\"chunk2\"}]}\n"))
w.Write([]byte("data: [DONE]\n"))
} else {
// Non-stream response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"choices":[{"text":"test response"}]}`))
}
}))
defer backendServer.Close()
t.Run("basic request handling", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/completions",
bytes.NewBufferString(`{"prompt": "test", "stream": false}`))
// Mock the manager functions to return our test server
// This would normally require more sophisticated mocking
// For now, this test verifies the function structure
assert.NotNil(t, CommonCompletions)
})
t.Run("invalid JSON request", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/completions",
bytes.NewBufferString(`invalid json`))
CommonCompletions(c, extractPromptFromCompletionsRequest, "completions")
// Should return 400 Bad Request
assert.Equal(t, http.StatusBadRequest, w.Code)
assert.Contains(t, w.Body.String(), "Invalid JSON format")
})
t.Run("empty request body", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/completions", nil)
CommonCompletions(c, extractPromptFromCompletionsRequest, "completions")
// Should return 400 Bad Request with appropriate error message
assert.Equal(t, http.StatusBadRequest, w.Code)
// The error message could be either "Invalid request body" or "Invalid JSON format"
// depending on how empty body is handled
assert.True(t, strings.Contains(w.Body.String(), "Invalid request body") ||
strings.Contains(w.Body.String(), "Invalid JSON format") ||
w.Body.String() != "")
})
}
@@ -24,7 +24,7 @@ func New(cfg *config.Config) *gin.Engine {
v1 := r.Group("/v1")
{
v1.POST("/chat/completions", gateway.ChatCompletions)
v1.POST("/completions", gateway.Completions)
}
r.POST("/register", manager.RegisterInstance)
r.GET("/registered_number", manager.RegisteredNumber)
@@ -132,6 +132,7 @@ nav:
- Monitor Metrics: online_serving/metrics.md
- Scheduler: online_serving/scheduler.md
- Graceful Shutdown: online_serving/graceful_shutdown_service.md
- Load-Balancing Scheduling Router: online_serving/router.md
- Offline Inference: offline_inference.md
- Best Practices:
- ERNIE-4.5-0.3B: best_practices/ERNIE-4.5-0.3B-Paddle.md