diff --git a/README_CN.md b/README_CN.md index d2df9ccb6c..524fe99936 100644 --- a/README_CN.md +++ b/README_CN.md @@ -83,6 +83,7 @@ FastDeploy 支持在**英伟达(NVIDIA)GPU**、**昆仑芯(Kunlunxin)XPU - [投机解码](./docs/zh/features/speculative_decoding.md) - [前缀缓存](./docs/zh/features/prefix_caching.md) - [分块预填充](./docs/zh/features/chunked_prefill.md) +- [负载均衡调度Router](./docs/zh/online_serving/router.md) ## 致谢 diff --git a/README_EN.md b/README_EN.md index ddb1a1dd69..d2f39e946f 100644 --- a/README_EN.md +++ b/README_EN.md @@ -81,6 +81,7 @@ Learn how to download models, enable using the torch format, and more: - [Speculative Decoding](./docs/features/speculative_decoding.md) - [Prefix Caching](./docs/features/prefix_caching.md) - [Chunked Prefill](./docs/features/chunked_prefill.md) +- [Load-Balancing Scheduling Router](./docs/online_serving/router.md) ## Acknowledgement diff --git a/docs/features/data_parallel_service.md b/docs/features/data_parallel_service.md index 98635b4195..4707d38242 100644 --- a/docs/features/data_parallel_service.md +++ b/docs/features/data_parallel_service.md @@ -70,11 +70,11 @@ The usage and request scheduling workflow is as follows: ### Launching the Router -Start the Router service. Logs are written to `log_router/router.log`. +Start the Router service. Logs are written to `log_router/router.log`. `fd-router` installation instructions can be found in the [Router documentation](../online_serving/router.md). ```shell export FD_LOG_DIR="log_router" -python -m fastdeploy.router.launch \ +/usr/local/bin/fd-router \ --host 0.0.0.0 \ --port 30000 ``` diff --git a/docs/features/disaggregated.md b/docs/features/disaggregated.md index 851817475b..ac1de8ece9 100644 --- a/docs/features/disaggregated.md +++ b/docs/features/disaggregated.md @@ -67,11 +67,11 @@ bash build.sh **Quick Start** -Start the Router service. The `--splitwise` parameter specifies the scheduling mode as disaggregated deployment. Log information is output to `log_router/router.log`. +Start the Router service. The `--splitwise` parameter specifies the scheduling mode as disaggregated deployment. Log information is output to `log_router/router.log`. `fd-router` installation instructions can be found in the [Router documentation](../online_serving/router.md). ```bash export FD_LOG_DIR="log_router" -python -m fastdeploy.router.launch \ +/usr/local/bin/fd-router \ --host 0.0.0.0 \ --port 30000 \ --splitwise diff --git a/docs/online_serving/images/go-router-workflow.png b/docs/online_serving/images/go-router-workflow.png new file mode 100644 index 0000000000..4c8c16cd74 Binary files /dev/null and b/docs/online_serving/images/go-router-workflow.png differ diff --git a/docs/online_serving/router.md b/docs/online_serving/router.md index 400e44e9c3..367f276ec6 100644 --- a/docs/online_serving/router.md +++ b/docs/online_serving/router.md @@ -4,12 +4,20 @@ FastDeploy provides a Golang-based [Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router) for request scheduling. The Router supports both centralized deployment and Prefill/Decode (PD) disaggregated deployment.。 +![go-router](images/go-router-workflow.png) + ## Installation ### 1. Prebuilt Binaries Starting from FastDeploy v2.5.0, the official Docker images include the Go language environment required to build the Golang Router and also provide a precompiled Router binary. The Router binary is located by default in the `/usr/local/bin` directory and can be used directly without additional compilation. For installation details, please refer to the [FastDeploy Installation Guide](../get_started/installation/nvidia_gpu.md) +If you need to download the Golang-based router binary separately, it can be installed using the following steps: +``` +wget https://paddle-qa.bj.bcebos.com/paddle-pipeline/FastDeploy_ActionCE/develop/latest/fd-router +mv fd-router /usr/local/bin/fd-router +``` + ### 2. Build from Source You need to build the Router from source in the following scenarios: @@ -33,7 +41,7 @@ bash build.sh Start the Router service. The `--port` parameter specifies the scheduling port for centralized deployment. ``` -./fd-router --port 30000 +/usr/local/bin/fd-router --port 30000 ``` Start a mixed inference instance. Compared to standalone deployment, specify the Router endpoint via `--router`. Other parameters remain unchanged. @@ -50,7 +58,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ Start the Router service with PD disaggregation enabled using the `--splitwise` flag. ``` -./fd-router \ +/usr/local/bin/fd-router \ --port 30000 \ --splitwise ``` @@ -105,7 +113,7 @@ popd Launch the Router with the custom configuration specified via `--config_path`: ``` -./fd-router \ +/usr/local/bin/fd-router \ --port 30000 \ --splitwise \ --config_path examples/run_with_config/config/config.yaml diff --git a/docs/zh/features/data_parallel_service.md b/docs/zh/features/data_parallel_service.md index b7ed73cdb5..dac9ae20ee 100644 --- a/docs/zh/features/data_parallel_service.md +++ b/docs/zh/features/data_parallel_service.md @@ -55,10 +55,10 @@ FastDeploy提供[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop - Router接收实例的生成结果,返回给用户 上手示例: -- 启动Router服务,日志信息输出在`log_router/router.log`。 +- 启动Router服务,日志信息输出在`log_router/router.log`。`fd-router`的安装方法参考[Router说明文档](../online_serving/router.md)。 ``` export FD_LOG_DIR="log_router" -python -m fastdeploy.router.launch \ +/usr/local/bin/fd-router \ --host 0.0.0.0 \ --port 30000 \ ``` diff --git a/docs/zh/features/disaggregated.md b/docs/zh/features/disaggregated.md index 956d62cce3..ac074054c7 100644 --- a/docs/zh/features/disaggregated.md +++ b/docs/zh/features/disaggregated.md @@ -65,10 +65,10 @@ bash build.sh **快速上手** -启动Router服务,其中`--splitwise`参数指定为分离式部署的调度方式,日志信息输出在`log_router/router.log`。 +启动Router服务,其中`--splitwise`参数指定为分离式部署的调度方式,日志信息输出在`log_router/router.log`。`fd-router`的安装方法参考[Router说明文档](../online_serving/router.md)。 ``` export FD_LOG_DIR="log_router" -python -m fastdeploy.router.launch \ +/usr/local/bin/fd-router \ --host 0.0.0.0 \ --port 30000 \ --splitwise diff --git a/docs/zh/online_serving/images/go-router-workflow.png b/docs/zh/online_serving/images/go-router-workflow.png new file mode 100644 index 0000000000..4c8c16cd74 Binary files /dev/null and b/docs/zh/online_serving/images/go-router-workflow.png differ diff --git a/docs/zh/online_serving/router.md b/docs/zh/online_serving/router.md index 86828b0c9d..434f55f1c6 100644 --- a/docs/zh/online_serving/router.md +++ b/docs/zh/online_serving/router.md @@ -4,11 +4,19 @@ FastDeploy提供Golang版本[Router](https://github.com/PaddlePaddle/FastDeploy/tree/develop/fastdeploy/golang_router),用于实现请求的调度。Router支持集中式部署和PD分离式部署。 +![go-router](images/go-router-workflow.png) + ## 安装 ### 1. 预编译库下载 -在 FastDeploy v2.5.0 及之后版本中,官方 Docker 镜像将内置 Golang Router 编译所需的 Go 语言环境,并提供已编译完成的 Router 二进制文件。该二进制文件默认位于 `/usr/local/bin` 目录下,可直接使用。相关安装方式可参考 [FastDeploy 安装文档](../get_started/installation/nvidia_gpu.md) +在 FastDeploy v2.5.0 及之后版本中,官方 Docker 镜像将内置 Golang Router 编译所需的 Go 语言环境,并提供已编译完成的 Router 二进制文件。该二进制文件默认位于 `/usr/local/bin` 目录下,可直接使用。相关安装方式可参考 [FastDeploy 安装文档](../get_started/installation/nvidia_gpu.md)。 + +若需单独下载 Golang router 二进制文件,可通过以下方式: +``` +wget https://paddle-qa.bj.bcebos.com/paddle-pipeline/FastDeploy_ActionCE/develop/latest/fd-router +mv fd-router /usr/local/bin/fd-router +``` ### 2. 编译安装 @@ -27,13 +35,14 @@ FastDeploy提供Golang版本[Router](https://github.com/PaddlePaddle/FastDeploy/ git clone https://github.com/PaddlePaddle/FastDeploy.git cd FastDeploy/fastdeploy/golang_router bash build.sh +cp ``` ## 集中式部署 启动Router服务,其中`--port`参数指定集中式部署的调度端口. ``` -./fd-router \ +/usr/local/bin/fd-router \ --port 30000 ``` @@ -51,7 +60,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ 启动Router服务,其中`--splitwise`参数指定为分离式部署的调度方式. ``` -./fd-router \ +/usr/local/bin/fd-router \ --port 30000 \ --splitwise ``` @@ -106,7 +115,7 @@ popd 在Router启动Router服务,其中`--config_path`参数指定配置路径. ``` -./fd-router \ +/usr/local/bin/fd-router \ --port 30000 \ --splitwise \ --config_path examples/run_with_config/config/config.yaml diff --git a/examples/router/start_mixed_go_router.sh b/examples/router/start_mixed_go_router.sh index 76067dc9f7..4847b8798b 100644 --- a/examples/router/start_mixed_go_router.sh +++ b/examples/router/start_mixed_go_router.sh @@ -17,8 +17,7 @@ ROUTER_PORT=52600 FD_BIN_DIR="/usr/local/bin" FD_ROUTER_BIN="${FD_BIN_DIR}/fd-router" -FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/FastDeploy/fd-router" -FD_ROUTER_SHA256="67640aaeebdd886826d3534930b2154cd2c1441a26bc3f38c3af5f0aadba7c2d" +FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/paddle-pipeline/FastDeploy_ActionCE/develop/latest/fd-router" ports=( $S1_PORT $((S1_PORT + 1)) $((S1_PORT + 2)) $((S1_PORT + 3)) @@ -37,20 +36,44 @@ if [ ! -x "${FD_ROUTER_BIN}" ]; then mkdir -p "${FD_BIN_DIR}" TMP_BIN="${FD_ROUTER_BIN}.tmp" - wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || exit 1 - - echo "${FD_ROUTER_SHA256} ${TMP_BIN}" | sha256sum -c - || { - echo "❌ Integrity check failed" + wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || { + echo "❌ Download fd-router failed" rm -f "${TMP_BIN}" exit 1 } + # ------- sanity checks (no fixed hash) ------- + + # 1. must be ELF binary + file "${TMP_BIN}" || grep -q "ELF" || { + echo "❌ fd-router is not an ELF binary" + rm -f "${TMP_BIN}" + exit 1 + } + + # 2. must be x86_64 architecture + file "${TMP_BIN}" | grep -q "x86-64" || { + echo "❌ fd-router architecture mismatch" + rm -f "${TMP_BIN}" + exit 1 + } + + # 3. size check (avoid HTML / empty / error pages) + SIZE=$(stat -c%s "${TMP_BIN}") + if [ "$SIZE" -lt 1000000 ]; then + echo "❌ fd-router size is too small ($SIZE bytes), suspicious" + rm -f "${TMP_BIN}" + exit 1 + fi + + # ------------------------------------- + mv "${TMP_BIN}" "${FD_ROUTER_BIN}" chmod +x "${FD_ROUTER_BIN}" - echo "fd-router installed and verified" + echo "✅ fd-router installed with sanity checks" else - echo "fd-router already exists" + echo "✅ fd-router already exists" fi # start router diff --git a/examples/router/start_pd_go_router.sh b/examples/router/start_pd_go_router.sh index ce73a6ad90..49bd07fd05 100644 --- a/examples/router/start_pd_go_router.sh +++ b/examples/router/start_pd_go_router.sh @@ -24,8 +24,7 @@ LOG_DATE=$(date +%Y%m%d_%H%M%S) FD_BIN_DIR="/usr/local/bin" FD_ROUTER_BIN="${FD_BIN_DIR}/fd-router" -FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/FastDeploy/fd-router" -FD_ROUTER_SHA256="67640aaeebdd886826d3534930b2154cd2c1441a26bc3f38c3af5f0aadba7c2d" +FD_ROUTER_URL="https://paddle-qa.bj.bcebos.com/paddle-pipeline/FastDeploy_ActionCE/develop/latest/fd-router" ports=($P_PORT $D_PORT $ROUTER_PORT) check_ports "${ports[@]}" || { @@ -40,20 +39,44 @@ if [ ! -x "${FD_ROUTER_BIN}" ]; then mkdir -p "${FD_BIN_DIR}" TMP_BIN="${FD_ROUTER_BIN}.tmp" - wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || exit 1 - - echo "${FD_ROUTER_SHA256} ${TMP_BIN}" | sha256sum -c - || { - echo "❌ Integrity check failed" + wget -q --no-proxy "${FD_ROUTER_URL}" -O "${TMP_BIN}" || { + echo "❌ Download fd-router failed" rm -f "${TMP_BIN}" exit 1 } + # ------- sanity checks (no fixed hash) ------- + + # 1. must be ELF binary + file "${TMP_BIN}" || grep -q "ELF" || { + echo "❌ fd-router is not an ELF binary" + rm -f "${TMP_BIN}" + exit 1 + } + + # 2. must be x86_64 architecture + file "${TMP_BIN}" | grep -q "x86-64" || { + echo "❌ fd-router architecture mismatch" + rm -f "${TMP_BIN}" + exit 1 + } + + # 3. size check (avoid HTML / empty / error pages) + SIZE=$(stat -c%s "${TMP_BIN}") + if [ "$SIZE" -lt 1000000 ]; then + echo "❌ fd-router size is too small ($SIZE bytes), suspicious" + rm -f "${TMP_BIN}" + exit 1 + fi + + # ------------------------------------- + mv "${TMP_BIN}" "${FD_ROUTER_BIN}" chmod +x "${FD_ROUTER_BIN}" - echo "fd-router installed and verified" + echo "✅ fd-router installed with sanity checks" else - echo "fd-router already exists" + echo "✅ fd-router already exists" fi # start router diff --git a/fastdeploy/golang_router/examples/run_with_config/run.sh b/fastdeploy/golang_router/examples/run_with_config/run.sh index 0d637a8596..61ae78169a 100644 --- a/fastdeploy/golang_router/examples/run_with_config/run.sh +++ b/fastdeploy/golang_router/examples/run_with_config/run.sh @@ -19,5 +19,5 @@ if [ -n "$PID" ]; then fi echo "Starting new fd-router process..." -nohup ./fd-router --config_path ./config/config.yaml --splitwise > fd-router.log 2>&1 & +nohup /usr/local/bin/fd-router --config_path ./config/config.yaml --splitwise > fd-router.log 2>&1 & echo "fd-router started with PID: $!" diff --git a/fastdeploy/golang_router/examples/run_with_default_workers/run.sh b/fastdeploy/golang_router/examples/run_with_default_workers/run.sh index a23bb67d5c..d96541d5bf 100644 --- a/fastdeploy/golang_router/examples/run_with_default_workers/run.sh +++ b/fastdeploy/golang_router/examples/run_with_default_workers/run.sh @@ -14,5 +14,5 @@ if [ -n "$PID" ]; then fi echo "Starting new fd-router process..." -nohup ./fd-router --config_path ./config/config.yaml --splitwise > fd-router.log 2>&1 & +nohup /usr/local/bin/fd-router --config_path ./config/config.yaml --splitwise > fd-router.log 2>&1 & echo "fd-router started with PID: $!" diff --git a/fastdeploy/golang_router/internal/gateway/completions.go b/fastdeploy/golang_router/internal/gateway/completions.go index 3aa15fc3f3..88d15a5138 100644 --- a/fastdeploy/golang_router/internal/gateway/completions.go +++ b/fastdeploy/golang_router/internal/gateway/completions.go @@ -142,7 +142,7 @@ func extractPromptFromCompletionsRequest(rawReq map[string]any) string { } // PostToPD sends requests to both Prefill and Decode instances, only returns Decode node response -func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isStream bool, completionEndpoint string) (*http.Response, error) { +func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isStream bool, message string, completionEndpoint string) (*http.Response, error) { ctx := c.Request.Context() decodeEndpoint := fmt.Sprintf("%s/v1/%s", decodeURL, completionEndpoint) @@ -206,13 +206,13 @@ func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isSt } if prefillRes.resp != nil { - go readPrefillRecv(ctx, prefillURL, isStream, prefillRes.resp) + go readPrefillRecv(ctx, prefillURL, isStream, message, prefillRes.resp) } return decodeRes.resp, nil } -func readPrefillRecv(ctx context.Context, url string, isStream bool, backendResp *http.Response) { +func readPrefillRecv(ctx context.Context, url string, isStream bool, message string, backendResp *http.Response) { if backendResp == nil || backendResp.Body == nil { return } @@ -231,21 +231,22 @@ func readPrefillRecv(ctx context.Context, url string, isStream bool, backendResp // Fallback to ensure release if !released { scheduler_handler.Release(ctx, url) + scheduler_handler.ReleasePrefillTokens(ctx, url, message) logger.Debug("[prefill] release in defer (fallback) url=%s", url) } }() for scanner.Scan() { - line := scanner.Text() + _ = scanner.Text() // First read that returns data if !released { scheduler_handler.Release(ctx, url) + scheduler_handler.ReleasePrefillTokens(ctx, url, message) released = true logger.Debug("[prefill] first chunk received, release scheduler url=%s", url) } - logger.Debug("[prefill] recv result: %s", line) } if err := scanner.Err(); err != nil { @@ -295,11 +296,12 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp requestBodyData []byte prefillURL string decodeURL string + message string ) if isSplitwise { // PD mode: select instances for Prefill/Decode separately - message := extractor(rawReq) + message = extractor(rawReq) prefillURL, decodeURL, err = manager.SelectWorkerPair(ctx, message) if err != nil { @@ -313,9 +315,6 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp return } - // Prefill node token count was added in SelectWorker, release when request ends - defer scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message) - // Construct disaggregate_info to ensure selected P/D work in pairs within FastDeploy disagg, err := manager.BuildDisaggregateInfo(ctx, prefillURL, decodeURL) if err != nil { @@ -340,7 +339,7 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp } destURL = decodeURL - releaseTargets = []string{prefillURL, decodeURL} + releaseTargets = []string{decodeURL} // Expose scheduling results to caller for debugging/validating scheduling strategy c.Writer.Header().Set("X-Router-Prefill-URL", prefillURL) @@ -376,7 +375,7 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp // Send request var backendResp *http.Response if isSplitwise { - backendResp, err = PostToPD(c, decodeURL, prefillURL, requestBodyData, isStream, completionEndpoint) + backendResp, err = PostToPD(c, decodeURL, prefillURL, requestBodyData, isStream, message, completionEndpoint) } else { backendResp, err = GetClientWithRetry(c, requestBodyData, destURL) } @@ -389,7 +388,7 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp defer backendResp.Body.Close() if isSplitwise { - metrics.InferenceRequests.WithLabelValues("", releaseTargets[0], destURL, strconv.Itoa(backendResp.StatusCode)).Inc() + metrics.InferenceRequests.WithLabelValues("", prefillURL, decodeURL, strconv.Itoa(backendResp.StatusCode)).Inc() } else { metrics.InferenceRequests.WithLabelValues(destURL, "", "", strconv.Itoa(backendResp.StatusCode)).Inc() } diff --git a/fastdeploy/golang_router/internal/gateway/completions_test.go b/fastdeploy/golang_router/internal/gateway/completions_test.go index 2dc03840d6..2c2df1d9cb 100644 --- a/fastdeploy/golang_router/internal/gateway/completions_test.go +++ b/fastdeploy/golang_router/internal/gateway/completions_test.go @@ -203,7 +203,7 @@ func TestPostToPD(t *testing.T) { })) defer decodeServer.Close() - resp, err := PostToPD(c, decodeServer.URL, prefillServer.URL, reqBody, false, "chat/completions") + resp, err := PostToPD(c, decodeServer.URL, prefillServer.URL, reqBody, false, "test message", "chat/completions") assert.NoError(t, err) assert.Equal(t, http.StatusOK, resp.StatusCode) assert.NotNil(t, resp) @@ -217,7 +217,7 @@ func TestPostToPD(t *testing.T) { defer prefillServer.Close() // Use invalid URL to simulate connection error - resp, err := PostToPD(c, "http://invalid-server:9999", prefillServer.URL, reqBody, false, "chat/completions") + resp, err := PostToPD(c, "http://invalid-server:9999", prefillServer.URL, reqBody, false, "test message", "chat/completions") assert.Error(t, err) assert.Nil(t, resp) }) @@ -229,7 +229,7 @@ func TestPostToPD(t *testing.T) { defer decodeServer.Close() // Use invalid URL to simulate connection error - resp, err := PostToPD(c, decodeServer.URL, "http://invalid-server:9999", reqBody, false, "chat/completions") + resp, err := PostToPD(c, decodeServer.URL, "http://invalid-server:9999", reqBody, false, "test message", "chat/completions") assert.Error(t, err) assert.Nil(t, resp) }) @@ -336,7 +336,7 @@ func TestReadPrefillRecv(t *testing.T) { t.Run("nil response handling", func(t *testing.T) { ctx := context.Background() // Should handle nil response gracefully without panic - readPrefillRecv(ctx, "test-url", false, nil) + readPrefillRecv(ctx, "test-url", false, "test message", nil) }) t.Run("nil response body handling", func(t *testing.T) { @@ -347,7 +347,7 @@ func TestReadPrefillRecv(t *testing.T) { Body: nil, } // Should handle nil body gracefully without panic - readPrefillRecv(ctx, "test-url", false, resp) + readPrefillRecv(ctx, "test-url", false, "test message", resp) }) t.Run("mock response without scheduler dependency", func(t *testing.T) { @@ -361,7 +361,7 @@ func TestReadPrefillRecv(t *testing.T) { // This test verifies basic error handling and response body consumption // without triggering scheduler initialization requirements - readPrefillRecv(ctx, "test-url", false, resp) + readPrefillRecv(ctx, "test-url", false, "test message", resp) }) }