diff --git a/.github/workflows/_golang_router_test.yml b/.github/workflows/_golang_router_test.yml new file mode 100644 index 0000000000..4964f3a3a0 --- /dev/null +++ b/.github/workflows/_golang_router_test.yml @@ -0,0 +1,213 @@ +name: GOLANG_ROUTER Tests +description: "Run FastDeploy golang_router tests" + +on: + workflow_call: + inputs: + DOCKER_IMAGE: + description: "Build Images" + required: true + type: string + default: "ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:fastdeploy-ciuse-cuda126-paddle-dev" + FASTDEPLOY_ARCHIVE_URL: + description: "URL of the compressed FastDeploy code archive." + required: true + type: string + FASTDEPLOY_WHEEL_URL: + description: "URL of the FastDeploy Wheel." + required: true + type: string + FASTDEPLOY_ROUTER_URL: + description: "URL of the FastDeploy Router" + required: true + type: string + CACHE_DIR: + description: "Cache Dir Use" + required: false + type: string + default: "" + MODEL_CACHE_DIR: + description: "Cache Dir Use" + required: false + type: string + default: "" + secrets: + github-token: + required: true + +jobs: + run_golang_router_tests: + runs-on: [self-hosted, GPU-h20-2Cards] + timeout-minutes: 30 + steps: + - name: Code Prepare + shell: bash + env: + docker_image: ${{ inputs.DOCKER_IMAGE }} + fd_archive_url: ${{ inputs.FASTDEPLOY_ARCHIVE_URL }} + run: | + set -x + REPO="https://github.com/${{ github.repository }}.git" + FULL_REPO="${{ github.repository }}" + REPO_NAME="${FULL_REPO##*/}" + BASE_BRANCH="${{ github.base_ref }}" + docker pull ${docker_image} + # Clean the repository directory before starting + docker run --rm --net=host -v $(pwd):/workspace -w /workspace \ + -e "REPO_NAME=${REPO_NAME}" \ + ${docker_image} /bin/bash -c ' + CLEAN_RETRIES=3 + CLEAN_COUNT=0 + + while [ $CLEAN_COUNT -lt $CLEAN_RETRIES ]; do + echo "Attempt $((CLEAN_COUNT+1)) to remove ${REPO_NAME}* ..." + rm -rf "${REPO_NAME}"* || true + sleep 2 + + # Check if anything matching ${REPO_NAME}* still exists + if ! 
ls "${REPO_NAME}"* >/dev/null 2>&1; then + echo "All ${REPO_NAME}* removed successfully" + break + fi + + CLEAN_COUNT=$((CLEAN_COUNT + 1)) + done + + if ls "${REPO_NAME}"* >/dev/null 2>&1; then + echo "ERROR: Failed to clean ${REPO_NAME}* after multiple attempts" + ls -ld "${REPO_NAME}"* + exit 1 + fi + ' + + wget -q --no-proxy ${fd_archive_url} + tar -xf FastDeploy.tar.gz + rm -rf FastDeploy.tar.gz + cd FastDeploy + git config --global user.name "FastDeployCI" + git config --global user.email "fastdeploy_ci@example.com" + git log -n 3 --oneline + + - name: Run Golang_Router Tests + shell: bash + env: + docker_image: ${{ inputs.DOCKER_IMAGE }} + fd_wheel_url: ${{ inputs.FASTDEPLOY_WHEEL_URL }} + fd_router_url: ${{ inputs.FASTDEPLOY_ROUTER_URL }} + CACHE_DIR: ${{ inputs.CACHE_DIR }} + BASE_REF: ${{ github.event.pull_request.base.ref }} + MODEL_CACHE_DIR: ${{ inputs.MODEL_CACHE_DIR }} + IS_PR: ${{ github.event_name == 'pull_request' }} + run: | + if [[ "$IS_PR" == "true" ]]; then + echo "Running on PR" + else + echo "Not a PR" + fi + runner_name="${{ runner.name }}" + CARD_ID=$(echo "${runner_name}" | awk -F'-' '{print $NF}') + DEVICES=$(echo "$CARD_ID" | fold -w1 | paste -sd,) + DEVICE_PORT=$(echo "$DEVICES" | cut -d',' -f1) + + FLASK_PORT=$((8068 + DEVICE_PORT * 100)) + FD_API_PORT=$((8088 + DEVICE_PORT * 100)) + FD_ENGINE_QUEUE_PORT=$((8058 + DEVICE_PORT * 100)) + FD_METRICS_PORT=$((8078 + DEVICE_PORT * 100)) + FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100)) + FD_ROUTER_PORT=$((8048 + DEVICE_PORT * 100)) + FD_CONNECTOR_PORT=$((8038 + DEVICE_PORT * 100)) + FD_RDMA_PORT=$((8028 + DEVICE_PORT * 100)) + echo "Test ENV Parameter:" + echo "=========================================================" + echo "FLASK_PORT=${FLASK_PORT}" + echo "FD_API_PORT=${FD_API_PORT}" + echo "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" + echo "FD_METRICS_PORT=${FD_METRICS_PORT}" + echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" + echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}" + echo 
"FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" + echo "FD_RDMA_PORT=${FD_RDMA_PORT}" + echo "DEVICES=${DEVICES}" + echo "=========================================================" + + CACHE_DIR="${CACHE_DIR:-$(dirname "$(dirname "${{ github.workspace }}")")}" + echo "CACHE_DIR is set to ${CACHE_DIR}" + if [ ! -f "${CACHE_DIR}/gitconfig" ]; then + touch "${CACHE_DIR}/gitconfig" + fi + + PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT $FD_CACHE_QUEUE_PORT) + LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log" + echo "==== LOG_FILE is ${LOG_FILE} ====" + + echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE + + for port in "${PORTS[@]}"; do + PIDS=$(lsof -t -i :$port || true) + if [ -n "$PIDS" ]; then + echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE + echo "$PIDS" | xargs -r kill -9 + echo "Port $port cleared" | tee -a $LOG_FILE + else + echo "Port $port is free" | tee -a $LOG_FILE + fi + done + + echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE + + echo "=========================================================" + echo "Ensuring no stale container named ${runner_name} ..." 
+ if [ "$(docker ps -a -q -f name=${runner_name})" ]; then + echo "Removing stale container: ${runner_name}" + docker rm -f ${runner_name} || true + fi + + export RDMA_DEVICES=$(find /dev/infiniband/uverbs* -maxdepth 1 -not -type d | xargs -I{} echo '--device {}:{}') + + docker run --rm --net=host \ + --name ${runner_name} \ + --cap-add=SYS_PTRACE --cap-add=IPC_LOCK \ + --shm-size=64G \ + ${RDMA_DEVICES} \ + --device=/dev/infiniband/rdma_cm \ + --ulimit memlock=-1:-1 \ + -v $(pwd):/workspace -w /workspace \ + -v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \ + -v "${CACHE_DIR}/.cache:/root/.cache" \ + -v "${CACHE_DIR}/ConfigDir:/root/.config" \ + -v "${MODEL_CACHE_DIR}:/ModelData:ro" \ + -e "MODEL_PATH=/ModelData" \ + -e "FD_API_PORT=${FD_API_PORT}" \ + -e "FD_ENGINE_QUEUE_PORT=${FD_ENGINE_QUEUE_PORT}" \ + -e "FD_METRICS_PORT=${FD_METRICS_PORT}" \ + -e "FLASK_PORT=${FLASK_PORT}" \ + -e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \ + -e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \ + -e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \ + -e "FD_RDMA_PORT=${FD_RDMA_PORT}" \ + -e "CLEAN_CUDA=1" \ + -e TZ="Asia/Shanghai" \ + -e "fd_wheel_url=${fd_wheel_url}" \ + -e "fd_router_url=${fd_router_url}" \ + -e "BASE_REF=${BASE_REF}" \ + -e "IS_PR=${IS_PR}" \ + --gpus "\"device=${DEVICES}\"" ${docker_image} /bin/bash -c ' + + git config --global --add safe.directory /workspace/FastDeploy + cd FastDeploy + + python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/ + pip config set global.extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple + + python -m pip install -r scripts/unittest_requirement.txt + python -m pip install ${fd_wheel_url} + rm -rf fastdeploy + python -m pip install ${fd_wheel_url} --no-deps --target=/workspace/FastDeploy + export PYTHONPATH=/workspace/FastDeploy/ + + # download fd-router binary + wget -q --no-proxy ${fd_router_url} -O /usr/local/bin/fd-router + chmod +x /usr/local/bin/fd-router + + bash 
scripts/run_golang_router.sh + ' diff --git a/.github/workflows/publish_job.yml b/.github/workflows/publish_job.yml index 4fb5a0147c..7ee5306288 100644 --- a/.github/workflows/publish_job.yml +++ b/.github/workflows/publish_job.yml @@ -413,6 +413,19 @@ jobs: secrets: github-token: ${{ secrets.GITHUB_TOKEN }} + golang_router_test: + name: Run Golang Router Tests + needs: [ clone,build_cu126,build_fd_router ] + uses: ./.github/workflows/_golang_router_test.yml + with: + DOCKER_IMAGE: ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/paddleqa:fastdeploy-ciuse-cuda126-dailyupdate + FASTDEPLOY_ARCHIVE_URL: ${{ needs.clone.outputs.repo_archive_url }} + FASTDEPLOY_WHEEL_URL: ${{ needs.build_cu126.outputs.wheel_path }} + FASTDEPLOY_ROUTER_URL: ${{ needs.build_fd_router.outputs.fd_router_path }} + MODEL_CACHE_DIR: "/ssd2/actions-runner/ModelData" + secrets: + github-token: ${{ secrets.GITHUB_TOKEN }} + logprob_test: name: Run FastDeploy LogProb Tests needs: [build_cu126] diff --git a/scripts/coverage_run.sh b/scripts/coverage_run.sh index d0e94c3638..62d8938b95 100644 --- a/scripts/coverage_run.sh +++ b/scripts/coverage_run.sh @@ -13,11 +13,11 @@ failed_tests_file="failed_tests.log" ################################## -# 执行 pytest,每个文件单独跑 -# 使用 pytest 的 --collect-only 输出,并从每行中提取真正的测试文件路径(形如 tests/.../test_*.py)。 -# 注意:pytest 在收集失败时会输出形如 "ERROR tests/xxx/test_xxx.py::test_xxx ..." 的行, -# 为了避免把前缀 "ERROR"/"FAILED"/"collecting" 等误当成文件名,这里只保留行中出现的 -# "tests/.../test_*.py" 这一段,其他前后内容直接丢弃。 +# Run pytest, one file at a time +# Use pytest's --collect-only output to extract the actual test file paths (e.g., tests/.../test_*.py). +# Note: pytest may output lines like "ERROR tests/xxx/test_xxx.py::test_xxx ..." on collection failure, +# to avoid treating prefixes like "ERROR"/"FAILED"/"collecting" as filenames, +# we only keep the "tests/.../test_*.py" portion and discard everything else. 
TEST_FILES=$( python -m pytest --collect-only -q -c "${PYTEST_INI}" "${tests_path}" --rootdir="${run_path}" --disable-warnings 2>&1 \ | grep -E 'tests/.+\/test_.*\.py' \ @@ -29,30 +29,81 @@ TEST_FILES=$( failed_pytest=0 success_pytest=0 +# nullglob: if no match, the pattern expands to nothing +shopt -s nullglob + for file in $TEST_FILES; do echo "Running pytest file: $file" + # Clean up previous logs + rm -rf "${run_path}"/log* || true + rm -rf "${run_path}"/*.log || true + + # Run pytest with coverage for the current file python -m coverage run -m pytest -c ${PYTEST_INI} "$file" -vv -s status=$? if [ "$status" -ne 0 ]; then echo "$file" >> "$failed_tests_file" failed_pytest=$((failed_pytest+1)) + + echo "" + echo "==================== Dumping Logs ====================" + + for log_dir in "${run_path}"/log*; do + if [ -d "${log_dir}" ]; then + echo + echo ">>>> Processing log directory: ${log_dir}" + + # print all workerlog.0 + worker_logs=("${log_dir}"/workerlog.0) + if [ "${#worker_logs[@]}" -gt 0 ]; then + for worker_log in "${worker_logs[@]}"; do + if [ -f "${worker_log}" ]; then + echo "---------------- ${worker_log} (last 100 lines) ----------------" + tail -n 100 "${worker_log}" || true + echo "---------------------------------------------------------------" + fi + done + else + echo "No workerlog.0 found in ${log_dir}" + fi + + echo ">>> grep error in ${log_dir}" + grep -Rni --color=auto "error" "${log_dir}" || true + fi + done + + # print all server logs + server_logs=("${run_path}"/*.log) + if [ "${#server_logs[@]}" -gt 0 ]; then + for server_log in "${server_logs[@]}"; do + if [ -f "${server_log}" ]; then + echo + echo "---------------- ${server_log} (last 100 lines) ----------------" + tail -n 100 "${server_log}" || true + echo "---------------------------------------------------------------" + fi + done + else + echo "No *.log files found" + fi + + echo "======================================================" else success_pytest=$((success_pytest+1)) 
fi ps -ef | grep "${FD_CACHE_QUEUE_PORT}" | grep -v grep | awk '{print $2}' | xargs -r kill -9 ps -ef | grep "${FD_ENGINE_QUEUE_PORT}" | grep -v grep | awk '{print $2}' | xargs -r kill -9 done +shopt -u nullglob ################################## -# 汇总结果 +# Summary ################################## echo "====================================" echo "Pytest total: $((failed_pytest + success_pytest))" echo "Pytest successful: $success_pytest" echo "Pytest failed: $failed_pytest" -echo "Special tests total: ${#special_tests[@]}" -echo "Special tests successful: $success_special" if [ "$failed_pytest" -ne 0 ]; then echo "Failed test cases are listed in $failed_tests_file" diff --git a/scripts/run_golang_router.sh b/scripts/run_golang_router.sh new file mode 100644 index 0000000000..66578d267d --- /dev/null +++ b/scripts/run_golang_router.sh @@ -0,0 +1,92 @@ +#!/bin/bash + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" + +GOLANG_ROUTER_CASES_DIR="${REPO_ROOT}/tests/e2e/golang_router" +FAILED_CASE_FILE="${REPO_ROOT}/failed_cases.txt" + +FAILED_COUNT=0 + +rm -f "${FAILED_CASE_FILE}" + +shopt -s nullglob +test_files=("${GOLANG_ROUTER_CASES_DIR}"/test_*.py) + +if [ "${#test_files[@]}" -eq 0 ]; then + echo "ERROR: No test files found under: ${GOLANG_ROUTER_CASES_DIR}" + exit 1 +fi + +for test_file in "${test_files[@]}"; do + echo "------------------------------------------------------------" + echo "Running pytest: ${test_file}" + echo "------------------------------------------------------------" + # Clean up previous logs + rm -rf "${REPO_ROOT}"/log* || true + rm -rf "${REPO_ROOT}"/*.log || true + + if ! 
python -m pytest -sv --tb=short "${test_file}"; then + echo "Pytest failed for: ${test_file}" + echo "${test_file}" >> "${FAILED_CASE_FILE}" + FAILED_COUNT=$((FAILED_COUNT + 1)) + + echo "" + echo "==================== Dumping Logs ====================" + + for log_dir in "${REPO_ROOT}"/log*; do + if [ -d "${log_dir}" ]; then + echo + echo ">>>> Processing log directory: ${log_dir}" + + # print all workerlog.0 + worker_logs=("${log_dir}"/workerlog.0) + if [ "${#worker_logs[@]}" -gt 0 ]; then + for worker_log in "${worker_logs[@]}"; do + if [ -f "${worker_log}" ]; then + echo "---------------- ${worker_log} (last 100 lines) ----------------" + tail -n 100 "${worker_log}" || true + echo "---------------------------------------------------------------" + fi + done + else + echo "No workerlog.0 found in ${log_dir}" + fi + + echo ">>> grep error in ${log_dir}" + grep -Rni --color=auto "error" "${log_dir}" || true + fi + done + + # print all server logs + server_logs=("${REPO_ROOT}"/*.log) + if [ "${#server_logs[@]}" -gt 0 ]; then + for server_log in "${server_logs[@]}"; do + if [ -f "${server_log}" ]; then + echo + echo "---------------- ${server_log} (last 100 lines) ----------------" + tail -n 100 "${server_log}" || true + echo "---------------------------------------------------------------" + fi + done + else + echo "No *.log files found" + fi + + echo "======================================================" + fi +done + +echo "" +echo "============================================================" + +shopt -u nullglob + +if [ "${FAILED_COUNT}" -ne 0 ]; then + echo "${FAILED_COUNT} test file(s) failed:" + cat "${FAILED_CASE_FILE}" + exit 1 +else + echo "All golang_router end-to-end tests passed" + exit 0 +fi diff --git a/scripts/run_gpu_4cards.sh b/scripts/run_gpu_4cards.sh index abc9a58100..9789f9c53e 100644 --- a/scripts/run_gpu_4cards.sh +++ b/scripts/run_gpu_4cards.sh @@ -22,26 +22,42 @@ for test_file in "${test_files[@]}"; do echo 
"------------------------------------------------------------" echo "Running pytest: ${test_file}" echo "------------------------------------------------------------" + # Clean up previous logs + rm -rf "${REPO_ROOT}"/log* || true + rm -rf "${REPO_ROOT}"/*.log || true if ! python -m pytest -sv --tb=short "${test_file}"; then echo "Pytest failed for: ${test_file}" echo "${test_file}" >> "${FAILED_CASE_FILE}" FAILED_COUNT=$((FAILED_COUNT + 1)) + echo "" + echo "==================== Dumping Logs ====================" + + if [ -d "${REPO_ROOT}/log" ]; then + echo ">>> grep error in ${REPO_ROOT}/log/" + grep -Rni --color=auto "error" "${REPO_ROOT}/log/" || true + else + echo "${REPO_ROOT}/log directory not found" + fi + if [ -f "${REPO_ROOT}/log/log_0/workerlog.0" ]; then - echo "---------------- workerlog.0 (last 200 lines) -------------" - tail -n 200 "${REPO_ROOT}/log/log_0/workerlog.0" + echo "---------------- workerlog.0 (last 100 lines) -------------" + tail -n 100 "${REPO_ROOT}/log/log_0/workerlog.0" echo "------------------------------------------------------------" fi if [ -f "${REPO_ROOT}/server.log" ]; then - echo "---------------- server.log (last 200 lines) ---------------" - tail -n 200 "${REPO_ROOT}/server.log" + echo "---------------- server.log (last 100 lines) ---------------" + tail -n 100 "${REPO_ROOT}/server.log" echo "------------------------------------------------------------" fi fi done +echo "" +echo "============================================================" + shopt -u nullglob if [ "${FAILED_COUNT}" -ne 0 ]; then diff --git a/tests/cov_pytest.ini b/tests/cov_pytest.ini index 9e339dbb0b..58f3820fca 100644 --- a/tests/cov_pytest.ini +++ b/tests/cov_pytest.ini @@ -9,5 +9,6 @@ addopts = --ignore=tests/xpu_ci --ignore=tests/metax_ci --ignore=tests/e2e/4cards_cases + --ignore=tests/e2e/golang_router --ignore=tests/v1/test_schedule_output.py --ignore=tests/graph_optimization/test_cuda_graph_dynamic_subgraph.py diff --git 
a/tests/e2e/golang_router/test_ernie_03b_golang_router.py b/tests/e2e/golang_router/test_ernie_03b_golang_router.py new file mode 100644 index 0000000000..6a6f8e4d7d --- /dev/null +++ b/tests/e2e/golang_router/test_ernie_03b_golang_router.py @@ -0,0 +1,364 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test for router and mixed server + +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +import requests +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean, +) + +# Read ports from environment variables; use default values if not set +FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ + FD_API_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + FD_CACHE_QUEUE_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, + FD_ROUTER_PORT, +] + + +def get_registered_number(router_url) -> dict: + """ + Get the number of registered servers in the router. + + Args: + router_url (str): The base URL of the router, e.g. "http://localhost:8080". + + Returns: + dict: The number of registered servers per role ("mixed", "prefill", "decode"). 
+ """ + if not router_url.startswith("http"): + router_url = f"http://{router_url}" + + try: + response = requests.get(f"{router_url}/registered_number", timeout=60) + registered_numbers = response.json() + return registered_numbers + except Exception: + return {"mixed": 0, "prefill": 0, "decode": 0} + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean(PORTS_TO_CLEAN) + + print("log dir clean ") + if os.path.exists("log_router") and os.path.isdir("log_router"): + shutil.rmtree("log_router") + if os.path.exists("log_server_0") and os.path.isdir("log_server_0"): + shutil.rmtree("log_server_0") + if os.path.exists("log_server_1") and os.path.isdir("log_server_1"): + shutil.rmtree("log_server_1") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "baidu/ERNIE-4.5-0.3B-Paddle" + print(f"model_path: {model_path}") + + # router + print("start router...") + env_router = os.environ.copy() + env_router["FD_LOG_DIR"] = "log_router" + router_log_path = "router.log" + + router_cmd = ["fd-router", "--port", str(FD_ROUTER_PORT)] + + with open(router_log_path, "w") as logfile: + process_router = subprocess.Popen( + router_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_router, + ) + + # server0 + print("start server0...") + env_server_0 = os.environ.copy() + env_server_0["CUDA_VISIBLE_DEVICES"] = "0" + env_server_0["FD_LOG_DIR"] = "log_server_0" + env_server_0["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT) + log_path = "server_0.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", 
+ "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "1", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--max-num-seqs", + "20", + "--quantization", + "wint8", + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process_server_0 = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_server_0, + ) + time.sleep(1) + + # server 1 + print("start server 1...") + env_server_1 = os.environ.copy() + env_server_1["CUDA_VISIBLE_DEVICES"] = "1" + env_server_1["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT + 1) + env_server_1["FD_LOG_DIR"] = "log_server_1" + log_path = "server_1.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT + 1), + "--tensor-parallel-size", + "1", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT + 1), + "--metrics-port", + str(FD_METRICS_PORT + 1), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT + 1), + "--max-model-len", + "8192", + "--max-num-seqs", + "20", + "--quantization", + "wint8", + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process_server_1 = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_server_1, + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(60): + registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}") + if registered_numbers["mixed"] >= 2: + print("Mixed servers are both online") + break + time.sleep(5) + else: + print("[TIMEOUT] API server failed to 
start in 5 minutes. Cleaning up...") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_server_0.pid, signal.SIGTERM) + os.killpg(process_server_1.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_server_0.pid, signal.SIGTERM) + os.killpg(process_server_1.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + print(f"server (pid={process_server_0.pid}) terminated") + print(f"server (pid={process_server_1.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_ROUTER_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. 
+ """ + return {"Content-Type": "application/json"} + + +def test_metrics_config(metrics_url): + timeout = 600 + url = metrics_url.replace("metrics", "config-info") + res = requests.get(url, timeout=timeout) + assert res.status_code == 200 + + +def send_request(url, payload, timeout=60): + """ + 发送请求到指定的URL,并返回响应结果。 + """ + headers = { + "Content-Type": "application/json", + } + + try: + res = requests.post(url, headers=headers, json=payload, timeout=timeout) + print("🟢 接收响应中...\n") + return res + except requests.exceptions.Timeout: + print(f"❌ 请求超时(超过 {timeout} 秒)") + return None + except requests.exceptions.RequestException as e: + print(f"❌ 请求失败:{e}") + return None + + +def get_stream_chunks(response): + """解析流式返回,生成chunk List[dict]""" + chunks = [] + + if response.status_code == 200: + for line in response.iter_lines(decode_unicode=True): + if line: + if line.startswith("data: "): + line = line[len("data: ") :] + + if line.strip() == "[DONE]": + break + + try: + chunk = json.loads(line) + chunks.append(chunk) + except Exception as e: + print(f"解析失败: {e}, 行内容: {line}") + else: + print(f"请求失败,状态码: {response.status_code}") + print("返回内容:", response.text) + + return chunks + + +def test_chat_usage_stream(api_url): + """测试流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) + print("Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= 
usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_chat_usage_non_stream(api_url): + """测试非流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["message"]["content"] + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" diff --git a/tests/e2e/golang_router/test_ernie_03b_pd_golang_router_v1_rdma_tp1.py b/tests/e2e/golang_router/test_ernie_03b_pd_golang_router_v1_rdma_tp1.py new file mode 100644 index 0000000000..5e130c519b --- /dev/null +++ b/tests/e2e/golang_router/test_ernie_03b_pd_golang_router_v1_rdma_tp1.py @@ -0,0 +1,369 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test splitwise deployment: use local_scheduler + router, +# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use rdma to transfer cache. + +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +import requests +from e2e.utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean, + get_registered_number, +) + +# Read ports from environment variables; use default values if not set +FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) +FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) +FD_RDMA_PORT = int(os.getenv("FD_RDMA_PORT", 8623)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ + FD_API_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + FD_CACHE_QUEUE_PORT, + FD_CONNECTOR_PORT, + FD_RDMA_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, + FD_CONNECTOR_PORT + 1, + FD_RDMA_PORT + 1, + FD_ROUTER_PORT, +] + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean(PORTS_TO_CLEAN) + + print("log dir clean ") + if os.path.exists("log_router") and os.path.isdir("log_router"): + shutil.rmtree("log_router") + if os.path.exists("log_prefill") and 
os.path.isdir("log_prefill"): + shutil.rmtree("log_prefill") + if os.path.exists("log_decode") and os.path.isdir("log_decode"): + shutil.rmtree("log_decode") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "baidu/ERNIE-4.5-0.3B-Paddle" + print(f"model_path: {model_path}") + + # Discover RDMA NICs via helper script (its output is "<key>=<nics>") + current_dir = os.path.dirname(os.path.abspath(__file__)) + shell_path = os.path.join(current_dir, "../utils/get_rdma_nics.sh") + output = subprocess.check_output(["bash", shell_path, "gpu"], text=True) + _, rdma_nics = output.split("=") + print(f"shell_path: {shell_path}, rdma_nics: {rdma_nics}") + + # Router process (splitwise mode) + print("start router...") + env_router = os.environ.copy() + env_router["FD_LOG_DIR"] = "log_router" + router_log_path = "router.log" + + router_cmd = [ + "fd-router", + "--port", + str(FD_ROUTER_PORT), + "--splitwise", + ] + + with open(router_log_path, "w") as logfile: + process_router = subprocess.Popen( + router_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_router, + ) + + # Prefill instance (pinned to GPU 0) + print("start prefill...") + env_prefill = os.environ.copy() + env_prefill["CUDA_VISIBLE_DEVICES"] = "0" + env_prefill["FD_LOG_DIR"] = "log_prefill" + env_prefill["KVCACHE_RDMA_NICS"] = rdma_nics + + prefill_log_path = "prefill.log" + prefill_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--splitwise-role", + "prefill", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT), + "--pd-comm-port", + str(FD_CONNECTOR_PORT), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new 
process group + with open(prefill_log_path, "w") as logfile: + process_prefill = subprocess.Popen( + prefill_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_prefill, + ) + time.sleep(1) + + # Decode instance (pinned to GPU 1; all ports offset by +1) + print("start decode...") + env_decode = os.environ.copy() + env_decode["CUDA_VISIBLE_DEVICES"] = "1" + env_decode["FD_LOG_DIR"] = "log_decode" + env_decode["KVCACHE_RDMA_NICS"] = rdma_nics + + decode_log_path = "decode.log" + decode_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT + 1), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT + 1), + "--metrics-port", + str(FD_METRICS_PORT + 1), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT + 1), + "--max-model-len", + "8192", + "--splitwise-role", + "decode", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT + 1), + "--pd-comm-port", + str(FD_CONNECTOR_PORT + 1), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(decode_log_path, "w") as logfile: + process_decode = subprocess.Popen( + decode_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_decode, + ) + + # Wait up to 300 seconds (60 x 5s) for both roles to register with the router + for _ in range(60): + registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}") + if registered_numbers["prefill"] >= 1 and registered_numbers["decode"] >= 1: + print("Prefill and decode servers are both online") + break + time.sleep(5) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. 
Cleaning up...") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + print(f"Prefill server (pid={process_prefill.pid}) terminated") + print(f"Decode server (pid={process_decode.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_ROUTER_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. 
+ """ + return {"Content-Type": "application/json"} + + +def test_metrics_config(metrics_url): + timeout = 600 + url = metrics_url.replace("metrics", "config-info") + res = requests.get(url, timeout=timeout) + assert res.status_code == 200 + + +def send_request(url, payload, timeout=60): + """ + POST the JSON payload to the given URL; return the Response, or None on timeout/error. + """ + headers = { + "Content-Type": "application/json", + } + + try: + res = requests.post(url, headers=headers, json=payload, timeout=timeout) + print("🟢 接收响应中...\n") + return res + except requests.exceptions.Timeout: + print(f"❌ 请求超时(超过 {timeout} 秒)") + return None + except requests.exceptions.RequestException as e: + print(f"❌ 请求失败:{e}") + return None + + +def get_stream_chunks(response): + """Parse a "data: "-prefixed streaming response into a List[dict] of chunks.""" + chunks = [] + + if response.status_code == 200: + for line in response.iter_lines(decode_unicode=True): + if line: + if line.startswith("data: "): + line = line[len("data: ") :] + + if line.strip() == "[DONE]": + break + + try: + chunk = json.loads(line) + chunks.append(chunk) + except Exception as e: + print(f"解析失败: {e}, 行内容: {line}") + else: + print(f"请求失败,状态码: {response.status_code}") + print("返回内容:", response.text) + + return chunks + + +def test_chat_usage_stream(api_url): + """Streaming chat: verify usage token accounting.""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] 
>= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_chat_usage_non_stream(api_url): + """Non-streaming chat: verify usage token accounting.""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["message"]["content"] + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens"