From b61731bb961aaa4d77be8cc2ea32021d4a1b2e36 Mon Sep 17 00:00:00 2001 From: mouxin <494624263qq@gmail.com> Date: Tue, 17 Mar 2026 15:23:13 +0800 Subject: [PATCH] [Feature][Docs] Adjust prefill release & expose load metrics (#6884) --- docs/online_serving/router.md | 28 +++- docs/online_serving/router_faq.md | 51 ++++++- docs/zh/online_serving/router.md | 19 ++- docs/zh/online_serving/router_faq.md | 51 ++++++- fastdeploy/golang_router/cmd/main.go | 2 + .../config/config.example.yaml | 3 +- .../config/config.example.yaml | 3 +- .../internal/common/interface.go | 1 + .../golang_router/internal/config/config.go | 4 + .../internal/gateway/completions.go | 97 ++++++++---- .../internal/gateway/completions_test.go | 140 ++++++++++++++++++ .../golang_router/internal/manager/handler.go | 9 ++ .../golang_router/internal/manager/metrics.go | 8 +- .../internal/manager/metrics_test.go | 13 +- .../handler/fd_remote_metrics_score.go | 27 ++++ .../internal/scheduler/handler/handler.go | 49 +++++- .../scheduler/handler/handler_test.go | 4 + .../scheduler/handler/prefill_cache_aware.go | 97 +++++++++++- fastdeploy/golang_router/pkg/logger/logger.go | 11 +- .../golang_router/pkg/metrics/metrics.go | 18 +++ 20 files changed, 576 insertions(+), 59 deletions(-) create mode 100644 fastdeploy/golang_router/internal/scheduler/handler/fd_remote_metrics_score.go diff --git a/docs/online_serving/router.md b/docs/online_serving/router.md index 3a388f363e..d90c3574ca 100644 --- a/docs/online_serving/router.md +++ b/docs/online_serving/router.md @@ -190,7 +190,7 @@ server: splitwise: true # true enables PD disaggregation; false disables it scheduler: - policy: "power_of_two" # Scheduling policy (optional): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score + policy: "power_of_two" # Scheduling policy (optional): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, remote_cache_aware, fd_metrics_score, fd_remote_metrics_score 
prefill-policy: "cache_aware" # Prefill scheduling policy in PD mode decode-policy: "fd_metrics_score" # Decode scheduling policy in PD mode eviction-interval-secs: 60 # Cache eviction interval for CacheAware scheduling @@ -199,9 +199,13 @@ scheduler: hit-ratio-weight: 1.0 # Cache hit ratio weight load-balance-weight: 0.05 # Load balancing weight cache-block-size: 4 # Cache block size - tokenizer-url: "http://0.0.0.0:8098" # Tokenizer service endpoint (optional) - tokenizer-timeout-secs: 2 # Tokenizer service timeout + # tokenizer-url: "http://0.0.0.0:8098" # Tokenizer service endpoint (optional), cache_aware uses character-level tokenization when not configured. + # Note: Enabling this option causes a synchronous remote tokenizer call on every scheduling decision, + # introducing additional network latency. Only enable it when precise token-level tokenization + # is needed to improve cache hit rate. + # tokenizer-timeout-secs: 2 # Tokenizer service timeout; default: 2 waiting-weight: 10 # Waiting weight for CacheAware scheduling + stats-interval-secs: 5 # Stats logging interval in seconds, includes load and cache hit rate statistics; default: 5 manager: health-failure-threshold: 3 # Number of failed health checks before marking unhealthy @@ -254,6 +258,24 @@ Instance Registration Parameters: Among these, `role`, `host_ip`, and `port` are required; all other parameters are optional. +## Scheduling Strategies + +The Router supports the following scheduling strategies, configurable via `policy` (mixed mode), `prefill-policy`, and `decode-policy` (PD disaggregated mode) fields in the configuration file. + +**Default strategies**: When not configured, prefill nodes default to `process_tokens`, mixed and decode nodes default to `request_num`. + +| Strategy | Applicable Scenario | Implementation | +|----------|---------------------|----------------| +| `random` | General | Randomly selects one available instance, stateless, suitable for lightweight scenarios. 
| +| `round_robin` | General | Uses atomic counter to cycle through instance list, distributing requests evenly in order. | +| `power_of_two` | General | Randomly picks two instances, compares their concurrent request counts, selects the one with lower load. | +| `process_tokens` | **prefill (default)** | Iterates all instances, selects the one with the fewest tokens currently being processed (in-memory counting), suitable for prefill long-request load balancing. | +| `request_num` | **mixed / decode (default)** | Iterates all instances, selects the one with the fewest concurrent requests (in-memory counting), suitable for decode and mixed scenarios. | +| `fd_metrics_score` | mixed / decode | Uses in-memory counting to get running/waiting request counts, scores by `running + waiting × waitingWeight`, selects the instance with the lowest score. | +| `fd_remote_metrics_score` | mixed / decode | Fetches running/waiting request counts from each instance's remote `/metrics` endpoint in real-time, scores by `running + waiting × waitingWeight`, selects the instance with the lowest score. Requires `metrics_port` in instance registration. **Note: A synchronous remote HTTP request is issued on every scheduling decision. With a large number of instances or poor network conditions, this can significantly increase scheduling latency. Evaluate your deployment conditions carefully before enabling this strategy.** | +| `cache_aware` | prefill | Maintains KV Cache prefix hit information per instance via Radix Tree, selects instances by combining hit ratio and load scores (in-memory counting); automatically falls back to `process_tokens` when load is severely imbalanced. | +| `remote_cache_aware` | prefill | Same cache-aware strategy as `cache_aware`, but uses remote `/metrics` endpoint for instance load data. Requires `metrics_port` in instance registration. **Note: A synchronous remote HTTP request is issued on every scheduling decision. 
With a large number of instances or poor network conditions, this can significantly increase scheduling latency. Evaluate your deployment conditions carefully before enabling this strategy.** | + ## Troubleshooting If you encounter issues while using the Router, please refer to the [Router Troubleshooting Guide](router_faq.md), which covers common log analysis, response output interpretation, and troubleshooting methods. diff --git a/docs/online_serving/router_faq.md b/docs/online_serving/router_faq.md index d12e438832..49083539d4 100644 --- a/docs/online_serving/router_faq.md +++ b/docs/online_serving/router_faq.md @@ -23,6 +23,12 @@ For basic Router usage, please refer to [Load-Balancing Scheduling Router](route | `Failed to register instance from index {index}: {error}` | Instance at index {index} in config file failed to register | That instance was not registered | Health status, registration parameters | | `failed to send request to {url} with error: {error}` | Health check request failed to send | The instance may be marked as unhealthy | Network connectivity, proxy settings | | `scanner error: {error}` | Error occurred while reading backend streaming response | The current request may fail | Backend instance status | +| `[prefill] scanner error: {error}, message={message}` | Error occurred while reading Prefill backend streaming response | The current Prefill request may fail | Backend instance status | +| `[prefill] copy error: {error}, message={message}` | Error occurred while copying Prefill response data | The current Prefill request may fail | Backend instance status | +| `Panic recovered: {error}` | A panic occurred during request processing and was recovered | The current request fails, but the service continues running | Backend instance status, request content | +| `empty baseURL provided` | Health check received an empty base URL | Health check cannot be performed | Registration parameters | +| `failed to create request: {error}` | Failed to 
create health check request | The instance may be marked as unhealthy | Network environment | +| `failed to read response body: {error}` | Failed to read health check response body | The instance may be marked as unhealthy | Backend instance status | ### Warn-Level Logs @@ -30,7 +36,9 @@ For basic Router usage, please refer to [Load-Balancing Scheduling Router](route | :--- | :--- | :--- | :--- | | `Server {url} is not healthy` | The instance at this URL failed health check | Router cannot register the instance, or will remove it from the registered list | Health status | | `Instance {url} role is unknown` | Instance role cannot be recognized | The instance will not be added to the scheduling list | Registration parameters | -| `cache-aware prefill: tokenizer failed, fallback to process_tokens: {error}` | Tokenizer service call failed, automatically falling back to process_tokens strategy | Prefill scheduling temporarily does not use cache_aware strategy; normal request processing is not affected | Tokenizer service status | +| `cache-aware prefill: tokenizer failed, fallback to char tokens: {error}` | Tokenizer service call failed, automatically falling back to character-based tokenization | cache_aware strategy remains active, using character-based tokenization for cache matching instead of the Tokenizer; normal request processing is not affected | Tokenizer service status | +| `cache-aware prefill: tokenize failed, fallback to process_tokens: {error}` | Tokenization completely failed (e.g., empty input), falling back to process_tokens strategy | Prefill scheduling temporarily does not use cache_aware strategy; normal request processing is not affected | Request content, Tokenizer service status | +| `cache-aware prefill: final strategy: process_tokens, reason: tokenize failed: {error}. 
ts_ms={ts}` | Tokenization failed (new format), falling back to process_tokens strategy | Prefill scheduling temporarily does not use cache_aware strategy; normal request processing is not affected | Request content, Tokenizer service status | ### Info-Level Logs @@ -42,6 +50,42 @@ For basic Router usage, please refer to [Load-Balancing Scheduling Router](route | `No instances found in config file {path}` | No instances found in the registration config file | Check whether register.yaml is empty | | `Request completed successfully.` | Request processing completed | Normal operation log | | `Request failed, retrying...` | Request failed, retrying | Router will retry up to 3 times | +| `select worker (prefill): {url}, tokens: {tokens}` | Prefill scheduler selected a worker, showing current token processing count | Normal operation log | +| `select worker ({type}): {url}, count: {count}` | Decode/Mixed scheduler selected a worker, showing current request concurrency | Normal operation log | +| `release worker: {url}, count: {count}` | Request ended, worker counter released | Normal operation log | +| `release prefill tokens: {url}, tokens: {tokens}` | Prefill request ended, token load released | Normal operation log | +| `cleanup unhealthy worker counter: {url}` | Cleaned up counter for unhealthy worker | Normal operation log | +| `removed counters for {count} unhealthy workers: {urls}` | Batch cleanup of counters for unhealthy workers | Normal operation log | +| `[stats] total_running={n}, workers: [{loads}], cache_hit_rate={rate}% (hits={hits}/total={total})` | Periodic stats: total requests, worker loads, cache hit rate | Normal operation log, useful for monitoring and tuning | +| `Parsing completed; starting worker selection.` | Request parsing completed, starting worker selection | Normal operation log | +| `Request completed with an error.` | Request processing completed with an error | Check backend instance status | +| `[SelectWorkerPair] decode selection 
failed, releasing prefill counter url={url}` | Decode selection failed in PD disaggregated mode, releasing Prefill counter | Error handling log | +| `[prefill] first chunk received, release counter url={url}` | Prefill streaming response received first chunk, counter released | Normal operation log | +| `[prefill] non-stream prefill response done, release counter url={url}` | Prefill non-streaming response completed, counter released | Normal operation log | +| `[prefill] backendResp is nil or backendResp.Body is nil, url={url}` | Prefill backend response is nil | May indicate backend connection issue | +| `[prefill] release in defer (fallback) url={url}, isStream={bool}` | Fallback resource release when Prefill request exits abnormally | Error handling log | +| `[prefill] release in CommonCompletions defer (error path) url={url}` | Prefill resource release on error path | Error handling log | +| `cache-aware prefill: final strategy: process_tokens, reason: strategy not initialized` | cache_aware strategy not initialized, falling back to process_tokens | Check cache_aware configuration | +| `cache-aware prefill: final strategy: process_tokens, reason: load imbalanced, loads={loads}. ts_ms={ts}` | Load imbalanced across instances, falling back to process_tokens strategy | Normal operation log, automatic load balancing switch | +| `cache-aware prefill: final strategy: cache_aware_scoring, selected={url}, loads={loads}, hitRatios={ratios}. ts_ms={ts}` | cache_aware scoring strategy selected a worker | Normal operation log, showing loads and hit ratios | +| `[{method}] {path} {proto} {status} {latency} {clientIP}` | HTTP request access log | Normal operation log, records basic info for each request | +| `before SelectWorker prefill. ts_ms={ts}` | Starting Prefill worker selection in PD disaggregated mode | Normal operation log, for performance tracing | +| `before SelectWorker decode, after prefill. 
ts_ms={ts}` | Starting Decode worker selection after Prefill selection | Normal operation log, for performance tracing | +| `after SelectWorker decode, before return. ts_ms={ts}` | Decode worker selection completed | Normal operation log, for performance tracing | + +### Debug-Level Logs + +> Debug-level logs are only output when the log level is set to `debug`, typically used for development debugging. + +| Log Message | Meaning | Description | +| :--- | :--- | :--- | +| `Healthy instances: prefill={urls}, decode={urls}, mixed={urls}` | Lists healthy instances for each role | Useful for verifying instance discovery | +| `cache-aware prefill: hashes={n} workers={n} load={loads} hit={hits}` | Hash count, worker count, and load info for cache_aware strategy | Useful for debugging cache hits | +| `cache-aware prefill: tokenizer tokens={tokens}` | Tokenizer tokenization result | Useful for debugging tokenization results | +| `cache-aware score: worker={url} hit={hit} loadRatio={ratio} score={score}` | Scoring details for cache_aware strategy | Useful for debugging scheduling decisions | +| `radix match: hashes={n} matched_len={n} node_children={n}` | Radix tree match details | Useful for debugging cache matching | +| `radix record: worker={url} hashes={n} node_depth={n}` | Radix tree record details | Useful for debugging cache recording | +| `radix eviction: removed={n} nodeCount={n}` | Radix tree eviction details | Useful for debugging cache eviction | ## Common Response Output Analysis @@ -189,9 +233,10 @@ If `Failed to start server` appears in startup logs, check: ### Tokenizer Service (cache_aware Strategy) -When using the `cache_aware` scheduling strategy, the Router calls a Tokenizer service to tokenize requests for cache hit ratio computation. When the Tokenizer service is unavailable, the Router will log a Warn-level message: `tokenizer failed, fallback to process_tokens`. 
+When using the `cache_aware` scheduling strategy, the Router calls a Tokenizer service to tokenize requests for cache hit ratio computation. When the Tokenizer service is unavailable, the Router has a two-level degradation mechanism: -**This does not affect normal request processing** — the Router has a built-in degradation mechanism that automatically falls back to the `process_tokens` strategy for continued scheduling. The only impact is the temporary loss of cache-aware optimization. +1. **Fallback to character-based tokenization** (common case): The log will show `tokenizer failed, fallback to char tokens`. The cache_aware strategy remains active, using character-based tokenization for cache matching instead of the Tokenizer. Cache hit accuracy may decrease, but normal request processing is not affected. +2. **Fallback to process_tokens strategy** (extreme case): When tokenization completely fails (e.g., empty request content), the log will show `tokenize failed, fallback to process_tokens`. The cache_aware strategy temporarily becomes inactive, and scheduling falls back to token processing volume. Normal request processing is not affected. 
To restore full cache_aware functionality: diff --git a/docs/zh/online_serving/router.md b/docs/zh/online_serving/router.md index 45576deb07..eaf50722c7 100644 --- a/docs/zh/online_serving/router.md +++ b/docs/zh/online_serving/router.md @@ -190,7 +190,7 @@ server: splitwise: true # true代表开启pd分离模式,false代表开启非pd分离模式 scheduler: - policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, fd_metrics_score; 默认: request_num + policy: "power_of_two" # 调度策略(可选): random, power_of_two, round_robin, process_tokens, request_num, cache_aware, remote_cache_aware, fd_metrics_score, fd_remote_metrics_score; 默认: request_num prefill-policy: "cache_aware" # pd分离模式下prefill节点调度策略; 默认: process_tokens decode-policy: "fd_metrics_score" # pd分离模式下decode节点调度策略; 默认: request_num eviction-interval-secs: 60 # cache-aware策略清理过期cache的间隔时间 @@ -199,9 +199,12 @@ scheduler: hit-ratio-weight: 1.0 # cache-aware策略命中率权重 load-balance-weight: 0.05 # cache-aware策略负载均衡权重 cache-block-size: 4 # cache-aware策略cache block大小 - tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选) - tokenizer-timeout-secs: 2 # tokenizer服务超时时间 + # tokenizer-url: "http://0.0.0.0:8098" # tokenizer服务地址(可选), 不配置时cache_aware策略自动使用字符级分词。 + # 注意:配置此项会在每次调度时同步调用远程tokenizer服务,引入额外网络时延, + # 仅在需要精确token级分词以提升cache命中率时再考虑启用。 + # tokenizer-timeout-secs: 2 # tokenizer服务超时时间; 默认: 2 waiting-weight: 10 # cache-aware策略等待权重 + stats-interval-secs: 5 # 日志统计信息打印间隔时间(秒), 包含负载和缓存命中率等统计数据; 默认: 5 manager: health-failure-threshold: 3 # 健康检查失败次数,超过次数后认为节点不健康 @@ -265,10 +268,12 @@ Router 支持以下调度策略,可通过配置文件中的 `policy`(mixed | `random` | 通用 | 从所有可用实例中随机选择一个,无状态感知,适合轻量场景。 | | `round_robin` | 通用 | 使用原子计数器对实例列表循环取模,按顺序均匀分发请求。 | | `power_of_two` | 通用 | 随机选取两个实例,比较其当前并发请求数,选择负载较低的一个。 | -| `process_tokens` | **prefill(默认)** | 遍历所有实例,选择当前正在处理的 token 数最少的实例,适合 prefill 阶段的长请求负载均衡。 | -| `request_num` | **mixed / decode(默认)** | 遍历所有实例,选择当前并发请求数最少的实例,适合 decode 及 mixed 场景的请求均衡。 | -| `fd_metrics_score` | mixed / decode | 
实时从各实例的 metrics 接口获取 running/waiting 请求数,按 `running + waiting × waitingWeight` 打分,选择得分最低的实例。 | -| `cache_aware` | prefill | 基于 Radix Tree 维护各实例的 KV Cache 前缀命中情况,综合命中率与负载打分选择实例;负载严重不均衡时自动回退至 `process_tokens`。 | +| `process_tokens` | **prefill(默认)** | 遍历所有实例,选择当前正在处理的 token 数最少的实例(内存计数),适合 prefill 阶段的长请求负载均衡。 | +| `request_num` | **mixed / decode(默认)** | 遍历所有实例,选择当前并发请求数最少的实例(内存计数),适合 decode 及 mixed 场景的请求均衡。 | +| `fd_metrics_score` | mixed / decode | 基于内存计数获取 running/waiting 请求数,按 `running + waiting × waitingWeight` 打分,选择得分最低的实例。 | +| `fd_remote_metrics_score` | mixed / decode | 实时从各实例的远程 `/metrics` 接口获取 running/waiting 请求数,按 `running + waiting × waitingWeight` 打分,选择得分最低的实例。需要实例注册时提供 `metrics_port`。**注意:每次调度时会同步发起远程 HTTP 请求,在实例数量较多或网络条件较差时会显著增加调度时延,请结合实际情况评估后再启用。** | +| `cache_aware` | prefill | 基于 Radix Tree 维护各实例的 KV Cache 前缀命中情况,综合命中率与负载打分(内存计数)选择实例;负载严重不均衡时自动回退至 `process_tokens`。 | +| `remote_cache_aware` | prefill | 与 `cache_aware` 相同的缓存感知策略,但使用远程 `/metrics` 接口获取实例负载数据。需要实例注册时提供 `metrics_port`。**注意:每次调度时会同步发起远程 HTTP 请求,在实例数量较多或网络条件较差时会显著增加调度时延,请结合实际情况评估后再启用。** | ## 常见问题排查 diff --git a/docs/zh/online_serving/router_faq.md b/docs/zh/online_serving/router_faq.md index 359ec7c8eb..a42ed01528 100644 --- a/docs/zh/online_serving/router_faq.md +++ b/docs/zh/online_serving/router_faq.md @@ -23,6 +23,12 @@ Router 的基本使用方式请参考 [负载均衡调度 Router](router.md)。 | `Failed to register instance from index {index}: {error}` | 配置文件中第 {index} 个实例注册失败 | 该实例未能成功注册 | 健康状况、注册参数 | | `failed to send request to {url} with error: {error}` | 健康检查请求发送失败 | 该实例可能被判定为不健康 | 网络连通性、代理设置 | | `scanner error: {error}` | 读取后端流式响应时发生错误 | 当前请求可能失败 | 后端实例状态 | +| `[prefill] scanner error: {error}, message={message}` | 读取 Prefill 后端流式响应时发生错误 | 当前 Prefill 请求可能失败 | 后端实例状态 | +| `[prefill] copy error: {error}, message={message}` | 复制 Prefill 响应数据时发生错误 | 当前 Prefill 请求可能失败 | 后端实例状态 | +| `Panic recovered: {error}` | 请求处理过程中发生 panic 并被恢复 | 当前请求失败,但服务继续运行 | 后端实例状态、请求内容 | +| `empty baseURL provided` | 健康检查时传入了空的基础 URL | 
健康检查无法执行 | 注册参数 | +| `failed to create request: {error}` | 创建健康检查请求失败 | 该实例可能被判定为不健康 | 网络环境 | +| `failed to read response body: {error}` | 读取健康检查响应体失败 | 该实例可能被判定为不健康 | 后端实例状态 | ### Warn 级别日志 @@ -30,7 +36,9 @@ Router 的基本使用方式请参考 [负载均衡调度 Router](router.md)。 | :--- | :--- | :--- | :--- | | `Server {url} is not healthy` | 该 URL 对应的实例未通过健康检查 | Router 无法注册该实例,或将该实例从已注册列表中移除 | 健康状况 | | `Instance {url} role is unknown` | 实例角色无法识别 | 该实例不会被加入调度列表 | 注册参数 | -| `cache-aware prefill: tokenizer failed, fallback to process_tokens: {error}` | Tokenizer 服务调用失败,已自动回退至 process_tokens 策略 | Prefill 调度暂时不使用 cache_aware 策略,不影响正常请求处理 | Tokenizer 服务状态 | +| `cache-aware prefill: tokenizer failed, fallback to char tokens: {error}` | Tokenizer 服务调用失败,已自动回退至字符级分词 | cache_aware 策略仍然生效,使用字符级分词代替 Tokenizer 进行缓存匹配,不影响正常请求处理 | Tokenizer 服务状态 | +| `cache-aware prefill: tokenize failed, fallback to process_tokens: {error}` | 分词彻底失败(如输入为空),回退至 process_tokens 策略 | Prefill 调度暂时不使用 cache_aware 策略,不影响正常请求处理 | 请求内容、Tokenizer 服务状态 | +| `cache-aware prefill: final strategy: process_tokens, reason: tokenize failed: {error}. 
ts_ms={ts}` | 分词失败(新格式),回退至 process_tokens 策略 | Prefill 调度暂时不使用 cache_aware 策略,不影响正常请求处理 | 请求内容、Tokenizer 服务状态 | ### Info 级别日志 @@ -42,6 +50,42 @@ Router 的基本使用方式请参考 [负载均衡调度 Router](router.md)。 | `No instances found in config file {path}` | 注册配置文件中未找到实例信息 | 请检查 register.yaml 内容是否为空 | | `Request completed successfully.` | 请求处理完成 | 正常运行日志 | | `Request failed, retrying...` | 请求失败,正在进行重试 | Router 最多重试 3 次 | +| `select worker (prefill): {url}, tokens: {tokens}` | Prefill 调度选中 Worker,显示当前 token 处理量 | 正常运行日志 | +| `select worker ({type}): {url}, count: {count}` | Decode/Mixed 调度选中 Worker,显示当前请求并发数 | 正常运行日志 | +| `release worker: {url}, count: {count}` | 请求结束,释放 Worker 计数器 | 正常运行日志 | +| `release prefill tokens: {url}, tokens: {tokens}` | Prefill 请求结束,释放 token 负载 | 正常运行日志 | +| `cleanup unhealthy worker counter: {url}` | 清理不健康 Worker 的计数器 | 正常运行日志 | +| `removed counters for {count} unhealthy workers: {urls}` | 批量清理不健康 Worker 的计数器 | 正常运行日志 | +| `[stats] total_running={n}, workers: [{loads}], cache_hit_rate={rate}% (hits={hits}/total={total})` | 周期性统计:总请求数、各 Worker 负载、缓存命中率 | 正常运行日志,用于监控调优 | +| `Parsing completed; starting worker selection.` | 请求解析完成,开始选择 Worker | 正常运行日志 | +| `Request completed with an error.` | 请求处理完成但发生错误 | 请排查后端实例状态 | +| `[SelectWorkerPair] decode selection failed, releasing prefill counter url={url}` | PD 分离模式下 Decode 选择失败,释放 Prefill 计数器 | 异常处理日志 | +| `[prefill] first chunk received, release counter url={url}` | Prefill 流式响应收到首个数据块,释放计数器 | 正常运行日志 | +| `[prefill] non-stream prefill response done, release counter url={url}` | Prefill 非流式响应完成,释放计数器 | 正常运行日志 | +| `[prefill] backendResp is nil or backendResp.Body is nil, url={url}` | Prefill 后端响应为空 | 可能表示后端连接异常 | +| `[prefill] release in defer (fallback) url={url}, isStream={bool}` | Prefill 请求异常退出时的兜底资源释放 | 异常处理日志 | +| `[prefill] release in CommonCompletions defer (error path) url={url}` | 请求出错时释放 Prefill 资源 | 异常处理日志 | +| `cache-aware prefill: final strategy: process_tokens, reason: strategy not initialized` | 
cache_aware 策略未初始化,回退至 process_tokens | 请检查 cache_aware 相关配置 | +| `cache-aware prefill: final strategy: process_tokens, reason: load imbalanced, loads={loads}. ts_ms={ts}` | 实例间负载不均衡,回退至 process_tokens 策略 | 正常运行日志,负载均衡自动切换 | +| `cache-aware prefill: final strategy: cache_aware_scoring, selected={url}, loads={loads}, hitRatios={ratios}. ts_ms={ts}` | cache_aware 打分策略选中 Worker | 正常运行日志,显示负载和命中率 | +| `[{method}] {path} {proto} {status} {latency} {clientIP}` | HTTP 请求访问日志 | 正常运行日志,记录每个请求的基本信息 | +| `before SelectWorker prefill. ts_ms={ts}` | PD 分离模式下开始选择 Prefill Worker | 正常运行日志,用于性能追踪 | +| `before SelectWorker decode, after prefill. ts_ms={ts}` | Prefill 选择完成后开始选择 Decode Worker | 正常运行日志,用于性能追踪 | +| `after SelectWorker decode, before return. ts_ms={ts}` | Decode Worker 选择完成 | 正常运行日志,用于性能追踪 | + +### Debug 级别日志 + +> Debug 级别日志需要将日志级别设置为 `debug` 才会输出,通常用于开发调试。 + +| 日志表现 | 日志含义 | 说明 | +| :--- | :--- | :--- | +| `Healthy instances: prefill={urls}, decode={urls}, mixed={urls}` | 列出当前各角色的健康实例列表 | 用于确认实例发现是否正常 | +| `cache-aware prefill: hashes={n} workers={n} load={loads} hit={hits}` | cache_aware 策略的 hash 数量、Worker 数量及负载信息 | 用于调试缓存命中 | +| `cache-aware prefill: tokenizer tokens={tokens}` | Tokenizer 分词结果 | 用于调试分词结果 | +| `cache-aware score: worker={url} hit={hit} loadRatio={ratio} score={score}` | cache_aware 策略的打分详情 | 用于调试调度决策 | +| `radix match: hashes={n} matched_len={n} node_children={n}` | 前缀树匹配详情 | 用于调试缓存匹配 | +| `radix record: worker={url} hashes={n} node_depth={n}` | 前缀树记录详情 | 用于调试缓存记录 | +| `radix eviction: removed={n} nodeCount={n}` | 前缀树淘汰详情 | 用于调试缓存淘汰 | ## 常见返回输出分析 @@ -189,9 +233,10 @@ PD 分离模式下建议完整配置以下参数,以确保 KV Cache 传输正 ### Tokenizer 服务(cache_aware 策略) -使用 `cache_aware` 调度策略时,Router 会调用 Tokenizer 服务对请求进行分词以计算缓存命中率。当 Tokenizer 服务不可用时,日志会出现 `tokenizer failed, fallback to process_tokens` 的 Warn 级别提示。 +使用 `cache_aware` 调度策略时,Router 会调用 Tokenizer 服务对请求进行分词以计算缓存命中率。当 Tokenizer 服务不可用时,Router 内置了两级退化机制: -**这不影响正常的请求处理**——Router 内置了退化机制,会自动回退至 `process_tokens` 
策略继续调度,只是暂时无法利用缓存感知的优化能力。 +1. **回退至字符级分词**(常见情况):日志出现 `tokenizer failed, fallback to char tokens`。此时 cache_aware 策略仍然生效,只是使用字符级分词代替 Tokenizer 进行缓存匹配,缓存命中精度会有所下降,但不影响正常请求处理。 +2. **回退至 process_tokens 策略**(极端情况):当分词彻底失败(如请求内容为空)时,日志出现 `tokenize failed, fallback to process_tokens`。此时 cache_aware 策略暂时不生效,改为按 token 处理量进行调度,同样不影响正常请求处理。 如需恢复 cache_aware 策略的完整功能: diff --git a/fastdeploy/golang_router/cmd/main.go b/fastdeploy/golang_router/cmd/main.go index 82d9a2949c..e0e8c98e13 100644 --- a/fastdeploy/golang_router/cmd/main.go +++ b/fastdeploy/golang_router/cmd/main.go @@ -57,6 +57,8 @@ func main() { go manager.MonitorInstanceHealth(context.Background(), intervalSecs) intervalCleanupSecs := cfg.Scheduler.EvictionIntervalSecs go scheduler_handler.StartBackupCleanupTask(context.Background(), intervalCleanupSecs) + statsIntervalSecs := cfg.Scheduler.StatsIntervalSecs + go scheduler_handler.StartStatsReporter(context.Background(), statsIntervalSecs) // Start server addr := ":" + cfg.Server.Port diff --git a/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml b/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml index 8fa80b7630..712b2bcee7 100644 --- a/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml +++ b/fastdeploy/golang_router/examples/run_with_config/config/config.example.yaml @@ -14,9 +14,8 @@ scheduler: hit-ratio-weight: 1.0 load-balance-weight: 0.05 cache-block-size: 4 - tokenizer-url: "http://0.0.0.0:8098" # optional tokenizer service endpoint - tokenizer-timeout-secs: 2 waiting-weight: 10 + stats-interval-secs: 5 # interval in seconds for periodic stats logging (running requests, cache hit rate) manager: health-failure-threshold: 3 diff --git a/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml b/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml index 8fa80b7630..712b2bcee7 100644 --- 
a/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml +++ b/fastdeploy/golang_router/examples/run_with_default_workers/config/config.example.yaml @@ -14,9 +14,8 @@ scheduler: hit-ratio-weight: 1.0 load-balance-weight: 0.05 cache-block-size: 4 - tokenizer-url: "http://0.0.0.0:8098" # optional tokenizer service endpoint - tokenizer-timeout-secs: 2 waiting-weight: 10 + stats-interval-secs: 5 # interval in seconds for periodic stats logging (running requests, cache hit rate) manager: health-failure-threshold: 3 diff --git a/fastdeploy/golang_router/internal/common/interface.go b/fastdeploy/golang_router/internal/common/interface.go index 4d3d914462..f0ede87d65 100644 --- a/fastdeploy/golang_router/internal/common/interface.go +++ b/fastdeploy/golang_router/internal/common/interface.go @@ -5,4 +5,5 @@ import "context" type ManagerAPI interface { GetHealthyURLs(ctx context.Context) []string GetMetrics(ctx context.Context, url string) (int, int, int) + GetRemoteMetrics(ctx context.Context, url string) (int, int, int) } diff --git a/fastdeploy/golang_router/internal/config/config.go b/fastdeploy/golang_router/internal/config/config.go index 9ff4d896ef..9f1c1c0e34 100644 --- a/fastdeploy/golang_router/internal/config/config.go +++ b/fastdeploy/golang_router/internal/config/config.go @@ -44,6 +44,7 @@ type SchedulerConfig struct { HitRatioWeight float64 `yaml:"hit-ratio-weight"` LoadBalanceWeight float64 `yaml:"load-balance-weight"` WaitingWeight float64 `yaml:"waiting-weight"` + StatsIntervalSecs float64 `yaml:"stats-interval-secs"` } type LogConfig struct { @@ -127,5 +128,8 @@ func Load(configPath, listenPort string, isSplitwise bool) (*Config, error) { if cfg.Scheduler.DecodePolicy == "" { cfg.Scheduler.DecodePolicy = "request_num" } + if cfg.Scheduler.StatsIntervalSecs == 0 { + cfg.Scheduler.StatsIntervalSecs = 5 + } return &cfg, nil } diff --git a/fastdeploy/golang_router/internal/gateway/completions.go 
b/fastdeploy/golang_router/internal/gateway/completions.go index f99c8607b0..3b4abd5494 100644 --- a/fastdeploy/golang_router/internal/gateway/completions.go +++ b/fastdeploy/golang_router/internal/gateway/completions.go @@ -213,7 +213,17 @@ func PostToPD(c *gin.Context, decodeURL, prefillURL string, reqBody []byte, isSt } func readPrefillRecv(ctx context.Context, url string, isStream bool, message string, backendResp *http.Response) { + released := false + defer func() { + if !released { + scheduler_handler.Release(ctx, url) + scheduler_handler.ReleasePrefillTokens(ctx, url, message) + logger.Info(ctx, "[prefill] release in defer (fallback) url=%s, isStream=%v", url, isStream) + } + }() + if backendResp == nil || backendResp.Body == nil { + logger.Info(ctx, "[prefill] backendResp is nil or backendResp.Body is nil, url=%s", url) return } defer backendResp.Body.Close() @@ -226,16 +236,6 @@ func readPrefillRecv(ctx context.Context, url string, isStream bool, message str scanner := bufio.NewScanner(backendResp.Body) scanner.Buffer(buffer.B, maxCapacity) - released := false - defer func() { - // Fallback to ensure release - if !released { - scheduler_handler.Release(ctx, url) - scheduler_handler.ReleasePrefillTokens(ctx, url, message) - logger.Debug(ctx, "[prefill] release in defer (fallback) url=%s", url) - } - }() - for scanner.Scan() { _ = scanner.Text() @@ -244,19 +244,22 @@ func readPrefillRecv(ctx context.Context, url string, isStream bool, message str scheduler_handler.Release(ctx, url) scheduler_handler.ReleasePrefillTokens(ctx, url, message) released = true - logger.Info(ctx, "[prefill] first chunk received, release counter url=%s", url) } } if err := scanner.Err(); err != nil { - logger.Debug(ctx, "[prefill] scanner error: %v", err) + logger.Error(ctx, "[prefill] scanner error: %v, message=%s", err, message) } } else { _, err := io.Copy(io.Discard, backendResp.Body) if err != nil { - logger.Debug(ctx, "[prefill] copy error: %v", err) + logger.Error(ctx, 
"[prefill] copy error: %v, message=%s", err, message) } + scheduler_handler.Release(ctx, url) + scheduler_handler.ReleasePrefillTokens(ctx, url, message) + released = true + logger.Info(ctx, "[prefill] non-stream prefill response done, release counter url=%s", url) } } @@ -268,6 +271,21 @@ func getRequestID(ctx context.Context, rawReq map[string]any) string { return rawReq["request_id"].(string) } +// getSessionID extracts session_id from top-level or extra_body, top-level takes priority +func getSessionID(rawReq map[string]any) string { + // Priority 1: top-level session_id (same level as messages) + if sid, ok := rawReq["session_id"].(string); ok && sid != "" { + return sid + } + // Priority 2: extra_body.session_id + if extraBody, ok := rawReq["extra_body"].(map[string]any); ok { + if sid, ok := extraBody["session_id"].(string); ok && sid != "" { + return sid + } + } + return "" +} + // ChatCompletions implements request forwarding to actual large model inference service func ChatCompletions(c *gin.Context) { completionEndpoint := "chat/completions" @@ -299,17 +317,22 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp isSplitwise := manager.GetSplitwise(ctx) var ( - destURL string - releaseTargets []string - requestBodyData []byte - prefillURL string - decodeURL string - message string + destURL string + releaseTargets []string + requestBodyData []byte + prefillURL string + decodeURL string + message string + prefillHandedOff bool // true once readPrefillRecv goroutine takes ownership of prefill counters ) if isSplitwise { requestID := getRequestID(ctx, rawReq) ctx = context.WithValue(ctx, logger.RequestIDKey, requestID) + sessionID := getSessionID(rawReq) + if sessionID != "" { + ctx = context.WithValue(ctx, logger.SessionIDKey, sessionID) + } c.Request = c.Request.WithContext(ctx) // PD mode: select instances for Prefill/Decode separately @@ -328,6 +351,22 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, 
completionEndp return } + // Both prefill and decode counters are now incremented. + // Register defer to guarantee release on ALL subsequent paths. + releaseTargets = []string{decodeURL} + defer func() { + // Always release decode request counter + for _, url := range releaseTargets { + scheduler_handler.Release(ctx, url) + } + // Release prefill counters only if readPrefillRecv was NOT launched + if !prefillHandedOff { + scheduler_handler.Release(ctx, prefillURL) + scheduler_handler.ReleasePrefillTokens(ctx, prefillURL, message) + logger.Info(ctx, "[prefill] release in CommonCompletions defer (error path) url=%s", prefillURL) + } + }() + // Construct disaggregate_info to ensure selected P/D work in pairs within FastDeploy disagg, err := manager.BuildDisaggregateInfo(ctx, prefillURL, decodeURL) if err != nil { @@ -347,7 +386,6 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp } destURL = decodeURL - releaseTargets = []string{decodeURL} // Expose scheduling results to caller for debugging/validating scheduling strategy c.Writer.Header().Set("X-Router-Prefill-URL", prefillURL) @@ -364,14 +402,14 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp destURL = dest releaseTargets = []string{destURL} requestBodyData = bodyBytes - } - // Maintain request_num count for related instances (Inc done in SelectWorker, Release here) - defer func() { - for _, url := range releaseTargets { - scheduler_handler.Release(ctx, url) - } - }() + // Maintain request_num count for mixed instances + defer func() { + for _, url := range releaseTargets { + scheduler_handler.Release(ctx, url) + } + }() + } isStream := false if v, ok := rawReq["stream"]; ok { @@ -395,6 +433,11 @@ func CommonCompletions(c *gin.Context, extractor PromptExtractor, completionEndp logger.Info(ctx, "Request completed with an error.") return } + + // PostToPD succeeded: readPrefillRecv goroutine now owns prefill counter release + if isSplitwise { + 
prefillHandedOff = true + } defer backendResp.Body.Close() if isSplitwise { diff --git a/fastdeploy/golang_router/internal/gateway/completions_test.go b/fastdeploy/golang_router/internal/gateway/completions_test.go index fdf931e403..b188532aba 100644 --- a/fastdeploy/golang_router/internal/gateway/completions_test.go +++ b/fastdeploy/golang_router/internal/gateway/completions_test.go @@ -365,6 +365,146 @@ func TestReadPrefillRecv(t *testing.T) { }) } +func TestGetSessionID(t *testing.T) { + tests := []struct { + name string + input map[string]any + expected string + }{ + { + name: "top-level session_id", + input: map[string]any{ + "session_id": "top-level-sid", + "messages": []any{}, + }, + expected: "top-level-sid", + }, + { + name: "extra_body session_id", + input: map[string]any{ + "messages": []any{}, + "extra_body": map[string]any{ + "session_id": "extra-body-sid", + }, + }, + expected: "extra-body-sid", + }, + { + name: "top-level takes priority over extra_body", + input: map[string]any{ + "session_id": "top-level-sid", + "extra_body": map[string]any{ + "session_id": "extra-body-sid", + }, + }, + expected: "top-level-sid", + }, + { + name: "no session_id provided", + input: map[string]any{ + "messages": []any{}, + }, + expected: "", + }, + { + name: "empty request", + input: map[string]any{}, + expected: "", + }, + { + name: "top-level session_id is empty string, fallback to extra_body", + input: map[string]any{ + "session_id": "", + "extra_body": map[string]any{ + "session_id": "extra-body-sid", + }, + }, + expected: "extra-body-sid", + }, + { + name: "both session_id are empty strings", + input: map[string]any{ + "session_id": "", + "extra_body": map[string]any{ + "session_id": "", + }, + }, + expected: "", + }, + { + name: "extra_body exists but no session_id in it", + input: map[string]any{ + "extra_body": map[string]any{ + "other_field": "value", + }, + }, + expected: "", + }, + { + name: "extra_body is not a map", + input: map[string]any{ + 
"extra_body": "not-a-map", + }, + expected: "", + }, + { + name: "session_id is not a string (integer)", + input: map[string]any{ + "session_id": 12345, + }, + expected: "", + }, + { + name: "session_id is not a string (bool)", + input: map[string]any{ + "session_id": true, + }, + expected: "", + }, + { + name: "extra_body session_id is not a string", + input: map[string]any{ + "extra_body": map[string]any{ + "session_id": 12345, + }, + }, + expected: "", + }, + { + name: "session_id from JSON unmarshal (top-level)", + input: func() map[string]any { + var m map[string]any + json.Unmarshal([]byte(`{"session_id": "json-sid", "messages": []}`), &m) + return m + }(), + expected: "json-sid", + }, + { + name: "session_id from JSON unmarshal (extra_body)", + input: func() map[string]any { + var m map[string]any + json.Unmarshal([]byte(`{"extra_body": {"session_id": "json-extra-sid"}, "messages": []}`), &m) + return m + }(), + expected: "json-extra-sid", + }, + { + name: "session_id is nil", + input: map[string]any{ + "session_id": nil, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getSessionID(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + func TestCommonCompletions(t *testing.T) { // Setup a basic test server for backend responses backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/fastdeploy/golang_router/internal/manager/handler.go b/fastdeploy/golang_router/internal/manager/handler.go index e57009c20a..9b74541758 100644 --- a/fastdeploy/golang_router/internal/manager/handler.go +++ b/fastdeploy/golang_router/internal/manager/handler.go @@ -8,6 +8,7 @@ import ( "github.com/PaddlePaddle/FastDeploy/router/internal/config" scheduler_handler "github.com/PaddlePaddle/FastDeploy/router/internal/scheduler/handler" + "github.com/PaddlePaddle/FastDeploy/router/pkg/logger" ) type Manager struct { @@ -134,13 +135,21 @@ func 
SelectWorkerPair(ctx context.Context, message string) (string, string, erro if len(prefillWorkers) == 0 || len(decodeWorkers) == 0 { return "", "", nil } + logger.Info(ctx, "before SelectWorker prefill. ts_ms=%s", time.Now().Format("2006-01-02 15:04:05.000")) selectedPrefillWorkerURL, err := scheduler_handler.SelectWorker(ctx, prefillWorkers, message, "prefill") if err != nil { return "", "", err } + logger.Info(ctx, "before SelectWorker decode, after prefill. ts_ms=%s", time.Now().Format("2006-01-02 15:04:05.000")) selectedDecodeWorkerURL, err := scheduler_handler.SelectWorker(ctx, decodeWorkers, message, "decode") if err != nil { + // Prefill counter was already incremented but decode failed; + // release prefill counters here since CommonCompletions defer is not yet registered. + scheduler_handler.Release(ctx, selectedPrefillWorkerURL) + scheduler_handler.ReleasePrefillTokens(ctx, selectedPrefillWorkerURL, message) + logger.Info(ctx, "[SelectWorkerPair] decode selection failed, releasing prefill counter url=%s", selectedPrefillWorkerURL) return "", "", err } + logger.Info(ctx, "after SelectWorker decode, before return. 
ts_ms=%s", time.Now().Format("2006-01-02 15:04:05.000")) return selectedPrefillWorkerURL, selectedDecodeWorkerURL, nil } diff --git a/fastdeploy/golang_router/internal/manager/metrics.go b/fastdeploy/golang_router/internal/manager/metrics.go index 1059d52a63..7316b3738b 100644 --- a/fastdeploy/golang_router/internal/manager/metrics.go +++ b/fastdeploy/golang_router/internal/manager/metrics.go @@ -98,8 +98,14 @@ func GetMetricsByURL(ctx context.Context, rawURL string) (int, int, int, error) return int(runningCnt), int(waitingCnt), int(availableGpuBlockNum), nil } -// GetMetrics retrieves running metrics of the worker for the specified URL +// GetMetrics retrieves running metrics of the worker for the specified URL (in-memory counting) func (m *Manager) GetMetrics(ctx context.Context, rawURL string) (int, int, int) { + runningCnt := redrictCounter(ctx, rawURL) + return runningCnt, 0, 0 +} + +// GetRemoteMetrics retrieves real metrics from the worker's /metrics endpoint, falls back to in-memory counting on error +func (m *Manager) GetRemoteMetrics(ctx context.Context, rawURL string) (int, int, int) { runningCnt, waitingCnt, availableGpuBlockNum, err := GetMetricsByURL(ctx, rawURL) if err != nil { runningNewCnt := redrictCounter(ctx, rawURL) diff --git a/fastdeploy/golang_router/internal/manager/metrics_test.go b/fastdeploy/golang_router/internal/manager/metrics_test.go index cfec5a430b..f5cbb7e313 100644 --- a/fastdeploy/golang_router/internal/manager/metrics_test.go +++ b/fastdeploy/golang_router/internal/manager/metrics_test.go @@ -298,12 +298,19 @@ available_gpu_block_num 5`)) t.Logf("GetMetricsByURL succeeded: running=%d, waiting=%d, gpu=%d", running, waiting, gpu) } - // Now test Manager.GetMetrics - running, waiting, gpu = m.GetMetrics(context.Background(), workerURL) - t.Logf("Manager.GetMetrics result: running=%d, waiting=%d, gpu=%d", running, waiting, gpu) + // Test Manager.GetRemoteMetrics (uses real remote metrics) + running, waiting, gpu = 
m.GetRemoteMetrics(context.Background(), workerURL) + t.Logf("Manager.GetRemoteMetrics result: running=%d, waiting=%d, gpu=%d", running, waiting, gpu) assert.Equal(t, 2, running) assert.Equal(t, 1, waiting) assert.Equal(t, 5, gpu) + + // Test Manager.GetMetrics (uses in-memory counting) + runningMem, waitingMem, gpuMem := m.GetMetrics(context.Background(), workerURL) + t.Logf("Manager.GetMetrics result: running=%d, waiting=%d, gpu=%d", runningMem, waitingMem, gpuMem) + assert.Equal(t, 0, runningMem) // in-memory counter, should be 0 + assert.Equal(t, 0, waitingMem) + assert.Equal(t, 0, gpuMem) }) // Test error case (should fall back to counter) diff --git a/fastdeploy/golang_router/internal/scheduler/handler/fd_remote_metrics_score.go b/fastdeploy/golang_router/internal/scheduler/handler/fd_remote_metrics_score.go new file mode 100644 index 0000000000..50239d32c0 --- /dev/null +++ b/fastdeploy/golang_router/internal/scheduler/handler/fd_remote_metrics_score.go @@ -0,0 +1,27 @@ +package handler + +import ( + "context" + "math" +) + +func FDRemoteMetricsScoreSelectWorker(ctx context.Context, workers []string, message string) (string, error) { + if len(workers) == 0 { + return "", nil + } + + var ( + selectedURL string = "" + minScore float64 = math.MaxFloat64 + ) + + for _, w := range workers { + runningCnt, waitingCnt, _ := DefaultScheduler.managerAPI.GetRemoteMetrics(ctx, w) + score := computeScore(ctx, runningCnt, waitingCnt) + if score < minScore { + minScore = score + selectedURL = w + } + } + return selectedURL, nil +} diff --git a/fastdeploy/golang_router/internal/scheduler/handler/handler.go b/fastdeploy/golang_router/internal/scheduler/handler/handler.go index 1061fa3ca7..76009285ec 100644 --- a/fastdeploy/golang_router/internal/scheduler/handler/handler.go +++ b/fastdeploy/golang_router/internal/scheduler/handler/handler.go @@ -98,8 +98,12 @@ func SelectWorker(ctx context.Context, workers []string, message string, workerT strategyFunc = 
RequestNumSelectWorker case "fd_metrics_score": strategyFunc = FDMetricsScoreSelectWorker + case "fd_remote_metrics_score": + strategyFunc = FDRemoteMetricsScoreSelectWorker case "cache_aware": strategyFunc = CacheAwarePrefillSelectWorker + case "remote_cache_aware": + strategyFunc = RemoteCacheAwarePrefillSelectWorker default: strategyFunc = RandomSelectWorker } @@ -137,6 +141,9 @@ func SelectWorker(ctx context.Context, workers []string, message string, workerT // Release decreases the counter for the specified worker URL func Release(ctx context.Context, url string) { + if DefaultScheduler == nil { + return + } counter := GetOrCreateCounter(ctx, url) counter.Dec() logger.Info(ctx, "release worker: %s, count: %d", url, counter.Get()) @@ -277,10 +284,50 @@ func estimateTokens(message string) uint64 { // ReleasePrefillTokens releases the corresponding token load when request ends func ReleasePrefillTokens(ctx context.Context, url, message string) { - if url == "" || message == "" { + if DefaultScheduler == nil || url == "" || message == "" { return } tokenCounter := GetOrCreateTokenCounter(ctx, url) tokenCounter.Sub(estimateTokens(message)) logger.Info(ctx, "release prefill tokens: %s, tokens: %d", url, tokenCounter.Get()) } + +// StartStatsReporter periodically logs all worker loads and cache hit rate +func StartStatsReporter(ctx context.Context, interval float64) { + ticker := time.NewTicker(time.Duration(interval * float64(time.Second))) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + reportStats(ctx) + } + } +} + +func reportStats(ctx context.Context) { + if DefaultScheduler == nil || DefaultScheduler.managerAPI == nil { + return + } + + healthyURLs := DefaultScheduler.managerAPI.GetHealthyURLs(ctx) + + totalRunning := 0 + var workerLoads []string + for _, url := range healthyURLs { + running, _, _ := DefaultScheduler.managerAPI.GetMetrics(ctx, url) + totalRunning += running + workerLoads = append(workerLoads, 
fmt.Sprintf("%s: running=%d", url, running)) + } + + // Cache hit stats (periodic reset) + hits, total := GetAndResetCacheHitStats() + hitRate := 0.0 + if total > 0 { + hitRate = float64(hits) * 100 / float64(total) + } + + logger.Info(ctx, "[stats] total_running=%d, workers: [%s], cache_hit_rate=%.2f%% (hits=%d/total=%d)", + totalRunning, strings.Join(workerLoads, ", "), hitRate, hits, total) +} diff --git a/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go b/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go index cc67fdea44..7b120cf49a 100644 --- a/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go +++ b/fastdeploy/golang_router/internal/scheduler/handler/handler_test.go @@ -19,6 +19,10 @@ func (m *mockManagerAPI) GetMetrics(ctx context.Context, url string) (int, int, return 0, 0, 0 // 返回默认值用于测试 } +func (m *mockManagerAPI) GetRemoteMetrics(ctx context.Context, url string) (int, int, int) { + return 0, 0, 0 // 返回默认值用于测试 +} + func TestSchedulerInit(t *testing.T) { cfg := &config.Config{ Scheduler: config.SchedulerConfig{ diff --git a/fastdeploy/golang_router/internal/scheduler/handler/prefill_cache_aware.go b/fastdeploy/golang_router/internal/scheduler/handler/prefill_cache_aware.go index 44fb3f3064..6f5cb80eec 100644 --- a/fastdeploy/golang_router/internal/scheduler/handler/prefill_cache_aware.go +++ b/fastdeploy/golang_router/internal/scheduler/handler/prefill_cache_aware.go @@ -8,9 +8,11 @@ import ( "math" "math/rand" "sync" + "sync/atomic" "time" "github.com/PaddlePaddle/FastDeploy/router/pkg/logger" + "github.com/PaddlePaddle/FastDeploy/router/pkg/metrics" ) type prefillCacheStrategy struct { @@ -20,6 +22,12 @@ type prefillCacheStrategy struct { loadBalanceWeight float64 cache *radixPrefixCache tokenizer TokenizerClient + + // session-based cache hit tracking + sessionWorkerMap map[string]string // session_id -> last selected prefill worker URL + sessionMu sync.RWMutex + cacheHitCount atomic.Int64 // 
periodic counter (reset each stats interval) + cacheTotalCount atomic.Int64 // periodic counter (reset each stats interval) } type schedulerConfigSnapshot struct { @@ -41,23 +49,40 @@ func newPrefillCacheStrategy(cfg *schedulerConfigSnapshot) *prefillCacheStrategy loadBalanceWeight: cfg.loadBalanceWeight, cache: newRadixPrefixCache(cfg.cacheBlockSize), tokenizer: NewHTTPTokenizer(cfg.tokenizerURL, cfg.tokenizerTimeout), + sessionWorkerMap: make(map[string]string), } } // CacheAwarePrefillSelectWorker fallbacks to min tokens on extreme imbalance; otherwise scores by hit rate and load func CacheAwarePrefillSelectWorker(ctx context.Context, workers []string, message string) (string, error) { + return cacheAwareSelectWorkerImpl(ctx, workers, message, false) +} + +// RemoteCacheAwarePrefillSelectWorker uses remote metrics for load balancing decisions +func RemoteCacheAwarePrefillSelectWorker(ctx context.Context, workers []string, message string) (string, error) { + return cacheAwareSelectWorkerImpl(ctx, workers, message, true) +} + +func cacheAwareSelectWorkerImpl(ctx context.Context, workers []string, message string, useRemoteMetrics bool) (string, error) { if len(workers) == 0 { return "", nil } if DefaultScheduler == nil || DefaultScheduler.prefillCache == nil { + logger.Info(ctx, "cache-aware prefill: final strategy: process_tokens, reason: strategy not initialized") return ProcessTokensSelectWorker(ctx, workers, message) } strategy := DefaultScheduler.prefillCache // 1) Fetch node load; fallback to min tokens on extreme imbalance - loads := strategy.getRunningRequests(ctx, workers) + var loads map[string]uint64 + if useRemoteMetrics { + loads = strategy.getRemoteRunningRequests(ctx, workers) + } else { + loads = strategy.getRunningRequests(ctx, workers) + } if strategy.isLoadImbalanced(loads) { + logger.Info(ctx, "cache-aware prefill: final strategy: process_tokens, reason: load imbalanced, loads=%v. 
ts_ms=%s", loads, time.Now().Format("2006-01-02 15:04:05.000")) return ProcessTokensSelectWorker(ctx, workers, message) } @@ -65,7 +90,7 @@ func CacheAwarePrefillSelectWorker(ctx context.Context, workers []string, messag tokens, err := strategy.tokenize(ctx, message) if err != nil || len(tokens) == 0 { if err != nil { - logger.Warn(ctx, "cache-aware prefill: tokenizer failed, fallback to process_tokens: %v", err) + logger.Info(ctx, "cache-aware prefill: final strategy: process_tokens, reason: tokenize failed: %v. ts_ms=%s", err, time.Now().Format("2006-01-02 15:04:05.000")) } return ProcessTokensSelectWorker(ctx, workers, message) } @@ -79,7 +104,12 @@ func CacheAwarePrefillSelectWorker(ctx context.Context, workers []string, messag // 5) Record prefix strategy.cache.Record(tokens, selected) - logger.Debug(ctx, "cache-aware prefill: selected=%s", selected) + + // 6) Track session-based cache hit rate + strategy.trackSessionCacheHit(ctx, selected) + + logger.Info(ctx, "cache-aware prefill: final strategy: cache_aware_scoring, selected=%s, loads=%v, hitRatios=%v. 
ts_ms=%s", + selected, loads, hitRatios, time.Now().Format("2006-01-02 15:04:05.000")) return selected, nil } @@ -174,7 +204,7 @@ func (p *prefillCacheStrategy) chooseByScore(ctx context.Context, workers []stri return selected } -// getRunningRequests retrieves running request metrics +// getRunningRequests retrieves running request metrics (in-memory counting) func (p *prefillCacheStrategy) getRunningRequests(ctx context.Context, workers []string) map[string]uint64 { result := make(map[string]uint64, len(workers)) if DefaultScheduler == nil || DefaultScheduler.managerAPI == nil { @@ -188,6 +218,64 @@ func (p *prefillCacheStrategy) getRunningRequests(ctx context.Context, workers [ return result } +// getRemoteRunningRequests retrieves running request metrics from remote /metrics endpoint +func (p *prefillCacheStrategy) getRemoteRunningRequests(ctx context.Context, workers []string) map[string]uint64 { + result := make(map[string]uint64, len(workers)) + if DefaultScheduler == nil || DefaultScheduler.managerAPI == nil { + return result + } + + for _, w := range workers { + running, _, _ := DefaultScheduler.managerAPI.GetRemoteMetrics(ctx, w) + result[w] = uint64(running) + } + return result +} + +// trackSessionCacheHit checks if the same session_id was routed to the same prefill worker +func (p *prefillCacheStrategy) trackSessionCacheHit(ctx context.Context, selectedWorker string) { + sessionID, _ := ctx.Value(logger.SessionIDKey).(string) + if sessionID == "" { + return + } + + prevWorker, exists := p.getSessionWorker(sessionID) + + p.cacheTotalCount.Add(1) + metrics.RouterCacheRequestTotal.Inc() + + if exists && prevWorker == selectedWorker { + p.cacheHitCount.Add(1) + metrics.RouterCacheHitTotal.Inc() + } + + p.setSessionWorker(sessionID, selectedWorker) +} + +func (p *prefillCacheStrategy) getSessionWorker(sessionID string) (string, bool) { + p.sessionMu.RLock() + defer p.sessionMu.RUnlock() + prevWorker, exists := p.sessionWorkerMap[sessionID] + return 
prevWorker, exists +} + +func (p *prefillCacheStrategy) setSessionWorker(sessionID, worker string) { + p.sessionMu.Lock() + defer p.sessionMu.Unlock() + p.sessionWorkerMap[sessionID] = worker +} + +// GetAndResetCacheHitStats returns periodic cache hit stats and resets counters +func GetAndResetCacheHitStats() (hits int64, total int64) { + if DefaultScheduler == nil || DefaultScheduler.prefillCache == nil { + return 0, 0 + } + strategy := DefaultScheduler.prefillCache + hits = strategy.cacheHitCount.Swap(0) + total = strategy.cacheTotalCount.Swap(0) + return hits, total +} + // Track prefix hits using a radix tree keyed by block hash type radixPrefixCache struct { mu sync.RWMutex @@ -533,6 +621,7 @@ func charsToTokens(message string) []int { for _, r := range message { tokens = append(tokens, int(r)) } + // rune return tokens } diff --git a/fastdeploy/golang_router/pkg/logger/logger.go b/fastdeploy/golang_router/pkg/logger/logger.go index 3df0ec229a..c9f5e6389b 100644 --- a/fastdeploy/golang_router/pkg/logger/logger.go +++ b/fastdeploy/golang_router/pkg/logger/logger.go @@ -19,6 +19,7 @@ var ( type contextKey string const RequestIDKey contextKey = "request_id" +const SessionIDKey contextKey = "session_id" // Init initialize logger func Init(logLevel, output string) { @@ -61,10 +62,14 @@ func contextPrefix(ctx context.Context) string { if ctx == nil { return "" } - if rid, ok := ctx.Value(RequestIDKey).(string); ok && rid != "" { - return "[request_id:" + rid + "] " + var prefix string + if sid, ok := ctx.Value(SessionIDKey).(string); ok && sid != "" { + prefix += "[session_id:" + sid + "] " } - return "" + if rid, ok := ctx.Value(RequestIDKey).(string); ok && rid != "" { + prefix += "[request_id:" + rid + "] " + } + return prefix } // Info logs informational messages diff --git a/fastdeploy/golang_router/pkg/metrics/metrics.go b/fastdeploy/golang_router/pkg/metrics/metrics.go index 4883bcdc96..36094fbb47 100644 --- a/fastdeploy/golang_router/pkg/metrics/metrics.go 
+++ b/fastdeploy/golang_router/pkg/metrics/metrics.go @@ -8,6 +8,8 @@ func init() { prometheus.MustRegister(TotalRequests) prometheus.MustRegister(InferenceRequests) prometheus.MustRegister(RequestDuration) + prometheus.MustRegister(RouterCacheHitTotal) + prometheus.MustRegister(RouterCacheRequestTotal) } // TotalRequests tracks the total number of HTTP requests @@ -37,3 +39,19 @@ var RequestDuration = prometheus.NewSummaryVec( }, []string{"method", "endpoint"}, ) + +// RouterCacheHitTotal tracks the cumulative number of cache hits (session routed to same prefill worker) +var RouterCacheHitTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "router_cache_hit_total", + Help: "Cumulative number of cache hits (same session_id routed to same prefill worker)", + }, +) + +// RouterCacheRequestTotal tracks the cumulative number of cache-aware requests with session_id +var RouterCacheRequestTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "router_cache_request_total", + Help: "Cumulative number of cache-aware scheduling requests with session_id", + }, +)