From 506f1545cde8bc0ca1981922473dc1084e06b7e1 Mon Sep 17 00:00:00 2001 From: mouxin <494624263qq@gmail.com> Date: Fri, 30 Jan 2026 10:28:48 +0800 Subject: [PATCH] [Feature] Enhance Router with /v1/completions, docs, scripts, and version info (#5966) * [Doc] Update prerequisites in the documentation * [Feature] Enhance Router with /v1/completions, docs, scripts, and version info * [Feature] Enhance Router with /v1/completions, docs, scripts, and version info --------- Co-authored-by: mouxin --- docs/online_serving/router.md | 247 +++++++++++++ docs/zh/online_serving/router.md | 247 +++++++++++++ examples/router/start_mixed_go_router.sh | 109 ++++++ examples/router/start_mixed_python_router.sh | 81 +++++ .../start_pd_go_router.sh} | 0 examples/router/start_pd_python_router.sh | 73 ++++ examples/router/utils.sh | 99 ++++++ fastdeploy/golang_router/README_CN.md | 184 ---------- fastdeploy/golang_router/cmd/main.go | 52 ++- .../internal/gateway/completions.go | 141 +++++++- .../internal/gateway/completions_test.go | 332 ++++++++++++++++-- .../golang_router/internal/router/router.go | 2 +- mkdocs.yml | 1 + 13 files changed, 1340 insertions(+), 228 deletions(-) create mode 100644 docs/online_serving/router.md create mode 100644 docs/zh/online_serving/router.md create mode 100644 examples/router/start_mixed_go_router.sh create mode 100644 examples/router/start_mixed_python_router.sh rename examples/{splitwise/start_v2_tp1.sh => router/start_pd_go_router.sh} (100%) create mode 100644 examples/router/start_pd_python_router.sh create mode 100644 examples/router/utils.sh delete mode 100644 fastdeploy/golang_router/README_CN.md diff --git a/docs/online_serving/router.md b/docs/online_serving/router.md new file mode 100644 index 0000000000..400e44e9c3 --- /dev/null +++ b/docs/online_serving/router.md @@ -0,0 +1,247 @@ +[简体中文](../zh/online_serving/router.md) + +# Load-Balancing Scheduling Router + +FastDeploy provides a Golang-based [Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router) for request scheduling. The Router supports both centralized deployment and Prefill/Decode (PD) disaggregated deployment.。 + +## Installation + +### 1. Prebuilt Binaries + +Starting from FastDeploy v2.5.0, the official Docker images include the Go language environment required to build the Golang Router and also provide a precompiled Router binary. The Router binary is located by default in the `/usr/local/bin` directory and can be used directly without additional compilation. For installation details, please refer to the [FastDeploy Installation Guide](../get_started/installation/nvidia_gpu.md) + +### 2. Build from Source + +You need to build the Router from source in the following scenarios: + +* The official Docker image is not used +* FastDeploy version is earlier than v2.5.0 +* Custom modifications to the Router are required + +Environment Requirements: + +* Go >= 1.21 + +Clone the FastDeploy repository and build the Router: +``` +git clone https://github.com/PaddlePaddle/FastDeploy.git +cd FastDeploy/fastdeploy/golang_router +bash build.sh +``` + +## Centralized Deployment + +Start the Router service. The `--port` parameter specifies the scheduling port for centralized deployment. +``` +./fd-router --port 30000 +``` + +Start a mixed inference instance. Compared to standalone deployment, specify the Router endpoint via `--router`. Other parameters remain unchanged. +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_mixed" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --router "0.0.0.0:30000" +``` + +## PD Disaggregated Deployment + +Start the Router service with PD disaggregation enabled using the `--splitwise` flag. +``` +./fd-router \ + --port 30000 \ + --splitwise +``` + +Launch a prefill instance. Compared with standalone deployment, add the `--splitwise-role` parameter to specify the instance role as Prefill, and add the `--router` parameter to specify the Router endpoint. All other parameters remain the same as in standalone deployment. +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_prefill" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --splitwise-role prefill \ + --router "0.0.0.0:30000" +``` + +Launch a decode instance. +``` +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_decode" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 32000 \ + --splitwise-role decode \ + --router "0.0.0.0:30000" +``` + +Once both Prefill and Decode instances are successfully launched and registered with the Router, requests can be sent: +``` +curl -X POST "http://0.0.0.0:30000/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 100, + "stream": false +}' +``` + +For more details on PD disaggregated deployment, please refer to the [Usage Guide](../features/disaggregated.md) + +## CacheAware + +The Load-Balancing Scheduling Router supports the CacheAware strategy, mainly applied to PD separation deployment to optimize request allocation and improve cache hit rate. + +To use the CacheAware strategy, default configurations need to be modified. You can copy the configuration template and make adjustments (an example is available at [Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router) directory under examples/run_with_config): +```bash +pushd examples/run_with_config +cp config/config.example.yaml config/config.yaml +popd +``` + +Launch the Router with the custom configuration specified via `--config_path`: +``` +./fd-router \ + --port 30000 \ + --splitwise \ + --config_path examples/run_with_config/config/config.yaml +``` + +Prefill and Decode instance startup are the same as PD disaggregated deployment. + +Launch the prefill instance. +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_prefill" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --splitwise-role prefill \ + --router "0.0.0.0:30000" +``` + +Launch the decode instance. +``` +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_decode" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 32000 \ + --splitwise-role decode \ + --router "0.0.0.0:30000" +``` + +## HTTP Service Description + +The Router exposes a set of HTTP services to provide unified request scheduling, runtime health checking, and monitoring metrics, facilitating integration and operations. + +| Method | Path | Description | +|----------|------|------| +| POST | `/v1/chat/completions` | Provide scheduling services for inference requests based on the Chat Completions API | +| POST | `/v1/completions` | Provide scheduling services for general text completion inference requests | +| POST | `/register` | Allow inference instances to register their metadata with the Router for scheduling | +| GET | `/registered` | Query the list of currently registered inference instances | +| GET | `/registered_number` | Query the number of currently registered inference instances | +| GET | `/health_generate` | Check the health status of registered Prefill / Decode inference instances | +| GET | `/metrics` | Provide Prometheus-formatted Router runtime metrics for monitoring and observability | + +## Deployment Parameters + +### Router Startup Parameters + +* --port: Specify the Router scheduling port. +* --splitwise: Enable PD disaggregated scheduling mode. +* --config_path: Specify the Router configuration file path for loading custom scheduling and runtime parameters. + +### Configuration File Preparation + +Before using `--config_path`, prepare a configuration file that conforms to the Router specification. +The configuration file is typically written in YAML format. For detailed parameters, refer to [Configuration Parameteres](#configuration-parameteres)。You may copy and modify the configuration template (example available at examples/run_with_config): +```bash +cp config/config.example.yaml config/config.yaml +``` + +The Load-Balancing Scheduling Router also supports registering inference instances through configuration files at startup (example available at examples/run_with_default_workers): +```bash +cp config/config.example.yaml config/config.yaml +cp config/register.example.yaml config/register.yaml +``` + +### Configuration Parameteres + +config.yaml example: +```yaml +server: + port: "8080" # Listening port + host: "0.0.0.0" # Listening address + mode: "debug" # Startup mode: debug, release, test + splitwise: true # true enables PD disaggregation; false disables it + +scheduler: + policy: "power_of_two" # Scheduling policy (optional): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score + prefill-policy: "cache_aware" # Prefill scheduling policy in PD mode + decode-policy: "fd_metrics_score" # Decode scheduling policy in PD mode + eviction-interval-secs: 60 # Cache eviction interval for CacheAware scheduling + balance-abs-threshold: 1 # Absolute threshold for CacheAware balancing + balance-rel-threshold: 0.2 # Relative threshold for CacheAware balancing + hit-ratio-weight: 1.0 # Cache hit ratio weight + load-balance-weight: 0.05 # Load balancing weight + cache-block-size: 4 # Cache block size + tokenizer-url: "http://0.0.0.0:8098" # Tokenizer service endpoint (optional) + tokenizer-timeout-secs: 2 # Tokenizer service timeout + waiting-weight: 10 # Waiting weight for CacheAware scheduling + +manager: + health-failure-threshold: 3 # Number of failed health checks before marking unhealthy + health-success-threshold: 2 # Number of successful health checks before marking healthy + health-check-timeout-secs: 5 # Health check timeout + health-check-interval-secs: 5 # Health check interval + health-check-endpoint: /health # Health check endpoint + register-path: "config/register.yaml" # Path to instance registration config (optional) + +log: + level: "info" # Log level: debug / info / warn / error + output: "file" # Log output: stdout / file +``` + +register.yaml example: +```yaml +instances: + - role: "prefill" + host_ip: 127.0.0.1 + port: 8097 + connector_port: 8001 + engine_worker_queue_port: 8002 + transfer_protocol: + - ipc + - rdma + rdma_ports: [7100, "7101"] + device_ids: [0, "1"] + metrics_port: 8003 + - role: "decode" + host_ip: 127.0.0.1 + port: 8098 + connector_port: 8001 + engine_worker_queue_port: 8002 + transfer_protocol: ["ipc","rdma"] + rdma_ports: ["7100", "7101"] + device_ids: ["0", "1"] +``` + +Instance Registration Parameters: + +* role: Instance role, one of: decode, prefill, mixed. +* host_ip: IP address of the inference instance host. +* port: Service port of the inference instance. +* connector_port: Connector port used for PD communication. +* engine_worker_queue_port: Shared queue communication port within the inference instance. +* transfer_protocol: Specify KV Cache transfer protocol, optional values: ipc / rdma, multiple protocols separated by commas +* rdma_ports: Specify RDMA communication ports, multiple ports separated by commas (only takes effect when transfer_protocol contains rdma) +* device_ids: GPU device IDs of the inference instance, multiple IDs separated by commas +* metrics_port: Port number of the inference instance's metrics + +Among these, `role`, `host_ip`, and `port` are required; all other parameters are optional. diff --git a/docs/zh/online_serving/router.md b/docs/zh/online_serving/router.md new file mode 100644 index 0000000000..86828b0c9d --- /dev/null +++ b/docs/zh/online_serving/router.md @@ -0,0 +1,247 @@ +[English](../../online_serving/router.md) + +# 负载均衡调度Router + +FastDeploy提供Golang版本[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router),用于实现请求的调度。Router支持集中式部署和PD分离式部署。 + +## 安装 + +### 1. 预编译库下载 + +在 FastDeploy v2.5.0 及之后版本中,官方 Docker 镜像将内置 Golang Router 编译所需的 Go 语言环境,并提供已编译完成的 Router 二进制文件。该二进制文件默认位于 `/usr/local/bin` 目录下,可直接使用。相关安装方式可参考 [FastDeploy 安装文档](../get_started/installation/nvidia_gpu.md) + +### 2. 编译安装 + +在以下场景中,需要从源码编译 Router: + +* 未使用官方 Docker 镜像 +* FastDeploy 版本早于 v2.5.0 +* 需要对 Router 进行定制化修改 + +环境要求: + +* Go >= 1.21 + +拉取FastDeploy最新代码,编译安装: +``` +git clone https://github.com/PaddlePaddle/FastDeploy.git +cd FastDeploy/fastdeploy/golang_router +bash build.sh +``` + +## 集中式部署 + +启动Router服务,其中`--port`参数指定集中式部署的调度端口. +``` +./fd-router \ + --port 30000 +``` + +启动mixed实例。对比单机部署,增加`--router`参数指定Router的接口,其他参数和单机部署相同。 +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_mixed" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --router "0.0.0.0:30000" +``` + +## PD分离部署 + +启动Router服务,其中`--splitwise`参数指定为分离式部署的调度方式. +``` +./fd-router \ + --port 30000 \ + --splitwise +``` + +启动Prefill实例。对比单机部署,增加`--splitwise-role`参数指定实例角色为Prefill,增加`--router`参数指定Router的接口,其他参数和单机部署相同。 +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_prefill" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --splitwise-role prefill \ + --router "0.0.0.0:30000" +``` + +启动Decode实例。 +``` +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_decode" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 32000 \ + --splitwise-role decode \ + --router "0.0.0.0:30000" +``` + +Prefill和Decode实例启动成功,并且向Router注册成功后,可以发送请求。 +``` +curl -X POST "http://0.0.0.0:30000/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 100, + "stream": false +}' +``` + +具体的 PD 分离式部署方案,请参考[使用文档](../features/disaggregated.md) + +## CacheAware + +负载均衡调度Router支持CacheAware策略,主要应用于 PD 分离部署,以优化请求分配,提高缓存命中率。 + +使用CacheAware策略需修改默认配置,可复制配置模板并进行调整(示例可参考[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router)目录下的examples/run_with_config): +```bash +pushd examples/run_with_config +cp config/config.example.yaml config/config.yaml +popd +``` + +在Router启动Router服务,其中`--config_path`参数指定配置路径. +``` +./fd-router \ + --port 30000 \ + --splitwise \ + --config_path examples/run_with_config/config/config.yaml +``` + +Prefill和Decode实例启动同PD分离部署。 + +启动Prefill实例。 +``` +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_prefill" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 31000 \ + --splitwise-role prefill \ + --router "0.0.0.0:30000" +``` + +启动Decode实例。 +``` +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_decode" +python -m fastdeploy.entrypoints.openai.api_server \ + --model "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" \ + --port 32000 \ + --splitwise-role decode \ + --router "0.0.0.0:30000" +``` + +## HTTP服务说明 + +Router 通过 HTTP 接口对外提供统一的调度服务,同时支持运行状态探测与监控指标暴露,便于集成与运维。 + +| 方法 | 路径 | 说明 | +|----------|------|------| +| POST | `/v1/chat/completions` | 对外提供基于 Chat 接口的推理请求调度服务 | +| POST | `/v1/completions` | 对外提供通用文本补全请求的调度服务 | +| POST | `/register` | 推理实例向 Router 注册自身信息,用于参与调度 | +| GET | `/registered` | 查询当前已注册的推理实例列表 | +| GET | `/registered_number` | 查询当前已注册的推理实例数量 | +| GET | `/health_generate` | 检查已注册 Prefill / Decode 推理实例的健康状态 | +| GET | `/metrics` | 提供 Prometheus 格式的 Router 运行指标,用于监控与观测 | + +## 部署参数说明 + +### Router启动参数说明 + +* --port: 指定Router的调度端口。 +* --splitwise: 指定为PD分离式部署的调度方式。 +* --config_path: 指定Router配置文件路径,用于加载自定义调度与运行参数。 + +### 配置文件准备 + +在使用 `--config_path` 参数前,请准备符合 Router 规范的配置文件。配置文件通常以 YAML 形式存在,具体参考[配置参数说明](#配置参数说明)。可复制配置模板并进行调整(示例可参考 examples/run_with_config): +```bash +cp config/config.example.yaml config/config.yaml +``` + +负载均衡调度Router还支持通过配置文件在启动阶段注册推理实例(示例可参考 examples/run_with_default_workers): +```bash +cp config/config.example.yaml config/config.yaml +cp config/register.example.yaml config/register.yaml +``` + +### 配置参数说明 + +config.yaml 示例: +```yaml +server: + port: "8080" # 监听端口 + host: "0.0.0.0" # 监听地址 + mode: "debug" # 启动模式: debug, release, test + splitwise: true # true代表开启pd分离模式,false代表开启非pd分离模式 + +scheduler: + policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score + prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略 + decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略 + eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间 + balance-abs-threshold: 1 # cache-aware策略绝对阈值 + balance-rel-threshold: 0.2 # cache-aware策略相对阈值 + hit-ratio-weight: 1.0 # cache-aware策略命中率权重 + load-balance-weight: 0.05 # cache-aware策略负载均衡权重 + cache-block-size: 4 # cache-aware策略cache block大小 + tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选) + tokenizer-timeout-secs: 2 # tokenizer服务超时时间 + waiting-weight: 10 # cache-aware策略等待权重 + +manager: + health-failure-threshold: 3 # 健康检查失败次数,超过次数后认为节点不健康 + health-success-threshold: 2 # 健康检查成功次数,超过次数后认为节点健康 + health-check-timeout-secs: 5 # 健康检查超时时间 + health-check-interval-secs: 5 # 健康检查间隔时间 + health-check-endpoint: /health # 健康检查接口 + register-path: "config/register.yaml" # 推理实例注册配置文件路径(可选) + +log: + level: "info" # 日志打印级别: debug / info / warn / error + output: "file" # 日志输出方式: stdout / file +``` + +register.yaml 示例: +```yaml +instances: + - role: "prefill" + host_ip: 127.0.0.1 + port: 8097 + connector_port: 8001 + engine_worker_queue_port: 8002 + transfer_protocol: + - ipc + - rdma + rdma_ports: [7100, "7101"] + device_ids: [0, "1"] + metrics_port: 8003 + - role: "decode" + host_ip: 127.0.0.1 + port: 8098 + connector_port: 8001 + engine_worker_queue_port: 8002 + transfer_protocol: ["ipc","rdma"] + rdma_ports: ["7100", "7101"] + device_ids: ["0", "1"] +``` + +注册实例参数说明: + +* role: 实例角色,可选值 decode / prefill / mixed。 +* host_ip: 推理实例所在机器的IP地址。 +* port: 推理实例的端口号。 +* connector_port: 推理实例指定pd通信的端口。 +* engine_worker_queue_port: 推理实例内部的共享队列通信端口。 +* transfer_protocol:指定KV Cache传输协议,可选值 ipc / rdma,多个协议用逗号分隔。 +* rdma_ports: 指定RDMA通信端口,多个端口用逗号隔开(仅当transfer_protocol包含rdma时生效)。 +* device_ids: 推理实例的GPU设备ID,多个设备ID用逗号隔开。 +* metrics_port: 推理实例的metrics端口号。 + +其中 `role`、`host_ip` 和 `port` 为必填参数,其余参数为可选。 diff --git a/examples/router/start_mixed_go_router.sh b/examples/router/start_mixed_go_router.sh new file mode 100644 index 0000000000..76067dc9f7 --- /dev/null +++ b/examples/router/start_mixed_go_router.sh @@ -0,0 +1,109 @@ +#!/bin/bash +set -e + +# Test mixed server + router + +# prepare environment +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" +export FD_DEBUG=1 + +unset http_proxy && unset https_proxy +rm -rf log_* +source ./utils.sh + +S1_PORT=52400 +S2_PORT=52500 +ROUTER_PORT=52600 + +FD_BIN_DIR="/usr/local/bin" +FD_ROUTER_BIN="${FD_BIN_DIR}/fd-router" +FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/FastDeploy/fd-router" +FD_ROUTER_SHA256="67640aaeebdd886826d3534930b2154cd2c1441a26bc3f38c3af5f0aadba7c2d" + +ports=( + $S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3)) + $S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} + +# check fd-router binary +if [ ! -x "${FD_ROUTER_BIN}" ]; then + echo "⚠️ fd-router not found, downloading..." + + mkdir -p "${FD_BIN_DIR}" + TMP_BIN="${FD_ROUTER_BIN}.tmp" + + wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || exit 1 + + echo "${FD_ROUTER_SHA256} ${TMP_BIN}" | sha256sum -c - || { + echo "❌ Integrity check failed" + rm -f "${TMP_BIN}" + exit 1 + } + + mv "${TMP_BIN}" "${FD_ROUTER_BIN}" + chmod +x "${FD_ROUTER_BIN}" + + echo "fd-router installed and verified" +else + echo "fd-router already exists" +fi + +# start router +export FD_LOG_DIR="log_router" +mkdir -p ${FD_LOG_DIR} + +nohup /usr/local/bin/fd-router \ + --port ${ROUTER_PORT} \ + 2>&1 >${FD_LOG_DIR}/nohup & + +# start modelserver 0 +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_server_0" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port ${S1_PORT} \ + --metrics-port $((S1_PORT + 1)) \ + --engine-worker-queue-port $((S1_PORT + 2)) \ + --cache-queue-port $((S1_PORT + 3)) \ + --max-model-len 32768 \ + --router "0.0.0.0:${ROUTER_PORT}" \ + 2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${S1_PORT} + +# start modelserver 1 +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_server_1" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port ${S2_PORT} \ + --metrics-port $((S2_PORT + 1)) \ + --engine-worker-queue-port $((S2_PORT + 2)) \ + --cache-queue-port $((S2_PORT + 3)) \ + --max-model-len 32768 \ + --router "0.0.0.0:${ROUTER_PORT}" \ + 2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${S2_PORT} + +# send request +sleep 10 # make sure server is registered to router +echo "send request..." +curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": false +}' diff --git a/examples/router/start_mixed_python_router.sh b/examples/router/start_mixed_python_router.sh new file mode 100644 index 0000000000..37699caeb2 --- /dev/null +++ b/examples/router/start_mixed_python_router.sh @@ -0,0 +1,81 @@ +#!/bin/bash +set -e + +# Test mixed server + router + +# prepare environment +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" +export FD_DEBUG=1 + +unset http_proxy && unset https_proxy +rm -rf log_* +source ./utils.sh + +S1_PORT=52400 +S2_PORT=52500 +ROUTER_PORT=52600 + +ports=( + $S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3)) + $S2_PORT $((S2_PORT + 1)) $((S2_PORT + 2)) $((S2_PORT + 3)) + $ROUTER_PORT +) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} + +# start router +export FD_LOG_DIR="log_router" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.router.launch \ + --port ${ROUTER_PORT} \ + 2>&1 >${FD_LOG_DIR}/nohup & + +# start modelserver 0 +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log_server_0" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port ${S1_PORT} \ + --metrics-port $((S1_PORT + 1)) \ + --engine-worker-queue-port $((S1_PORT + 2)) \ + --cache-queue-port $((S1_PORT + 3)) \ + --max-model-len 32768 \ + --router "0.0.0.0:${ROUTER_PORT}" \ + 2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${S1_PORT} + +# start modelserver 1 +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log_server_1" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port ${S2_PORT} \ + --metrics-port $((S2_PORT + 1)) \ + --engine-worker-queue-port $((S2_PORT + 2)) \ + --cache-queue-port $((S2_PORT + 3)) \ + --max-model-len 32768 \ + --router "0.0.0.0:${ROUTER_PORT}" \ + 2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${S2_PORT} + +# send request +sleep 10 # make sure server is registered to router +echo "send request..." +curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": false +}' diff --git a/examples/splitwise/start_v2_tp1.sh b/examples/router/start_pd_go_router.sh similarity index 100% rename from examples/splitwise/start_v2_tp1.sh rename to examples/router/start_pd_go_router.sh diff --git a/examples/router/start_pd_python_router.sh b/examples/router/start_pd_python_router.sh new file mode 100644 index 0000000000..ec8e03bb1d --- /dev/null +++ b/examples/router/start_pd_python_router.sh @@ -0,0 +1,73 @@ +#!/bin/bash +set -e + +# prepare environment +export MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" +export FD_DEBUG=1 + +SCRIPT_PATH=$(readlink -f "$0") +SCRIPT_DIR=$(dirname "$SCRIPT_PATH") +source ${SCRIPT_DIR}/utils.sh + +unset http_proxy && unset https_proxy + +P_PORT=52400 +D_PORT=52500 +ROUTER_PORT=52700 +LOG_DATE=$(date +%Y%m%d_%H%M%S) + +ports=($P_PORT $D_PORT $ROUTER_PORT) +check_ports "${ports[@]}" || { + echo "❌ Some ports are in use. Please release them." + exit 1 +} + +# start router +export FD_LOG_DIR="log/$LOG_DATE/router" +rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.router.launch \ + --port ${ROUTER_PORT} \ + --splitwise \ + 2>&1 >${FD_LOG_DIR}/nohup & + +# start prefill +export CUDA_VISIBLE_DEVICES=0 +export FD_LOG_DIR="log/$LOG_DATE/prefill" +rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port "${P_PORT}" \ + --splitwise-role "prefill" \ + --router "0.0.0.0:${ROUTER_PORT}" \ +2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${P_PORT} + +# start decode +export CUDA_VISIBLE_DEVICES=1 +export FD_LOG_DIR="log/$LOG_DATE/decode" +rm -rf ${FD_LOG_DIR} && mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.entrypoints.openai.api_server \ + --model ${MODEL_NAME} \ + --port "${D_PORT}" \ + --splitwise-role "decode" \ + --router "0.0.0.0:${ROUTER_PORT}" \ +2>&1 >${FD_LOG_DIR}/nohup & + +wait_for_health ${D_PORT} + +# send request +sleep 10 # make sure server is registered to router +echo "send request..." +curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 100, + "stream": false +}' diff --git a/examples/router/utils.sh b/examples/router/utils.sh new file mode 100644 index 0000000000..31ef8fc5b6 --- /dev/null +++ b/examples/router/utils.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +is_port_free() { + local port=$1 + if ss -ltun | awk '{print $4}' | grep -q ":${port}$"; then + return 1 # Port is occupied + fi + return 0 # Port is free +} + +check_ports() { + for port in "$@"; do + if ! is_port_free $port; then + echo "❌ Port $port is already in use" + return 1 + fi + done + return 0 +} + +wait_for_health() { + IFS=',' read -r -a server_ports <<< "$1" + local num_ports=${#server_ports[@]} + local total_lines=$((num_ports + 1)) + local first_run=true + local GREEN='\033[0;32m' + local RED='\033[0;31m' + local NC='\033[0m' # No Color + local start_time=$(date +%s) + + echo "-------- WAIT FOR HEALTH --------" + while true; do + local all_ready=true + for port in "${server_ports[@]}"; do + status_code=$(curl -s --max-time 1 -o /dev/null -w "%{http_code}" "http://0.0.0.0:${port}/health" || echo "000") + if [ "$status_code" -eq 200 ]; then + printf "Port %s: ${GREEN}[OK] 200${NC}\033[K\n" "$port" + else + all_ready=false + printf "Port %s: ${RED}[WAIT] %s${NC}\033[K\n" "$port" "$status_code" + fi + done + cur_time=$(date +%s) + if [ "$all_ready" = "true" ]; then + echo "All services are ready! [$((cur_time-start_time))s]" + break + else + echo "Services not ready.. [$((cur_time-start_time))s]" + printf "\033[%dA" "$total_lines" # roll back cursor + sleep 1 + fi + done + echo "---------------------------------" +} + +get_free_ports() { + free_ports_num=${1:-1} + start_port=${2:-8000} + end_port=${3:-9000} + + free_ports=() + if [[ ! -n ${free_ports_num} || "${free_ports_num}" -le 0 ]]; then + log_warn "param can't be empty, and should > 0" + echo ${free_ports[@]} + return 1 + fi + + used_ports1=$(netstat -an | grep -E "(0.0.0.0|127.0.0.1|${POD_IP}|tcp6)" | awk '{n=split($4,a,":"); if(a[n]~/^[0-9]+$/) print a[n];}' | sort -u) + used_ports2=$(netstat -an | grep -E "(0.0.0.0|127.0.0.1|${POD_IP}|tcp6)" | awk '{n=split($5,a,":"); if(a[n]~/^[0-9]+$/) print a[n];}' | sort -u) + all_used_ports=$(printf "%s\n" "${used_ports1}" "${used_ports2}" | sort -u) + + # Generate random number between 0 and 32767 + random_num=$(( RANDOM )) + port=$(( random_num % (end_port - start_port + 1) + start_port )) + + while true; do + (( port++ )) + if [[ ${port} -ge ${end_port} ]]; then + port=${start_port} + fi + + if [[ "${all_used_ports[@]}" =~ "${port}" ]]; then + continue + fi + + if is_port_free ${port}; then + free_ports+=("${port}") + (( free_ports_num-- )) + if [[ ${free_ports_num} = 0 ]]; then + break + fi + fi + + done + + # echo ${free_ports[@]} + IFS=',' && echo "${free_ports[*]}" + return 0 +} diff --git a/fastdeploy/golang_router/README_CN.md b/fastdeploy/golang_router/README_CN.md deleted file mode 100644 index a40d03227f..0000000000 --- a/fastdeploy/golang_router/README_CN.md +++ /dev/null @@ -1,184 +0,0 @@ -# Golang-Router -## 关于 -【正在开发迭代中】 -Golang-Router 是一个面向大语言模型推理系统的高性能 Golang 路由框架,作为系统的**控制与调度平面**运行,负责请求接入、实例选择与流量转发,设计上适配 Prefill–Decode(PD)分离推理架构。 - -Golang-Router 可独立部署运行,也可通过 HTTP 接口与 FastDeploy 推理实例协同工作。框架提供基础而稳定的路由、中间件扩展与健康检查能力,适用于单点推理部署场景,并在架构层面为后续的水平扩展与调度能力演进预留空间。 - -### 背景与动机 -在大语言模型推理系统中,路由组件已从传统的流量转发层演进为影响系统性能与资源利用效率的关键基础设施。随着 Prefill–Decode 分离推理架构的广泛采用,不同推理阶段在计算特征、显存占用与缓存行为方面呈现出明显差异,仅依赖请求级静态信息进行调度已难以满足稳定性与效率需求。 - -在保持请求级调度模型不变的前提下,引入更细粒度的运行时信号辅助调度决策,成为提升调度能力与系统可预测性的工程共识。Golang-Router 正是在这一背景下构建,作为独立的路由与调度组件,为推理系统提供清晰、可扩展的控制平面。 - -### 设计目标 -Golang-Router 聚焦解决以下核心问题: -- **调度决策信息不足** - 传统 Router 通常仅基于请求级元信息或粗粒度实例状态进行调度,难以利用推理过程中产生的细粒度缓存相关信号,从而限制了 cache-aware 策略的实际效果。 -- **调度逻辑与推理执行强耦合** - 路由与调度逻辑内嵌于推理框架内部,增加了系统复杂度,限制了调度策略的独立演进与复用能力。 -- **高并发场景下的可扩展性挑战** - 在高并发推理负载下,实例状态维护与实例选择逻辑对路由组件的并发模型、性能与稳定性提出更高要求。 - -### 核心特性 -- 基于 Golang 实现的高性能路由与调度组件,适用于高并发、低延迟推理场景 -- 请求级调度模型,保持接口语义清晰与系统复杂度可控 -- 利用token级缓存相关运行时信息作为调度策略的辅助输入,用于提升实例选择的准确性与稳定性 -- 模块化架构设计(Gateway / Scheduler / Manager),职责边界清晰,便于扩展与维护 -- 面向 Prefill–Decode 分离推理架构设计,为复杂调度策略与能力演进提供结构性支持 - -### 与现有方案的差异 -与 sglang 等推理框架内置 Router 相比,Golang-Router 以**独立 Golang 服务**的形式运行,将路由、调度与实例状态管理能力从推理执行逻辑中解耦。 - -Golang-Router 已支持 cache-aware 调度,在请求级调度框架内引入 token 级缓存相关运行时信号,辅助调度决策制定,以更稳定地适配 Prefill–Decode 分离推理架构下的缓存利用需求。 - -## 功能特性 - -- 高性能 HTTP/HTTPS 服务器 -- RESTful API 路由支持 -- 可扩展的中间件系统 -- 动态配置管理 -- 内置健康检查和监控 -- 负载均衡 -- 日志记录和指标收集 - -## 快速开始 - -### 前置要求 - -- Go 1.21 -- 构建不依赖特定系统环境 -- 可直接在 FastDeploy 官方 Docker 环境中编译与运行 - -### 编译 - -```bash -./build.sh -``` - -### 配置 - -1. 配置文件准备(可选) -如需修改默认配置,可复制配置模板并进行调整(示例可参考 examples/run_with_config): - -```bash -cp config/config.example.yaml config/config.yaml -``` - -2. 主要配置项说明: - -```yaml -server: - port: "8080" # 监听端口 - host: "0.0.0.0" # 监听地址 - mode: "debug" # 启动模式: debug, release, test - splitwise: true # true代表开启pd分离模式,false代表开启非pd分离模式 - -scheduler: - policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num - prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略 - decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略 - eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间 - balance-abs-threshold: 1 # cache-aware策略绝对阈值 - balance-rel-threshold: 0.2 # cache-aware策略相对阈值 - hit-ratio-weight: 1.0 # cache-aware策略命中率权重 - load-balance-weight: 0.05 # cache-aware策略负载均衡权重 - cache-block-size: 4 # cache-aware策略cache block大小 - tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选) - tokenizer-timeout-secs: 2 # tokenizer服务超时时间 - waiting-weight: 10 # cache-aware策略等待权重 - -manager: - health-failure-threshold: 3 # 健康检查失败次数,超过次数后认为节点不健康 - health-success-threshold: 2 # 健康检查成功次数,超过次数后认为节点健康 - health-check-timeout-secs: 5 # 健康检查超时时间 - health-check-interval-secs: 5 # 健康检查间隔时间 - health-check-endpoint: /health # 健康检查接口 - register-path: "config/register.yaml" # 推理实例注册配置文件路径(可选) - -log: - level: "info" # 日志打印级别: debug / info / warn / error - output: "file" # 日志输出方式: stdout / file - -``` - -3. 启动时注册实例(可选) -支持通过配置文件在启动阶段注册推理实例(示例可参考 examples/run_with_default_workers): - -```bash -cp config/config.example.yaml config/config.yaml -cp config/register.example.yaml config/register.yaml -``` - -### 运行 -本项目支持两种运行方式:直接运行源码 或 构建二进制文件后运行。 -方式一:直接运行源码 -在项目根目录下,使用 go run 启动服务: -```bash -go run cmd/main.go -``` -该方式适用于本地开发与调试场景。 -方式二:构建并运行二进制文件 -1. 构建二进制文件 -通过构建脚本生成可执行文件: -```bash -./build.sh -``` -构建完成后,二进制文件将被安装到指定目录(默认为 /usr/local/bin,可通过修改 Makefile 中的 OUTDIR 进行调整)。 -此外,也可以在项目根目录下手动构建二进制文件: -```bash -go build -o ./fd-router ./cmd -``` -该方式便于本地测试或将二进制文件与配置文件一并分发。 -2. 运行二进制文件 -可以通过运行脚本启动服务: -```bash -./run.sh -``` -运行脚本会自动处理常见启动参数及日志目录,适合标准化部署场景。 -也可以直接运行二进制文件,在项目根目录或二进制所在目录下执行: -```bash -./fd-router \ - --port 8080 \ - --splitwise \ - --config_path ./config/config.yaml -``` -其中: -- --port 为必填参数 -- 其他参数可根据实际需求配置 - -## 项目结构 - -``` -. -├── cmd/ # 主程序入口 -├── config/ # 配置文件 -├── internal/ # 核心实现代码 -│ ├── common/ # 公共接口定义 -│ ├── config/ # 配置处理 -│ ├── gateway/ # API网关实现 -│ ├── manager/ # 路由管理 -│ ├── middleware/ # 中间件实现 -│ ├── router/ # 路由核心逻辑 -│ └── scheduler/ # 调度器实现 -├── logs/ # 日志目录 -├── output/ # 构建输出 -├── pkg/ # 可复用组件 -│ ├── logger/ # 日志组件 -│ └── metrics/ # 监控指标 -├── build.sh # 构建脚本 -├── go.mod # Go模块定义 -├── go.sum # 依赖校验 -├── Makefile # 构建管理 -├── README.md # 项目说明 -└── run.sh # 启动脚本 -``` - -### 运行测试 - -```bash -make test -``` - -## 贡献 - -欢迎提交 Issue 和 Pull Request! diff --git a/fastdeploy/golang_router/cmd/main.go b/fastdeploy/golang_router/cmd/main.go index 316c3a1f00..1138fed1e6 100644 --- a/fastdeploy/golang_router/cmd/main.go +++ b/fastdeploy/golang_router/cmd/main.go @@ -3,7 +3,9 @@ package main import ( "context" "flag" + "fmt" "log" + "runtime/debug" "github.com/PaddlePaddle/FastDeploy/router/internal/config" "github.com/PaddlePaddle/FastDeploy/router/internal/manager" @@ -14,13 +16,24 @@ import ( func main() { // Parse command line arguments - var configPath, port string - var splitwise bool + var ( + configPath string + port string + splitwise bool + showVersion bool + ) flag.StringVar(&configPath, "config_path", "", "path to config file") flag.StringVar(&port, "port", "", "listen port of router") flag.BoolVar(&splitwise, "splitwise", false, "enable splitwise mode") + flag.BoolVar(&showVersion, "version", false, "print version info") + flag.BoolVar(&showVersion, "V", false, "print version info (shorthand)") flag.Parse() + if showVersion { + printVersion() + return + } + // Load configuration cfg, err := config.Load(configPath, port, splitwise) if err != nil { @@ -52,3 +65,38 @@ func main() { log.Fatalf("Failed to start server: %v", err) } } + +func printVersion() { + if info, ok := debug.ReadBuildInfo(); ok { + var ( + commit = "unknown" + vcsTime = "unknown" + dirty = "unknown" + ) + + for _, s := range info.Settings { + switch s.Key { + case "vcs.revision": + if len(s.Value) >= 7 { + commit = s.Value[:7] + } else { + commit = s.Value + } + case "vcs.time": + vcsTime = s.Value + case "vcs.modified": + dirty = s.Value + } + } + + fmt.Printf("golang-router\n") + fmt.Printf(" version: %s\n", info.Main.Version) + fmt.Printf(" commit: %s\n", commit) + fmt.Printf(" dirty: %s\n", dirty) + fmt.Printf(" vcsTime: %s\n", vcsTime) + fmt.Printf(" module: %s\n", info.Main.Path) + return + } + + fmt.Println("version info not available") +} diff --git a/fastdeploy/golang_router/internal/gateway/completions.go b/fastdeploy/golang_router/internal/gateway/completions.go index 816636feb9..3aa15fc3f3 100644 --- a/fastdeploy/golang_router/internal/gateway/completions.go +++ b/fastdeploy/golang_router/internal/gateway/completions.go @@ -3,6 +3,7 @@ package gateway import ( "bufio" "bytes" + "context" crand "crypto/rand" "encoding/json" "fmt" @@ -35,6 +36,8 @@ func newRequestID() string { return fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Int63()) } +type PromptExtractor func(rawReq map[string]any) string + // extractPromptFromChatRequest extracts text prompt from OpenAI ChatCompletions style request func extractPromptFromChatRequest(rawReq map[string]any) string { messagesVal, ok := rawReq["messages"] @@ -95,12 +98,55 @@ func extractPromptFromChatRequest(rawReq map[string]any) string { return builder.String() } +func extractPromptFromCompletionsRequest(rawReq map[string]any) string { + promptVal, ok := rawReq["prompt"] + if !ok { + return "" + } + + var builder strings.Builder + + appendText := func(s string) { + s = strings.TrimSpace(s) + if s == "" { + return + } + if builder.Len() > 0 { + builder.WriteByte(' ') + } + builder.WriteString(s) + } + + switch v := promptVal.(type) { + + case string: + appendText(v) + + case []string: + for _, s := range v { + appendText(s) + } + + case []any: + for _, item := range v { + if s, ok := item.(string); ok { + appendText(s) + } + } + + default: + // Other structures are ignored for now + } + + return builder.String() +} + // PostToPD sends requests to both Prefill and Decode instances, only returns Decode node response -func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte) (*http.Response, error) { +func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isStream bool, completionEndpoint string) (*http.Response, error) { ctx := c.Request.Context() - decodeEndpoint := fmt.Sprintf("%s/v1/%s", decodeURL, "chat/completions") - prefillEndpoint := fmt.Sprintf("%s/v1/%s", prefillURL, "chat/completions") + decodeEndpoint := fmt.Sprintf("%s/v1/%s", decodeURL, completionEndpoint) + prefillEndpoint := fmt.Sprintf("%s/v1/%s", prefillURL, completionEndpoint) // Construct two requests decodeReq, err := http.NewRequestWithContext(ctx, "POST", decodeEndpoint, bytes.NewReader(reqBody)) @@ -160,14 +206,71 @@ func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte) (*ht } if prefillRes.resp != nil { - prefillRes.resp.Body.Close() + go readPrefillRecv(ctx, prefillURL, isStream, prefillRes.resp) } return decodeRes.resp, nil } +func readPrefillRecv(ctx context.Context, url string, isStream bool, backendResp *http.Response) { + if backendResp == nil || backendResp.Body == nil { + return + } + defer backendResp.Body.Close() + + if isStream { + buffer := bytebufferpool.Get() + buffer.Reset() + defer bytebufferpool.Put(buffer) + + scanner := bufio.NewScanner(backendResp.Body) + scanner.Buffer(buffer.B, maxCapacity) + + released := false + defer func() { + // Fallback to ensure release + if !released { + scheduler_handler.Release(ctx, url) + logger.Debug("[prefill] release in defer (fallback) url=%s", url) + } + }() + + for scanner.Scan() { + line := scanner.Text() + + // First read that returns data + if !released { + scheduler_handler.Release(ctx, url) + released = true + + logger.Debug("[prefill] first chunk received, release scheduler url=%s", url) + } + logger.Debug("[prefill] recv result: %s", line) + } + + if err := scanner.Err(); err != nil { + logger.Debug("[prefill] scanner error: %v", err) + } + } else { + _, err := io.Copy(io.Discard, backendResp.Body) + if err != nil { + logger.Debug("[prefill] copy error: %v", err) + } + } +} + // ChatCompletions implements request forwarding to actual large model inference service func ChatCompletions(c *gin.Context) { + completionEndpoint := "chat/completions" + CommonCompletions(c, extractPromptFromChatRequest, completionEndpoint) +} + +func Completions(c *gin.Context) { + completionEndpoint := "completions" + CommonCompletions(c, extractPromptFromCompletionsRequest, completionEndpoint) +} + +func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndpoint string) { ctx := c.Request.Context() bodyBytes, err := io.ReadAll(c.Request.Body) @@ -190,13 +293,15 @@ func ChatCompletions(c *gin.Context) { destURL string releaseTargets []string requestBodyData []byte + prefillURL string + decodeURL string ) if isSplitwise { // PD mode: select instances for Prefill/Decode separately - message := extractPromptFromChatRequest(rawReq) + message := extractor(rawReq) - prefillURL, decodeURL, err := manager.SelectWorkerPair(ctx, message) + prefillURL, decodeURL, err = manager.SelectWorkerPair(ctx, message) if err != nil { c.Writer.WriteHeader(http.StatusBadGateway) c.Writer.Write([]byte(`{"error": "Failed to select worker pair"}`)) @@ -208,6 +313,9 @@ func ChatCompletions(c *gin.Context) { return } + // Prefill node token count was added in SelectWorker, release when request ends + defer scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message) + // Construct disaggregate_info to ensure selected P/D work in pairs within FastDeploy disagg, err := manager.BuildDisaggregateInfo(ctx, prefillURL, decodeURL) if err != nil { @@ -237,9 +345,6 @@ func ChatCompletions(c *gin.Context) { // Expose scheduling results to caller for debugging/validating scheduling strategy c.Writer.Header().Set("X-Router-Prefill-URL", prefillURL) c.Writer.Header().Set("X-Router-Decode-URL", decodeURL) - - // Prefill node token count was added in SelectWorker, release when request ends - defer scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message) } else { // Non-PD mode: use Mixed instance dest, err := manager.SelectWorker(ctx, "") @@ -260,10 +365,18 @@ func ChatCompletions(c *gin.Context) { } }() + isStream := false + if v, ok := rawReq["stream"]; ok { + stream, ok := v.(bool) + if ok && stream { + isStream = true + } + } + // Send request var backendResp *http.Response if isSplitwise { - backendResp, err = PostToPD(c, destURL, releaseTargets[0], requestBodyData) + backendResp, err = PostToPD(c, decodeURL, prefillURL, requestBodyData, isStream, completionEndpoint) } else { backendResp, err = GetClientWithRetry(c, requestBodyData, destURL) } @@ -291,14 +404,6 @@ func ChatCompletions(c *gin.Context) { c.Writer.WriteHeader(backendResp.StatusCode) } - isStream := false - if v, ok := rawReq["stream"]; ok { - stream, ok := v.(bool) - if ok && stream { - isStream = true - } - } - redirect(c, isStream, backendResp) } diff --git a/fastdeploy/golang_router/internal/gateway/completions_test.go b/fastdeploy/golang_router/internal/gateway/completions_test.go index 1483cff635..2dc03840d6 100644 --- a/fastdeploy/golang_router/internal/gateway/completions_test.go +++ b/fastdeploy/golang_router/internal/gateway/completions_test.go @@ -2,9 +2,12 @@ package gateway import ( "bytes" + "context" "encoding/json" + "io" "net/http" "net/http/httptest" + "strings" "testing" "github.com/gin-gonic/gin" @@ -57,29 +60,6 @@ func TestExtractPromptFromChatRequest(t *testing.T) { } } -func TestPostToPD(t *testing.T) { - // Setup test servers - prefillTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer prefillTS.Close() - - decodeTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"response": "test"}`)) - })) - defer decodeTS.Close() - - // Setup test context - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Request = httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(`{"test": "data"}`)) - - resp, err := PostToPD(c, decodeTS.URL, prefillTS.URL, []byte(`{"test": "data"}`)) - assert.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) -} - func TestRedirect(t *testing.T) { // Setup test server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -144,3 +124,309 @@ func TestNewRequestID(t *testing.T) { // Check that IDs are different assert.NotEqual(t, id1, id2) } + +func TestExtractPromptFromCompletionsRequest(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + "simple string prompt", + `{"prompt": "hello world"}`, + "hello world", + }, + { + "string array prompt", + `{"prompt": ["first", "second", "third"]}`, + "first second third", + }, + { + "interface array prompt", + `{"prompt": ["first", "second", "third"]}`, + "first second third", + }, + { + "empty prompt", + `{"prompt": ""}`, + "", + }, + { + "empty array prompt", + `{"prompt": []}`, + "", + }, + { + "missing prompt field", + `{"other": "field"}`, + "", + }, + { + "array with empty strings", + `{"prompt": ["", "hello", ""]}`, + "hello", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var rawReq map[string]any + err := json.Unmarshal([]byte(tt.input), &rawReq) + assert.NoError(t, err) + + result := extractPromptFromCompletionsRequest(rawReq) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestPostToPD(t *testing.T) { + // Setup test context + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/chat/completions", + bytes.NewBufferString(`{"test": "data"}`)) + + reqBody := []byte(`{"test": "data"}`) + + t.Run("successful request to both P and D", func(t *testing.T) { + // Setup test servers for prefill and decode + prefillServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("prefill response")) + })) + defer prefillServer.Close() + + decodeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("decode response")) + })) + defer decodeServer.Close() + + resp, err := PostToPD(c, decodeServer.URL, prefillServer.URL, reqBody, false, "chat/completions") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.NotNil(t, resp) + defer resp.Body.Close() + }) + + t.Run("decode server connection error", func(t *testing.T) { + prefillServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer prefillServer.Close() + + // Use invalid URL to simulate connection error + resp, err := PostToPD(c, "http://invalid-server:9999", prefillServer.URL, reqBody, false, "chat/completions") + assert.Error(t, err) + assert.Nil(t, resp) + }) + + t.Run("prefill server connection error", func(t *testing.T) { + decodeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer decodeServer.Close() + + // Use invalid URL to simulate connection error + resp, err := PostToPD(c, decodeServer.URL, "http://invalid-server:9999", reqBody, false, "chat/completions") + assert.Error(t, err) + assert.Nil(t, resp) + }) +} + +func TestGetClientWithRetry(t *testing.T) { + t.Run("success after connection errors", func(t *testing.T) { + retryCount := 0 + shouldFail := true + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retryCount++ + if shouldFail && retryCount < 3 { + // Simulate network connection error by closing connection + hj, ok := w.(http.Hijacker) + if ok { + conn, _, _ := hj.Hijack() + conn.Close() + return + } + w.WriteHeader(http.StatusInternalServerError) + return + } + shouldFail = false + w.WriteHeader(http.StatusOK) + w.Write([]byte("success")) + })) + defer ts.Close() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/chat/completions", + bytes.NewBufferString(`{"test": "data"}`)) + + reqBody := []byte(`{"test": "data"}`) + + resp, err := GetClientWithRetry(c, reqBody, ts.URL) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("all retries fail with connection errors", func(t *testing.T) { + retryCount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retryCount++ + // Always simulate network connection error + hj, ok := w.(http.Hijacker) + if ok { + conn, _, _ := hj.Hijack() + conn.Close() + return + } + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/chat/completions", + bytes.NewBufferString(`{"test": "data"}`)) + + reqBody := []byte(`{"test": "data"}`) + + resp, err := GetClientWithRetry(c, reqBody, ts.URL) + assert.Error(t, err) + assert.Nil(t, resp) + }) + + t.Run("success on first try", func(t *testing.T) { + retryCount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + retryCount++ + w.WriteHeader(http.StatusOK) + w.Write([]byte("success")) + })) + defer ts.Close() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/chat/completions", + bytes.NewBufferString(`{"test": "data"}`)) + + reqBody := []byte(`{"test": "data"}`) + + resp, err := GetClientWithRetry(c, reqBody, ts.URL) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, 1, retryCount) + }) +} + +func TestCompletions(t *testing.T) { + // This is a basic test that just verifies the function calls CommonCompletions + // More comprehensive testing would require mocking the manager dependencies + t.Run("function exists", func(t *testing.T) { + // Just verify that the function can be called without panic + // Actual behavior testing requires integration test setup + assert.NotNil(t, Completions) + }) +} + +func TestReadPrefillRecv(t *testing.T) { + t.Run("nil response handling", func(t *testing.T) { + ctx := context.Background() + // Should handle nil response gracefully without panic + readPrefillRecv(ctx, "test-url", false, nil) + }) + + t.Run("nil response body handling", func(t *testing.T) { + ctx := context.Background() + // Create a mock response with nil body + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: nil, + } + // Should handle nil body gracefully without panic + readPrefillRecv(ctx, "test-url", false, resp) + }) + + t.Run("mock response without scheduler dependency", func(t *testing.T) { + ctx := context.Background() + + // Create a simple response that doesn't trigger scheduler calls + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("test")), + } + + // This test verifies basic error handling and response body consumption + // without triggering scheduler initialization requirements + readPrefillRecv(ctx, "test-url", false, resp) + }) +} + +func TestCommonCompletions(t *testing.T) { + // Setup a basic test server for backend responses + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check if it's a stream request + bodyBytes, _ := io.ReadAll(r.Body) + var reqBody map[string]any + json.Unmarshal(bodyBytes, &reqBody) + + if stream, ok := reqBody["stream"].(bool); ok && stream { + // Stream response + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + w.Write([]byte("data: {\"choices\":[{\"text\":\"chunk1\"}]}\n")) + w.Write([]byte("data: {\"choices\":[{\"text\":\"chunk2\"}]}\n")) + w.Write([]byte("data: [DONE]\n")) + } else { + // Non-stream response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"choices":[{"text":"test response"}]}`)) + } + })) + defer backendServer.Close() + + t.Run("basic request handling", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/completions", + bytes.NewBufferString(`{"prompt": "test", "stream": false}`)) + + // Mock the manager functions to return our test server + // This would normally require more sophisticated mocking + // For now, this test verifies the function structure + assert.NotNil(t, CommonCompletions) + }) + + t.Run("invalid JSON request", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/completions", + bytes.NewBufferString(`invalid json`)) + + CommonCompletions(c, extractPromptFromCompletionsRequest, "completions") + + // Should return 400 Bad Request + assert.Equal(t, http.StatusBadRequest, w.Code) + assert.Contains(t, w.Body.String(), "Invalid JSON format") + }) + + t.Run("empty request body", func(t *testing.T) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/completions", nil) + + CommonCompletions(c, extractPromptFromCompletionsRequest, "completions") + + // Should return 400 Bad Request with appropriate error message + assert.Equal(t, http.StatusBadRequest, w.Code) + // The error message could be either "Invalid request body" or "Invalid JSON format" + // depending on how empty body is handled + assert.True(t, strings.Contains(w.Body.String(), "Invalid request body") || + strings.Contains(w.Body.String(), "Invalid JSON format") || + w.Body.String() != "") + }) +} diff --git a/fastdeploy/golang_router/internal/router/router.go b/fastdeploy/golang_router/internal/router/router.go index d23866eafa..194412253b 100644 --- a/fastdeploy/golang_router/internal/router/router.go +++ b/fastdeploy/golang_router/internal/router/router.go @@ -24,7 +24,7 @@ func New(cfg *config.Config) *gin.Engine { v1 := r.Group("/v1") { v1.POST("/chat/completions", gateway.ChatCompletions) - v1.POST("/completions", gateway.ChatCompletions) + v1.POST("/completions", gateway.Completions) } r.POST("/register", manager.RegisterInstance) r.GET("/registered_number", manager.RegisteredNumber) diff --git a/mkdocs.yml b/mkdocs.yml index 1d71c0af3f..c23be6772c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -132,6 +132,7 @@ nav: - Monitor Metrics: online_serving/metrics.md - Scheduler: online_serving/scheduler.md - Graceful Shutdown: online_serving/graceful_shutdown_service.md + - Load-Balancing Scheduling Router: online_serving/router.md - Offline Inference: offline_inference.md - Best Practices: - ERNIE-4.5-0.3B: best_practices/ERNIE-4.5-0.3B-Paddle.md