[Feature] Tracing: Fine-Grained Tracing for Request Latency Part1 (#5458)

This commit is contained in:
xiaolei373
2025-12-16 16:36:09 +08:00
committed by GitHub
parent c9b47f90ce
commit a30b4da260
29 changed files with 5464 additions and 475 deletions
+173
@@ -0,0 +1,173 @@
## Observability Example Configuration (`examples/observability`)
This directory provides a complete, Docker Compose-based observability example environment, including:
* **Prometheus**: Metrics collection
* **Grafana**: Metrics visualization
* **OpenTelemetry Collector**: Distributed tracing data ingestion and processing
Developers can use this example to **launch a local monitoring and tracing system with a single command**.
---
### Prerequisites
Please make sure the following components are installed in advance:
* Docker
* Docker Compose (or a newer Docker CLI version that supports `docker compose`)
---
### Usage
#### Start All Services
Enter the directory:
```bash
cd examples/observability
```
Run the following command to start the complete monitoring and tracing stack:
```bash
docker compose -f docker-compose.yaml up -d
```
After startup, you can access:
* **Prometheus**: [http://localhost:9090](http://localhost:9090)
* **Grafana**: [http://localhost:3000](http://localhost:3000)
* **OTLP receiver**: Applications should send traces to the default ports of the OTel Collector (usually `4317` or `4318`)
  * gRPC: `4317`
  * HTTP: `4318`
* **Jaeger UI**: [http://localhost:16686](http://localhost:16686)
**Notes:**
* Update the Prometheus scrape targets to match your actual application endpoints.
* Map Grafana's service port to a port that is accessible on your machine.
* Map the Jaeger UI port to a port that is accessible on your machine.
* When starting the full stack, there is no need to start individual sub-services separately.
---
#### Start Metrics Services Only
Enter the directory:
```bash
cd examples/observability/metrics
```
Run the following command:
```bash
docker compose -f prometheus_compose.yaml up -d
```
After startup, you can access:
* **Grafana**: [http://localhost:3000](http://localhost:3000)
---
#### Start Tracing Services Only
Enter the directory:
```bash
cd examples/observability/tracing
```
Run the following command:
```bash
docker compose -f tracing_compose.yaml up -d
```
After startup, you can access:
* **OTLP receiver**: Applications should send traces to the default ports of the OTel Collector (usually `4317` or `4318`)
  * gRPC: `4317`
  * HTTP: `4318`
* **Jaeger UI**: [http://localhost:16686](http://localhost:16686)
---
### Directory Structure and File Descriptions
#### Core Startup File
| File Name | Purpose | Description |
| --------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `docker-compose.yaml` | Main entry | Defines and starts the full observability stack (Prometheus, Grafana, OTel Collector, and Jaeger). This is the single entry point to launch the entire environment. |
---
#### Metrics and Monitoring Configuration
| File / Directory | Purpose | Description |
| --------------------------------------------------- | ------------------------ | ------------------------------------------------------------------------------------------------------------------------- |
| `metrics` | Metrics root directory | Contains all Prometheus- and metrics-related configurations. |
| `prometheus.yaml` | Prometheus main config | Defines scrape targets, global scrape parameters, and optional recording rules. All monitored endpoints are defined here. |
| `prometheus_compose.yaml` | Prometheus Docker config | Defines the Prometheus container, volume mounts, and network settings. |
| `grafana/datasources/datasource.yaml` | Datasource configuration | Configures how Grafana connects to Prometheus. |
| `grafana/dashboards/config/dashboard.yaml` | Dashboard provisioning | Specifies the locations of dashboard JSON files to be loaded. |
| `grafana/dashboards/json/fastdeploy-dashboard.json` | Dashboard definition | Contains visualization layouts and queries for `fastdeploy` monitoring metrics. |
---
#### Distributed Tracing Configuration
| File / Directory | Purpose | Description |
| ------------------------------------------------------------------------------- | ---------------------- | ---------------------------------------------------------------------- |
| `tracing` | Tracing root directory | Contains all configurations related to distributed tracing. |
| `opentelemetry.yaml` | OTel Collector config | Defines the Collector data pipelines:<br />• **receivers**: receive OTLP data (traces, metrics, logs)<br />• **processors**: data processing and batching<br />• **exporters**: export data to tracing backends (such as Jaeger) or files<br />• **extensions**: health check, pprof, and zpages<br />• **pipelines**: define complete processing flows for traces, metrics, and logs |
| `tracing_compose.yaml` | Tracing Docker config | Defines the container configuration for the OTel Collector and Jaeger. |
---
### Customization
#### 4.1 Modify Metrics Scrape Targets
If your application's metrics endpoint, port, or path changes, edit:
```plain
metrics/prometheus.yaml
```
---
#### 4.2 Adjust Tracing Sampling Rate or Processing Logic
Edit:
```plain
tracing/opentelemetry.yaml
```
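For example, the trace volume can be reduced by adding a sampling processor to the traces pipeline. A hypothetical sketch (the `probabilistic_sampler` processor ships with the Collector *contrib* distribution, which is not necessarily the core image used in `tracing_compose.yaml`):

```yaml
processors:
  batch:
  # Keep roughly 10% of traces; requires an otelcol-contrib image.
  probabilistic_sampler:
    sampling_percentage: 10
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [probabilistic_sampler, batch]
      exporters: [otlp, file]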
---
#### 4.3 Add Custom Grafana Dashboards
1. Add the new dashboard JSON file to:
```plain
grafana/dashboards/json/
```
2. Register the dashboard so Grafana can load it automatically by editing:
```plain
grafana/dashboards/config/dashboard.yaml
```
+202
@@ -0,0 +1,202 @@
# FastDeploy Tracing with OpenTelemetry
**FastDeploy** exports request tracing data through the **OpenTelemetry Collector**.
Tracing can be enabled when starting the server using the `--trace-enable` flag, and the OpenTelemetry Collector endpoint can be configured via `--otlp-traces-endpoint`.
---
## Setup Guide
### 1. Install Dependencies
```bash
# Manual installation
pip install opentelemetry-sdk \
opentelemetry-api \
opentelemetry-exporter-otlp \
opentelemetry-exporter-otlp-proto-grpc
```
---
### 2. Start OpenTelemetry Collector and Jaeger
```bash
docker compose -f examples/observability/tracing/tracing_compose.yaml up -d
```
---
### 3. Start FastDeploy Server with Tracing Enabled
#### Configure FastDeploy Environment Variables
```shell
# Enable tracing
export TRACES_ENABLE=true
# Service name
export FD_SERVICE_NAME=FastDeploy
# Instance name
export FD_HOST_NAME=trace_test
# Exporter type
export TRACES_EXPORTER=otlp
# OTLP endpoint:
#   gRPC: 4317
#   HTTP: 4318
export EXPORTER_OTLP_ENDPOINT=http://localhost:4317
# Optional headers
export EXPORTER_OTLP_HEADERS=Authentication=Txxxxx
# Export protocol
export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc
```
#### Start FastDeploy
Start the FastDeploy server with the above configuration and ensure that tracing is enabled.
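The exact CLI entry point may differ across versions; as an illustrative sketch only (the module path, model path, and port below are placeholders, not a verified command), a launch could look like:

```bash
# Export the tracing environment variables from the previous section first.
export TRACES_ENABLE=true
export EXPORTER_OTLP_ENDPOINT=http://localhost:4317

# Placeholder launch command; adjust module path, model, and port to your setup.
python -m fastdeploy.entrypoints.openai.api_server \
  --model /path/to/model \
  --port 30000 \
  --trace-enable \
  --otlp-traces-endpoint http://localhost:4317
```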
---
### 4. Send Requests and View Traces
* Open the **Jaeger UI** in your browser (port `16686`) to visualize request traces.
* The OpenTelemetry Collector will also export the trace data to a local file:
```plain
/tmp/otel_trace.json
```
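The exported file is newline-delimited OTLP JSON. Assuming the standard OTLP/JSON layout produced by the Collector's file exporter, a small sketch that pulls span names out of one such line:

```python
import json

def span_names(otlp_json_line: str) -> list[str]:
    """Collect span names from one line of the file exporter's output."""
    data = json.loads(otlp_json_line)
    names = []
    for resource_spans in data.get("resourceSpans", []):
        for scope_spans in resource_spans.get("scopeSpans", []):
            for span in scope_spans.get("spans", []):
                names.append(span["name"])
    return names

# A minimal synthetic line in OTLP/JSON shape (not real FastDeploy output).
sample = '{"resourceSpans":[{"scopeSpans":[{"spans":[{"name":"PREFILL"},{"name":"DECODE"}]}]}]}'
print(span_names(sample))  # → ['PREFILL', 'DECODE']
```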
---
## Adding Tracing to Your Own Code
FastDeploy already inserts tracing points at the most critical execution stages.
Developers can use the APIs provided in `trace.py` to add more fine-grained tracing.
---
### 4.1 Initialize Tracing
Each **process** involved in tracing must call:
```python
process_tracing_init()
```
Each **thread** that participates in a traced request must call:
```python
trace_set_thread_info("thread_label", tp_rank, dp_rank)
```
* `thread_label`: identifier used for visual distinction of threads.
* `tp_rank` / `dp_rank`: optional values to label tensor parallelism or data parallelism ranks.
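The required call order can be sketched as follows. The two functions are stubbed here so the pattern runs standalone; the real implementations live in `trace.py` (the thread label "Scheduler Task to Work" is one actually used by FastDeploy's engine threads):

```python
import threading

calls = []

# Stubs standing in for the real trace.py APIs, so only the call order is shown.
def process_tracing_init():
    calls.append("process_init")

def trace_set_thread_info(thread_label, tp_rank=None, dp_rank=None):
    calls.append(f"thread:{thread_label}:tp={tp_rank}:dp={dp_rank}")

process_tracing_init()  # once per process, before any thread traces

def scheduler_worker():
    # Every participating thread registers itself before emitting slices.
    trace_set_thread_info("Scheduler Task to Work", tp_rank=0, dp_rank=0)

t = threading.Thread(target=scheduler_worker)
t.start()
t.join()
print(calls)  # → ['process_init', 'thread:Scheduler Task to Work:tp=0:dp=0']
```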
---
### 4.2 Mark Request Start and Finish
```python
trace_req_start(rid, bootstrap_room, ts, role)
trace_req_finish(rid, ts, attrs)
```
* Creates both a **Bootstrap Room Span** and a **Root Span**.
* Supports inheritance from spans created by the **FastAPI Instrumentor** (context copying).
* `attrs` can be used to attach additional attributes to the request span.
---
### 4.3 Add Tracing for Slices
#### Standard Slice
```python
trace_slice_start("slice_name", rid)
trace_slice_end("slice_name", rid)
```
#### Mark Thread Completion
The last slice in a thread can mark the thread span as finished:
```python
trace_slice_end("slice_name", rid, thread_finish_flag=True)
```
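A slice always pairs a start with an end, which makes it a natural fit for a context manager. A sketch of that pattern, with the two API calls stubbed out so it runs standalone (`traced_slice` is a hypothetical helper, not part of `trace.py`):

```python
from contextlib import contextmanager

calls = []

# Stubs standing in for trace_slice_start / trace_slice_end from trace.py.
def trace_slice_start(name, rid):
    calls.append(("start", name, rid))

def trace_slice_end(name, rid, thread_finish_flag=False):
    calls.append(("end", name, rid, thread_finish_flag))

@contextmanager
def traced_slice(name, rid, last=False):
    """Ensure every slice start is matched by an end, even on exceptions."""
    trace_slice_start(name, rid)
    try:
        yield
    finally:
        trace_slice_end(name, rid, thread_finish_flag=last)

with traced_slice("PREFILL", "req-1"):
    pass  # work for this slice
with traced_slice("DECODE", "req-1", last=True):
    pass  # final slice in this thread also closes the thread span

print(calls[-1])  # → ('end', 'DECODE', 'req-1', True)
```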
---
### 4.4 Trace Context Propagation Across Threads
#### Sender Side (ZMQ)
```python
trace_context = trace_get_proc_propagate_context(rid)
req.trace_context = trace_context
```
#### Receiver Side (ZMQ)
```python
trace_set_proc_propagate_context(rid, req.trace_context)
```
---
### 4.5 Add Events and Attributes
#### Events (recorded on the current slice)
```python
trace_event("event_name", rid, ts, attrs)
```
#### Attributes (attached to the current slice)
```python
trace_slice_add_attr(rid, attrs)
```
---
## Extending the Tracing Framework
### 5.1 Trace Context Hierarchy
* Two levels of trace context:
  * **`TraceReqContext`**: request-level context
  * **`TraceThreadContext`**: thread-level context
* Three-level span hierarchy:
  * `req_root_span`
  * `thread_span`
  * `slice_span`
---
### 5.2 Available Span Name Enum (`TraceSpanName`)
```python
FASTDEPLOY
PREPROCESS
SCHEDULE
PREFILL
DECODE
POSTPROCESS
```
* These enums can be used when creating slices to ensure consistent naming.
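Assuming the members map to their own names (a plausible but unverified detail of `trace.py`), a mirror definition would look like:

```python
from enum import Enum

class TraceSpanName(str, Enum):
    """Hypothetical mirror of the span-name enum; the real one lives in trace.py."""
    FASTDEPLOY = "FASTDEPLOY"
    PREPROCESS = "PREPROCESS"
    SCHEDULE = "SCHEDULE"
    PREFILL = "PREFILL"
    DECODE = "DECODE"
    POSTPROCESS = "POSTPROCESS"

# str-valued members compare equal to plain strings, so they can replace
# ad-hoc "slice_name" literals without changing call sites.
print(TraceSpanName.PREFILL == "PREFILL")  # → True
```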
---
### 5.3 Important Notes
1. Each **thread span must be closed** when the final slice of that thread finishes.
2. Spans created by **FastAPI Instrumentor** are automatically inherited by the internal tracing context.
+149
@@ -0,0 +1,149 @@
## Observability Example Configuration (`examples/observability`)
This directory provides a complete, Docker Compose-based observability example, including:
- Prometheus: metrics collection
- Grafana: metrics visualization
- OpenTelemetry Collector: distributed tracing data ingestion and processing
Developers can use this example environment to launch a local monitoring and tracing system with a single command.
### Prerequisites
Make sure the following components are installed in advance:
- Docker
- Docker Compose (or a newer Docker CLI that supports `docker compose`)
### Usage
#### Start All Services
Enter the directory:
```shell
cd examples/observability
```
Run the following command in the `examples/observability` directory to start the complete monitoring and tracing stack:
```bash
docker compose -f docker-compose.yaml up -d
```
After startup, you can access:
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000
- OTLP receiver: applications should send traces to the default ports of the OTel Collector (usually `4317` or `4318`)
  - gRPC: port 4317
  - HTTP: port 4318
- Jaeger UI: http://localhost:16686
Notes:
- Replace the Prometheus scrape targets with your own endpoints.
- Map the Grafana port to one that is accessible on your machine.
- Map the Jaeger UI port to one that is accessible on your machine.
- If you start the full stack, there is no need to start the sub-services separately.
#### Start Metrics Services Only
Enter the directory:
```shell
cd examples/observability/metrics
```
Run the following command in this directory to start the metrics services:
```bash
docker compose -f prometheus_compose.yaml up -d
```
After startup, you can access:
- Grafana: http://localhost:3000
#### Start Tracing Services Only
Enter the directory:
```shell
cd examples/observability/tracing
```
Run the following command in this directory to start the tracing services:
```bash
docker compose -f tracing_compose.yaml up -d
```
After startup, you can access:
- OTLP receiver: applications should send traces to the default ports of the OTel Collector (usually `4317` or `4318`)
  - gRPC: port 4317
  - HTTP: port 4318
- Jaeger UI: http://localhost:16686
### Directory Structure and File Descriptions
#### Core Startup File
| File Name | Purpose | Description |
| ------------------- | ---------- | ------------------------------------------------------------ |
| docker-compose.yaml | Main entry | Defines and starts the full observability stack (Prometheus, Grafana, OTel Collector, Jaeger). This is the single entry point to launch the entire environment. |
#### Metrics and Monitoring Configuration
| File / Directory | Purpose | Description |
| ------------------------------------------------- | ---------------------- | ------------------------------------------------------------ |
| metrics | Metrics root directory | Contains all configurations related to metrics collection and Prometheus. |
| prometheus.yaml | Prometheus main config | Defines scrape targets, global scrape parameters, and optional recording rules. All monitored endpoints are defined here. |
| prometheus_compose.yaml | Prometheus Docker config | Defines the Prometheus container, volume mounts, and network settings. |
| grafana/datasources/datasource.yaml | Datasource configuration | Defines how Grafana connects to Prometheus. |
| grafana/dashboards/config/dashboard.yaml | Dashboard provisioning | Specifies the paths of the dashboard JSON files. |
| grafana/dashboards/json/fastdeploy-dashboard.json | Dashboard definition | Contains visualization layouts and query definitions for `fastdeploy` monitoring metrics. |
#### Distributed Tracing Configuration
| File / Directory | Purpose | Description |
| -------------------- | ------------------- | ------------------------------------------------------------ |
| tracing | Tracing root directory | Contains all configurations related to distributed tracing. |
| opentelemetry.yaml | OTel Collector config | Defines the Collector data pipelines:<br />• receivers: receive OTLP data (traces, metrics, logs)<br />• processors: process and batch data<br />• exporters: export data to tracing backends (such as Jaeger) or files<br />• extensions: health check, pprof, and zpages<br />• pipelines: define the complete processing flows for traces, metrics, and logs |
| tracing_compose.yaml | Tracing Docker config | Defines the container configuration for the OTel Collector and Jaeger. |
### 4. Customization
#### 4.1 Modify Metrics Scrape Targets
If your application's port or path changes, edit:
```plain
metrics/prometheus.yaml
```
#### 4.2 Adjust Tracing Sampling Rate or Processing Logic
Edit:
```plain
tracing/opentelemetry.yaml
```
#### 4.3 Add Custom Grafana Dashboards
1. Add the new dashboard JSON file to:
```plain
grafana/dashboards/json/
```
2. Register the dashboard in the file below so Grafana loads it automatically:
```plain
grafana/dashboards/config/dashboard.yaml
```
+150
@@ -0,0 +1,150 @@
**FastDeploy** exports request tracing data through the **OpenTelemetry Collector**.
Tracing can be enabled by adding `--trace-enable` when starting the server, and the OpenTelemetry Collector endpoint can be configured via `--otlp-traces-endpoint`.
## Setup Guide
### 1. Install Dependencies
```bash
# Manual installation
pip install opentelemetry-sdk opentelemetry-api opentelemetry-exporter-otlp opentelemetry-exporter-otlp-proto-grpc
```
### 2. Start OpenTelemetry Collector and Jaeger
```bash
docker compose -f examples/observability/tracing/tracing_compose.yaml up -d
```
### 3. Start the FastDeploy Server with Tracing Enabled
- Configure the FastDeploy environment variables:
```shell
# Enable tracing
export TRACES_ENABLE=true
# Service name
export FD_SERVICE_NAME=FastDeploy
# Instance name
export FD_HOST_NAME=trace_test
export TRACES_EXPORTER=otlp
# gRPC exports on port 4317; HTTP exports on port 4318
export EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export EXPORTER_OTLP_HEADERS=Authentication=Txxxxx
# Export protocol
export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc
```
- Start FastDeploy.
### 4. Send Requests and View Traces
- Open the Jaeger UI in your browser (port `16686`) to visualize request traces.
- The Collector also exports the trace data to `/tmp/otel_trace.json`.
## Adding Tracing to Your Own Code
FastDeploy already inserts tracing points at the main stages. Developers can use the APIs provided in `trace.py` for more fine-grained tracing.
### 4.1 Initialize Tracing
Each **process** involved in tracing must call:
```python
process_tracing_init()
```
Each **thread** involved in a request must call:
```python
trace_set_thread_info("thread_label", tp_rank, dp_rank)
```
- `thread_label`: used to distinguish threads in visualizations.
- `tp_rank` / `dp_rank`: optional, label tensor-parallel or data-parallel ranks.
### 4.2 Mark Request Start and Finish
```python
trace_req_start(rid, bootstrap_room, ts, role)
trace_req_finish(rid, ts, attrs)
```
- Creates both a Bootstrap Room Span and a Root Span.
- Supports inheriting spans already created by the FastAPI Instrumentor (context copy).
- `attrs` can attach additional attributes.
### 4.3 Add Tracing for Slices
Standard slice:
```python
trace_slice_start("slice_name", rid)
trace_slice_end("slice_name", rid)
```
- The last slice can mark the thread as finished:
```python
trace_slice_end("slice_name", rid, thread_finish_flag=True)
```
### 4.4 Trace Context Propagation Across Threads
Sender side (ZMQ):
```python
trace_context = trace_get_proc_propagate_context(rid)
req.trace_context = trace_context
```
Receiver side (ZMQ):
```python
trace_set_proc_propagate_context(rid, req.trace_context)
```
### 4.5 Add Events and Attributes
Events (recorded on the current slice):
```python
trace_event("event_name", rid, ts, attrs)
```
Attributes (attached to the current slice):
```python
trace_slice_add_attr(rid, attrs)
```
## Extending the Tracing Framework
### 5.1 Trace Context Hierarchy
- Two levels of trace context:
  - `TraceReqContext` → request-level context
  - `TraceThreadContext` → thread-level context
- Three-level span hierarchy:
  - `req_root_span`
  - `thread_span`
  - `slice_span`
### 5.2 Available Span Name Enum (`TraceSpanName`)
```python
FASTDEPLOY
PREPROCESS
SCHEDULE
PREFILL
DECODE
POSTPROCESS
```
- Use these enum values when creating slices to keep naming consistent.
### 5.3 Important Notes
1. Each thread span must be closed when the final slice of that thread finishes.
2. Spans created by the FastAPI Instrumentor are inherited into the internal tracing context.
@@ -0,0 +1,52 @@
version: '3.8'
services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./metrics/prometheus.yaml:/etc/prometheus/prometheus.yml
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
volumes:
- ./metrics/grafana/datasources:/etc/grafana/provisioning/datasources
- ./metrics/grafana/dashboards/config:/etc/grafana/provisioning/dashboards
- ./metrics/grafana/dashboards/json:/var/lib/grafana/dashboards
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Viewer
- GF_AUTH_BASIC_ENABLED=false
- GF_USERS_ALLOW_SIGN_UP=false
- GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/fastdeploy-dashboard.json
depends_on:
- prometheus
jaeger:
image: jaegertracing/all-in-one
container_name: jaeger
ports:
- "16686:16686"
environment:
- COLLECTOR_OTLP_ENABLED=true
restart: unless-stopped
otel-collector:
image: docker.io/otel/opentelemetry-collector
volumes:
- ./tracing/opentelemetry.yaml:/etc/otelcol/config.yaml
- /tmp:/tmp
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
depends_on:
- jaeger
- prometheus
restart: unless-stopped
@@ -0,0 +1,11 @@
apiVersion: 1
providers:
- name: 'FastDeploy'
orgId: 1
folder: 'FastDeploy Monitoring'
type: file
disableDeletion: false
updateIntervalSeconds: 10
allowUiUpdates: false
options:
path: /var/lib/grafana/dashboards
File diff suppressed because it is too large
@@ -0,0 +1,9 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
# url: http://localhost:9090
url: http://prometheus:9090
isDefault: true
editable: false
@@ -0,0 +1,10 @@
# prometheus.yaml
global:
scrape_interval: 5s
evaluation_interval: 30s
scrape_configs:
- job_name: 'fastdeploy'
static_configs:
# list all your targets here
- targets: ['127.0.0.1:30000']
@@ -0,0 +1,30 @@
version: '3.8'
services:
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
volumes:
- ./grafana/datasources:/etc/grafana/provisioning/datasources
- ./grafana/dashboards/config:/etc/grafana/provisioning/dashboards
- ./grafana/dashboards/json:/var/lib/grafana/dashboards
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Viewer
- GF_AUTH_BASIC_ENABLED=false
- GF_USERS_ALLOW_SIGN_UP=false
- GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/fastdeploy-dashboard.json
depends_on:
- prometheus
@@ -0,0 +1,38 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
exporters:
otlp:
endpoint: jaeger:4317
tls:
insecure: true
file:
path: /tmp/otel_trace.json
extensions:
health_check:
pprof:
zpages:
service:
extensions: [health_check, pprof, zpages]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlp, file]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [batch]
exporters: [otlp]
@@ -0,0 +1,21 @@
services:
otel-collector:
image: docker.io/otel/opentelemetry-collector
volumes:
- ./opentelemetry.yaml:/etc/otelcol/config.yaml
- /tmp:/tmp
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
depends_on:
- jaeger
restart: unless-stopped
jaeger:
image: jaegertracing/all-in-one
container_name: jaeger
ports:
- "16686:16686"
environment:
- COLLECTOR_OTLP_ENABLED=true
restart: unless-stopped
+29 -5
@@ -35,9 +35,9 @@ import numpy as np
import paddle
import requests
import zmq
from opentelemetry import trace
from tqdm import tqdm
import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.request import Request, RequestOutput, RequestType
from fastdeploy.engine.resource_manager import ResourceManager
from fastdeploy.engine.sched.resource_manager_v1 import ResourceManagerV1
@@ -51,7 +51,6 @@ from fastdeploy.inter_communicator import (
ZmqTcpServer,
)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.trace_util import start_span, start_span_request
from fastdeploy.model_executor.guided_decoding import schema_checker
from fastdeploy.plugins.token_processor import load_token_processor_plugins
from fastdeploy.router.utils import check_service_health
@@ -417,13 +416,16 @@ class EngineService:
"""
if not isinstance(tasks, list):
tasks = [tasks]
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
self.resource_manager.check_and_free_block_tables()
need_delete_tasks = []
for task in tasks:
rid = task.request_id.split("_")[0]
trace_carrier = task.trace_carrier
if trace_carrier:
tracing.trace_set_proc_propagate_context(rid, trace_carrier)
task.trace_carrier = tracing.trace_get_proc_propagate_context(rid)
if self.cfg.scheduler_config.splitwise_role == "prefill":
status, msg = self.split_connector.check_decode_allocated(task)
if status:
@@ -447,6 +449,7 @@ class EngineService:
for item in tasks:
trace_print(LoggingEventName.RESOURCE_ALLOCATE_START, item.request_id, getattr(item, "user", ""))
available_batch = np.sum(self.resource_manager.stop_flags)
if len(tasks) > available_batch:
self.llm_logger.error(f"Inserting batch:{len(tasks)} exceeds the available batch:{available_batch}.")
@@ -484,6 +487,13 @@ class EngineService:
self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
for task in tasks:
task.metrics.inference_start_time = time.time()
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
task.request_id.split("_")[0],
int(task.metrics.scheduler_recv_req_time * 1e9),
int(task.metrics.inference_start_time * 1e9),
thread_finish_flag=True,
)
trace_print(LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_END, task.request_id, getattr(task, "user", ""))
trace_print(LoggingEventName.INFERENCE_START, task.request_id, getattr(task, "user", ""))
@@ -694,6 +704,7 @@ class EngineService:
Insert task to engine thread, monitor scheduler request queue.
if the engine has resource, insert task to engine
"""
tracing.trace_set_thread_info("Scheduler Task to Work")
current_id = 0
while getattr(self, "running", True):
try:
@@ -764,6 +775,7 @@ class EngineService:
"""
Insert tasks to worker with scheduler v1 (ENABLE_V1_KVCACHE_SCHEDULER=1).
"""
tracing.trace_set_thread_info("Scheduler Task to Work")
get_request_pool = ThreadPoolExecutor(max_workers=1)
is_fetching = False
@@ -981,6 +993,18 @@ class EngineService:
self.resource_manager.get_real_bsz()
for task in tasks:
if task.task_type == RequestType.PREFILL:
rid = task.request_id.split("_")[0]
trace_carrier = task.trace_carrier
tracing.trace_set_proc_propagate_context(rid, trace_carrier)
trace_carrier = tracing.trace_get_proc_propagate_context(rid)
task.trace_carrier = trace_carrier
tracing.trace_report_span(
tracing.TraceSpanName.SCHEDULE,
rid,
int(task.metrics.scheduler_recv_req_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
trace_print(
LoggingEventName.RESOURCE_ALLOCATE_END, task.request_id, getattr(task, "user", "")
)
@@ -1038,6 +1062,7 @@ class EngineService:
self.receive_output_thread.start()
def _insert_zmq_task_to_scheduler(self):
tracing.trace_set_thread_info("Insert Task to Scheduler")
added_requests: Dict[str, int] = dict()
if envs.FD_ENABLE_INTERNAL_ADAPTER:
if self.cfg.scheduler_config.splitwise_role == "decode":
@@ -1067,7 +1092,6 @@ class EngineService:
try:
request = Request.from_dict(data)
request.metrics.scheduler_recv_req_time = time.time()
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
main_process_metrics.requests_number.inc()
trace_print(LoggingEventName.PREPROCESSING_END, data["request_id"], data.get("user", ""))
trace_print(LoggingEventName.REQUEST_SCHEDULE_START, data["request_id"], data.get("user", ""))
+3
@@ -34,6 +34,7 @@ import numpy as np
import paddle
from tqdm import tqdm
import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.common_engine import EngineService
from fastdeploy.engine.expert_service import start_data_parallel_service
@@ -97,6 +98,8 @@ class LLMEngine:
main_process_metrics.set_cache_config_info(obj=self.cfg.cache_config)
tracing.trace_set_thread_info("engine")
def start(self, api_server_pid=None):
"""
Initializes the engine and starts its sub-services.
+6 -1
@@ -624,6 +624,7 @@ class RequestOutput:
# for internal adapter
ic_req_data: Optional[dict] = None,
prompt_token_ids_len: Optional[int] = 0,
trace_carrier: dict = dict(),
) -> None:
self.request_id = request_id
self.prompt = prompt
@@ -640,6 +641,7 @@ class RequestOutput:
self.error_msg = error_msg
self.ic_req_data = ic_req_data
self.prompt_token_ids_len = prompt_token_ids_len
self.trace_carrier = trace_carrier
if prompt_token_ids is None:
self.prompt_token_ids = []
@@ -690,6 +692,7 @@ class RequestOutput:
f"metrics={self.metrics}, "
f"error_code={self.error_code}, "
f"error_msg={self.error_msg},"
f"trace_carrier={self.trace_carrier}"
)
@classmethod
@@ -705,7 +708,8 @@ class RequestOutput:
else:
d.pop("metrics", None)
metrics = None
return RequestOutput(**d, outputs=completion_output, metrics=metrics)
trace_carrier = d.pop("trace_carrier", {})
return RequestOutput(**d, outputs=completion_output, metrics=metrics, trace_carrier=trace_carrier)
def to_dict(self):
"""convert RequestOutput into a serializable dict"""
@@ -726,6 +730,7 @@ class RequestOutput:
"error_msg": self.error_msg,
"ic_req_data": self.ic_req_data,
"prompt_token_ids_len": self.prompt_token_ids_len,
"trace_carrier": self.trace_carrier,
}
+1 -1
@@ -196,7 +196,7 @@ def main(args: argparse.Namespace) -> None:
# 检查参数
if not any([args.encode, args.decode, args.vocab_size, args.info, args.vocab_export]):
print("请至少指定一个参数:--encode, --decode, --vocab-size, --info, --export-vocab")
print("请至少指定一个参数:--encode, --decode, --vocab-size, --info, --vocab-export")
return
# 初始化tokenizer
+8
@@ -25,6 +25,7 @@ from http import HTTPStatus
import numpy as np
from filelock import FileLock
import fastdeploy.metrics.trace as tracing
from fastdeploy import envs
from fastdeploy.config import FDConfig
from fastdeploy.entrypoints.openai.utils import DealerConnectionManager
@@ -271,6 +272,8 @@ class EngineClient:
"""
task["preprocess_start_time"] = time.time()
request_id = task.get("request_id").split("_")[0]
tracing.trace_slice_start(tracing.TraceSpanName.PREPROCESSING, request_id)
trace_print(LoggingEventName.PREPROCESSING_START, task["request_id"], task.get("user", ""))
try:
chat_template_kwargs = task.get("chat_template_kwargs") or {}
@@ -349,10 +352,15 @@ class EngineClient:
else:
request_id = parts[0]
index = int(parts[1])
trace_carrier = tracing.trace_get_proc_propagate_context(request_id)
task["trace_carrier"] = trace_carrier
for i in range(index * n, (index + 1) * n):
child_task = copy(task)
child_task["request_id"] = f"{request_id}_{i}"
self._send_task(child_task)
tracing.trace_slice_end(
tracing.TraceSpanName.PREPROCESSING, task.get("request_id").split("_")[0], thread_finish_flag=True
)
except Exception as e:
api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}")
raise EngineError(str(e), error_code=400)
+27 -13
@@ -30,7 +30,10 @@ from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, Response, StreamingResponse
from gunicorn.app.base import BaseApplication
from opentelemetry import trace
from opentelemetry.propagate import extract
import fastdeploy.metrics.trace as tracing
from fastdeploy import envs
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine
from fastdeploy.engine.expert_service import ExpertService
@@ -58,12 +61,6 @@ from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
from fastdeploy.entrypoints.openai.utils import UVICORN_CONFIG, make_arg_parser
from fastdeploy.envs import environment_variables
from fastdeploy.metrics.metrics import get_filtered_metrics
from fastdeploy.metrics.trace_util import (
fd_start_span,
inject_to_metadata,
instrument,
lable_span,
)
from fastdeploy.utils import (
ExceptionHandler,
FlexibleArgumentParser,
@@ -74,6 +71,8 @@ from fastdeploy.utils import (
retrive_model_from_server,
)
tracing.process_tracing_init()
parser = make_arg_parser(FlexibleArgumentParser())
args = parser.parse_args()
@@ -246,7 +245,6 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
app.add_exception_handler(RequestValidationError, ExceptionHandler.handle_request_validation_exception)
app.add_exception_handler(Exception, ExceptionHandler.handle_exception)
instrument(app)
env_api_key_func = environment_variables.get("FD_API_KEY")
@@ -367,19 +365,23 @@ def wrap_streaming_generator(original_generator: AsyncGenerator):
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
async def create_chat_completion(request: ChatCompletionRequest, req: Request):
"""
Create a chat completion for the provided prompt and parameters.
"""
api_server_logger.debug(f"Chat Received request: {request.model_dump_json()}")
if envs.TRACES_ENABLE:
if req.headers:
headers = dict(req.headers)
trace_context = extract(headers)
request.trace_context = trace_context
if app.state.dynamic_load_weight:
status, msg = app.state.engine_client.is_workers_alive()
if not status:
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
try:
async with connection_manager():
inject_to_metadata(request)
lable_span(request)
tracing.label_span(request)
generator = await app.state.chat_handler.create_chat_completion(request)
if isinstance(generator, ErrorResponse):
api_server_logger.debug(f"release: {connection_semaphore.status()}")
@@ -399,18 +401,23 @@ async def create_chat_completion(request: ChatCompletionRequest):
@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
async def create_completion(request: CompletionRequest, req: Request):
"""
Create a completion for the provided prompt and parameters.
"""
api_server_logger.info(f"Completion Received request: {request.model_dump_json()}")
if envs.TRACES_ENABLE:
if req.headers:
headers = dict(req.headers)
trace_context = extract(headers)
request.trace_context = trace_context
if app.state.dynamic_load_weight:
status, msg = app.state.engine_client.is_workers_alive()
if not status:
return JSONResponse(content={"error": "Worker Service Not Healthy"}, status_code=304)
try:
async with connection_manager():
lable_span(request)
tracing.label_span(request)
generator = await app.state.completion_handler.create_completion(request)
if isinstance(generator, ErrorResponse):
connection_semaphore.release()
@@ -471,6 +478,7 @@ async def create_embedding(request: EmbeddingRequest):
@app.get("/update_model_weight")
@tracing.trace_span("update_model_weight")
def update_model_weight(request: Request) -> Response:
"""
update model weight
@@ -485,6 +493,7 @@ def update_model_weight(request: Request) -> Response:
@app.get("/clear_load_weight")
@tracing.trace_span("clear_load_weight")
def clear_load_weight(request: Request) -> Response:
"""
clear model weight
@@ -499,6 +508,7 @@ def clear_load_weight(request: Request) -> Response:
@app.post("/rearrange_experts")
@tracing.trace_span("rearrange_experts")
async def rearrange_experts(request: Request):
"""
rearrange experts
@@ -509,6 +519,7 @@ async def rearrange_experts(request: Request):
@app.post("/get_per_expert_tokens_stats")
@tracing.trace_span("get_per_expert_tokens_stats")
async def get_per_expert_tokens_stats(request: Request):
"""
get per expert tokens stats
@@ -519,6 +530,7 @@ async def get_per_expert_tokens_stats(request: Request):
@app.post("/check_redundant")
@tracing.trace_span("check_redundant")
async def check_redundant(request: Request):
"""
check redundant
@@ -537,7 +549,7 @@ def launch_api_server() -> None:
api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}")
api_server_logger.info(f"args: {args.__dict__}")
fd_start_span("FD_START")
# fd_start_span("FD_START")
options = {
"bind": f"{args.host}:{args.port}",
@@ -565,6 +577,7 @@ if _metrics_port is None or (_main_port is not None and _metrics_port == _main_p
@metrics_app.get("/metrics")
@tracing.trace_span("metrics")
async def metrics():
"""
metrics
@@ -574,6 +587,7 @@ async def metrics():
@metrics_app.get("/config-info")
@tracing.trace_span("config-info")
def config_info() -> Response:
"""
Get the current configuration of the API server.
@@ -505,6 +505,7 @@ class CompletionRequest(BaseModel):
mm_hashes: Optional[list] = None
# doc: end-completion-extra-params
trace_context: Optional[str] = None
collect_metrics: Optional[bool] = False
@@ -681,6 +682,7 @@ class ChatCompletionRequest(BaseModel):
mm_hashes: Optional[list] = None
completion_token_ids: Optional[List[int]] = None
# doc: end-chat-completion-extra-params
trace_context: Optional[str] = None
collect_metrics: Optional[bool] = False
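Both request models gain two optional fields: `trace_context` (a serialized upstream trace carrier, consumed and then deleted in `trace_req_start`) and `collect_metrics`. A client payload carrying them might look like the following sketch; the traceparent value is a made-up illustration, not a real trace.

```python
import json

# Illustrative chat-completion payload carrying the two new optional fields.
# "trace_context" holds whatever serialized carrier the upstream service
# produced; the traceparent value below is a fabricated example.
payload = {
    "model": "default",
    "messages": [{"role": "user", "content": "hello"}],
    "trace_context": json.dumps(
        {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
    ),
    "collect_metrics": False,
}
body = json.dumps(payload)
```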
@@ -24,6 +24,7 @@ from typing import List, Optional
import numpy as np
import fastdeploy.metrics.trace as tracing
from fastdeploy.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
@@ -104,6 +105,7 @@ class OpenAIServingChat:
"""
Create a new chat completion using the specified parameters.
"""
tracing.trace_set_thread_info("API Server")
if not self._check_master():
err_msg = (
f"Only master node can accept completion request, please send request to master node: {self.master_ip}"
@@ -135,6 +137,8 @@ class OpenAIServingChat:
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
else:
request_id = f"chatcmpl-{uuid.uuid4()}"
tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy")
del request.trace_context
api_server_logger.info(f"create chat completion request: {request_id}")
prompt_tokens = None
max_tokens = None
@@ -421,6 +425,19 @@ class OpenAIServingChat:
speculate_metrics=output_speculate_metrics,
)
if res["finished"]:
trace_carrier = res.get("trace_carrier")
if trace_carrier:
tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
start_time = res["metrics"]["engine_recv_latest_token_time"]
tracing.trace_report_span(
tracing.TraceSpanName.POSTPROCESSING,
request_id,
int(start_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
if "trace_carrier" in res:
del res["trace_carrier"]
num_choices -= 1
main_process_metrics.e2e_request_latency.observe(
time.time() - res["metrics"]["request_start_time"]
@@ -494,6 +511,7 @@ class OpenAIServingChat:
)
yield f"data: {error_data}\n\n"
finally:
tracing.trace_req_finish(request_id)
await self.engine_client.connection_manager.cleanup_request(request_id)
self.engine_client.semaphore.release()
trace_print(LoggingEventName.POSTPROCESSING_END, request_id, getattr(request, "user", ""))
@@ -620,6 +638,19 @@ class OpenAIServingChat:
prompt_logprobs_res_list[idx].extend(clamp_prompt_logprobs(prompt_logprobs_res))
speculate_metrics[idx] = data["metrics"].get("speculate_metrics", None)
if data["finished"]:
trace_carrier = data.get("trace_carrier")
if trace_carrier:
tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
start_time = data["metrics"]["engine_recv_latest_token_time"]
tracing.trace_report_span(
tracing.TraceSpanName.POSTPROCESSING,
request_id,
int(start_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
if "trace_carrier" in data:
del data["trace_carrier"]
num_choices -= 1
reasoning_num_tokens[idx] = data["outputs"].get("reasoning_token_num", 0)
if data["outputs"].get("image_token_num"):
@@ -645,6 +676,7 @@ class OpenAIServingChat:
)
choices.append(choice)
finally:
tracing.trace_req_finish(request_id)
await self.engine_client.connection_manager.cleanup_request(request_id)
self.engine_client.semaphore.release()
api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
@@ -24,6 +24,7 @@ from typing import List, Optional
import numpy as np
import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.request import RequestOutput
from fastdeploy.entrypoints.openai.protocol import (
CompletionLogprobs,
@@ -82,6 +83,7 @@ class OpenAIServingCompletion:
"""
Create a completion for the given prompt.
"""
tracing.trace_set_thread_info("API Server")
if not self._check_master():
err_msg = (
f"Only master node can accept completion request, please send request to master node: {self.master_ip}"
@@ -106,6 +108,8 @@ class OpenAIServingCompletion:
else:
request_id = f"cmpl-{uuid.uuid4()}"
api_server_logger.info(f"Initialize request {request_id}: {request}")
tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy")
del request.trace_context
request_prompt_ids = None
request_prompts = None
@@ -322,6 +326,19 @@ class OpenAIServingCompletion:
aggregated_speculate_metrics[rid] = output_speculate_metrics
if data.get("finished", False):
trace_carrier = data.get("trace_carrier")
if trace_carrier:
tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
start_time = data["metrics"]["engine_recv_latest_token_time"]
tracing.trace_report_span(
tracing.TraceSpanName.POSTPROCESSING,
request_id,
int(start_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
if "trace_carrier" in data:
del data["trace_carrier"]
data["output_token_ids"] = output_tokens[rid]
data["outputs"]["top_logprobs"] = aggregated_top_logprobs[rid]
data["outputs"]["draft_top_logprobs"] = aggregated_draft_top_logprobs[rid]
@@ -347,6 +364,7 @@ class OpenAIServingCompletion:
except Exception as e:
api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
finally:
tracing.trace_req_finish(request_id)
trace_print(LoggingEventName.POSTPROCESSING_END, request_id, getattr(request, "user", ""))
self.engine_client.semaphore.release()
if dealer is not None:
@@ -577,6 +595,19 @@ class OpenAIServingCompletion:
choices = []
if res["finished"]:
trace_carrier = res.get("trace_carrier")
if trace_carrier:
tracing.trace_set_proc_propagate_context(request_id, trace_carrier)
start_time = res["metrics"]["engine_recv_latest_token_time"]
tracing.trace_report_span(
tracing.TraceSpanName.POSTPROCESSING,
request_id,
int(start_time * 1e9),
int(time.time() * 1e9),
thread_finish_flag=True,
)
if "trace_carrier" in res:
del res["trace_carrier"]
num_choices -= 1
if getattr(request, "stream_options", None) and request.stream_options.include_usage:
usage_chunk = CompletionStreamResponse(
@@ -607,6 +638,8 @@ class OpenAIServingCompletion:
api_server_logger.error(f"Error in completion_stream_generator: {e}, {str(traceback.format_exc())}")
yield f"data: {ErrorResponse(error=ErrorInfo(message=str(e), code='400', type=ErrorType.INTERNAL_ERROR)).model_dump_json(exclude_unset=True)}\n\n"
finally:
tracing.trace_req_finish(request_id)
trace_print(LoggingEventName.POSTPROCESSING_END, request_id, getattr(request, "user", ""))
del request
if dealer is not None:
+2
@@ -152,6 +152,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
"FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")),
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
"FMQ_CONFIG_JSON": lambda: os.getenv("FMQ_CONFIG_JSON", None),
"FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": lambda: int(os.getenv("FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS", "500")),
"FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": lambda: int(os.getenv("FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE", "64")),
"FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT": lambda: int(os.getenv("FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT", "120")),
}
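The two new `FD_OTLP_EXPORTER_*` entries follow the repository's pattern of a dict mapping variable names to zero-argument lambdas, so each value is re-read from the environment on every access. A minimal, self-contained sketch of that pattern (the registry below is illustrative, not the actual `fastdeploy.envs` module):

```python
import os

# Lambda-based environment registry: values are callables evaluated on access,
# so changes to os.environ take effect without re-importing the module.
environment_variables = {
    "FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS": lambda: int(
        os.getenv("FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS", "500")
    ),
    "FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE": lambda: int(
        os.getenv("FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE", "64")
    ),
}

def read_env(name: str) -> int:
    # Invoke the stored callable to get the current, type-converted value.
    return environment_variables[name]()
```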
+777
@@ -0,0 +1,777 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# This file is modified from https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/tracing/trace.py
from __future__ import annotations
import inspect
import os
import random
import threading
import time
import uuid
from dataclasses import dataclass
from enum import Enum, unique
from functools import wraps
from typing import Any, Dict, List, Optional
from fastdeploy import envs
from fastdeploy.utils import api_server_logger as logger
opentelemetry_imported = False
tracing_enabled = False
try:
from opentelemetry import context, propagate, trace
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_PROTOCOL,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider, id_generator
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
opentelemetry_imported = True
except ImportError as e:
    logger.warning(f"opentelemetry is not installed; tracing disabled: {e}")

    # Minimal stub so later references to id_generator.IdGenerator still resolve.
    class id_generator:
        class IdGenerator:
            pass
class FilteringSpanProcessor(SpanProcessor):
def __init__(self, exporter: SpanExporter, **kwargs):
self._processor = BatchSpanProcessor(exporter, **kwargs)
def on_start(self, span, parent_context=None):
parent_span = trace.get_current_span()
if parent_span and parent_span.is_recording():
stream_attr = parent_span.attributes.get("stream")
if stream_attr is not None:
span.set_attribute("stream", stream_attr)
self._processor.on_start(span, parent_context)
def on_end(self, span):
# asgi_event_type = span.attributes.get("asgi.event.type")
# stream = span.attributes.get("stream")
span_name = span.name or ""
if "http" in span_name:
return
self._processor.on_end(span)
def shutdown(self):
self._processor.shutdown()
def force_flush(self, timeout_millis=None):
self._processor.force_flush(timeout_millis)
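`FilteringSpanProcessor` wraps a `BatchSpanProcessor` with two extra behaviors: on start it copies the parent span's `stream` attribute onto the child, and on end it drops spans whose name contains `"http"` before export. A stdlib-only sketch of just those two predicates, decoupled from the OpenTelemetry types:

```python
# Stand-ins for the two behaviors layered on top of BatchSpanProcessor.

def inherit_stream_attr(parent_attrs: dict, child_attrs: dict) -> dict:
    # Mirror of on_start(): propagate the parent's "stream" flag, if present.
    if "stream" in parent_attrs:
        child_attrs = {**child_attrs, "stream": parent_attrs["stream"]}
    return child_attrs

def should_export(span_name: str) -> bool:
    # Mirror of on_end(): suppress HTTP middleware spans by name.
    return "http" not in (span_name or "")
```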
def label_span(request):
if request.stream:
span = trace.get_current_span()
if span is not None and span.is_recording():
span.set_attribute("stream", "true")
@dataclass
class TraceThreadInfo:
host_id: str
pid: int
thread_label: str
tp_rank: int
dp_rank: int
tracer: trace.Tracer
@dataclass
class TraceSliceContext:
slice_name: str
span: Optional[trace.span.Span] = None
# When True, defers slice_name assignment until trace_slice_end()
anonymous: bool = False
@dataclass
class TraceThreadContext:
thread_info: TraceThreadInfo
cur_slice_stack: List[TraceSliceContext]
thread_span: Optional[trace.span.Span] = None
# Record the most recently completed span as the previous span for the next span to be created.
last_span_context: Optional[trace.span.SpanContext] = None
@dataclass
class TraceReqContext:
rid: str
start_time_ns: int
threads_context: Dict[int, TraceThreadContext]
# Indicates whether this instance is a replica from the main process.
# When True, root_span is None and only root_span_context is preserved.
is_copy: bool = False
root_span: Optional[trace.span.Span] = None
root_span_context: Optional[context.Context] = None
@dataclass
class TracePropagateContext:
root_span_context: context.Context
prev_span_context: Optional[trace.span.SpanContext]
def to_dict(self):
carrier: dict[str, str] = {}
propagate.inject(carrier, context=self.root_span_context)
if self.prev_span_context:
return {
"root_span": carrier,
"prev_span": {
"span_id": self.prev_span_context.span_id,
"trace_id": self.prev_span_context.trace_id,
},
}
else:
return {"root_span": carrier, "prev_span": "None"}
@classmethod
def instance_from_dict(cls, d):
if "root_span" not in d or "prev_span" not in d:
return None
carrier = d["root_span"]
root_span_context = propagate.extract(carrier)
if d["prev_span"] == "None":
prev_span_context = None
else:
prev_span_context = trace.span.SpanContext(
trace_id=d["prev_span"]["trace_id"],
span_id=d["prev_span"]["span_id"],
is_remote=True,
)
return cls(root_span_context, prev_span_context)
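`TracePropagateContext.to_dict()` produces the cross-process wire format: `"root_span"` holds a W3C trace-context carrier (filled by `propagate.inject()` in the real code) and `"prev_span"` is either the literal string `"None"` or the integer ids of the most recently finished span. A stdlib-only sketch of the `prev_span` half of that round trip (the traceparent value below is illustrative):

```python
# Encode/decode the "prev_span" entry the same way to_dict()/instance_from_dict() do.

def encode_prev_span(prev):
    # prev is None or a (trace_id, span_id) pair of ints.
    if prev is None:
        return "None"
    trace_id, span_id = prev
    return {"trace_id": trace_id, "span_id": span_id}

def decode_prev_span(encoded):
    if encoded == "None":
        return None
    return (encoded["trace_id"], encoded["span_id"])

carrier = {
    "root_span": {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"},
    "prev_span": encode_prev_span((0x1234, 0x56)),
}
```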
class TraceCustomIdGenerator(id_generator.IdGenerator):
"""
The default IdGenerator may produce duplicate trace IDs across multiple TP scheduler processes,
hence a custom IdGenerator is implemented.
"""
def __init__(self):
super().__init__()
self.local_random = random.Random()
self.local_random.seed(time.time())
def generate_trace_id(self) -> int:
return self.local_random.getrandbits(64)
def generate_span_id(self) -> int:
return self.local_random.getrandbits(64)
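The custom generator exists because forked TP scheduler processes can inherit shared RNG state and emit colliding ids; seeding a per-instance `random.Random` avoids that. A runnable sketch of the same shape, outside the OpenTelemetry `IdGenerator` base class:

```python
import random
import time

class LocalRandomIdGenerator:
    """Per-process RNG so forked workers do not share ID-generation state."""

    def __init__(self):
        self.local_random = random.Random()
        self.local_random.seed(time.time())

    def generate_trace_id(self) -> int:
        return self.local_random.getrandbits(64)

    def generate_span_id(self) -> int:
        return self.local_random.getrandbits(64)
```

Note that the OpenTelemetry SDK's default generator draws 128 bits for trace ids; using 64 bits, as this diff does, still yields valid nonzero ids, at a somewhat higher collision probability.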
# global variables
remote_trace_contexts: Dict[str, TracePropagateContext] = {}
threads_info: Dict[int, TraceThreadInfo] = {}
reqs_context: Dict[str, TraceReqContext] = {}
__get_cur_time_ns = lambda: int(time.time() * 1e9)
def __get_host_id() -> str:
"""
In distributed tracing systems, obtain a unique node identifier
and inject it into all subsequently generated spans
to prevent PID conflicts between threads on different nodes.
"""
if envs.FD_HOST_NAME:
return envs.FD_HOST_NAME
paths = ["/etc/machine-id", "/var/lib/dbus/machine-id"]
for path in paths:
try:
with open(path, "r") as f:
val = f.read().strip()
if val:
return val
except Exception:
continue
mac = uuid.getnode()
if mac != 0:
return uuid.UUID(int=mac).hex
try:
unique_id = uuid.uuid4().hex + "-" + str(os.getpid())
return unique_id
except Exception:
return "unknown"
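The host-id lookup above is a fallback chain: a configured name, then the machine-id files, then the MAC address, then a random id. A self-contained sketch of the same chain (function name is ours, not the module's):

```python
import os
import uuid
from typing import Optional

def resolve_host_id(configured: Optional[str] = None) -> str:
    # 1. Explicitly configured host name wins.
    if configured:
        return configured
    # 2. Stable per-machine id written by systemd/dbus.
    for path in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
        try:
            with open(path) as f:
                val = f.read().strip()
            if val:
                return val
        except OSError:
            continue
    # 3. MAC address, if one could be determined.
    mac = uuid.getnode()
    if mac != 0:
        return uuid.UUID(int=mac).hex
    # 4. Last resort: random id qualified by the pid.
    return uuid.uuid4().hex + "-" + str(os.getpid())
```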
# Should be called by each tracked process.
def process_tracing_init():
global tracing_enabled
global __get_cur_time_ns
tracing_enabled = envs.TRACES_ENABLE.lower() == "true"
if not tracing_enabled:
logger.warning("Opentelemetry is DISABLED.")
return
if not opentelemetry_imported:
tracing_enabled = False
return
try:
# --- read env ---
service_name = envs.FD_SERVICE_NAME
host_name = envs.FD_HOST_NAME
resource_attributes = {"service.name": service_name}
if host_name:
resource_attributes["host.name"] = host_name
resource = Resource(attributes=resource_attributes)
endpoint = envs.EXPORTER_OTLP_ENDPOINT
headers = envs.EXPORTER_OTLP_HEADERS
headers = dict(item.split("=") for item in headers.split(",")) if headers else None
otlp_exporter = get_otlp_span_exporter(endpoint, headers)
schedule_delay_millis = envs.FD_OTLP_EXPORTER_SCHEDULE_DELAY_MILLIS
max_export_batch_size = envs.FD_OTLP_EXPORTER_MAX_EXPORT_BATCH_SIZE
processor = FilteringSpanProcessor(
otlp_exporter,
schedule_delay_millis=schedule_delay_millis,
max_export_batch_size=max_export_batch_size,
)
tracer_provider = TracerProvider(resource=resource, id_generator=TraceCustomIdGenerator())
tracer_provider.add_span_processor(processor)
# tracer_provider.add_span_processor(
# SimpleSpanProcessor(ConsoleSpanExporter())
# )
trace.set_tracer_provider(tracer_provider)
except Exception as e:
        logger.error(f"Failed to initialize OpenTelemetry: {e}")
        logger.warning("Please verify that the OTLP endpoint is configured correctly.")
tracing_enabled = False
return
if hasattr(time, "time_ns"):
__get_cur_time_ns = lambda: int(time.time_ns())
tracing_enabled = True
def get_otlp_span_exporter(endpoint, headers):
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GRPCSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HTTPSpanExporter,
)
protocol = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "grpc")
supported_protocols = {"grpc", "http/protobuf"}
if protocol not in supported_protocols:
raise ValueError(
f"Unsupported OTLP protocol '{protocol}' configured. "
f"Supported protocols are: {', '.join(sorted(supported_protocols))}"
)
if protocol == "grpc":
return GRPCSpanExporter(endpoint=endpoint, insecure=True)
elif protocol == "http/protobuf":
return HTTPSpanExporter(endpoint=endpoint, headers=headers)
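`get_otlp_span_exporter` dispatches on `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` and rejects anything other than `grpc` or `http/protobuf`. The selection logic can be exercised without the exporter packages installed; this sketch stubs out the exporter construction and keeps only the validation:

```python
import os

SUPPORTED_OTLP_PROTOCOLS = {"grpc", "http/protobuf"}

def select_otlp_protocol(env=os.environ) -> str:
    # Same default and validation as the dispatch above; the actual
    # exporter instantiation is omitted here.
    protocol = env.get("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", "grpc")
    if protocol not in SUPPORTED_OTLP_PROTOCOLS:
        raise ValueError(
            f"Unsupported OTLP protocol '{protocol}' configured. "
            f"Supported protocols are: {', '.join(sorted(SUPPORTED_OTLP_PROTOCOLS))}"
        )
    return protocol
```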
# Should be called by each tracked thread.
def trace_set_thread_info(thread_label: str, tp_rank: Optional[int] = None, dp_rank: Optional[int] = None):
if not tracing_enabled:
return
pid = threading.get_native_id()
if pid in threads_info:
return
threads_info[pid] = TraceThreadInfo(
host_id=__get_host_id(),
pid=pid,
thread_label=thread_label,
tp_rank=tp_rank,
dp_rank=dp_rank,
tracer=trace.get_tracer("fastdeploy server"),
)
def __create_thread_context(pid, req_span_context, ts: Optional[int] = None):
if pid not in threads_info:
trace_set_thread_info("unknown")
thread_info = threads_info[pid]
thread_context = TraceThreadContext(
thread_info=thread_info,
cur_slice_stack=[],
)
thread_name = f"{thread_info.thread_label}"
if thread_info.tp_rank is not None:
thread_name += f" [TP {thread_info.tp_rank}] "
thread_name += f"(host:{thread_info.host_id} | pid:{pid})"
ts = ts or __get_cur_time_ns()
thread_context.thread_span = thread_context.thread_info.tracer.start_span(
name=thread_name,
start_time=ts,
context=req_span_context,
)
if thread_info.tp_rank is not None:
thread_context.thread_span.set_attributes({"tp_rank": thread_info.tp_rank})
thread_context.thread_span.set_attributes(
{
"host_id": thread_info.host_id,
"pid": thread_info.pid,
"thread_label": thread_info.thread_label,
}
)
return thread_context
def trace_get_proc_propagate_context(rid) -> Optional[Dict[str, Any]]:
if not tracing_enabled:
return None
rid = str(rid)
if rid not in reqs_context or not reqs_context[rid].root_span_context:
return None
pid = threading.get_native_id()
prev_span_context = None
thread_context = reqs_context[rid].threads_context[pid]
if thread_context.cur_slice_stack:
cur_slice_info = thread_context.cur_slice_stack[0]
prev_span_context = cur_slice_info.span.get_span_context()
elif thread_context.last_span_context:
prev_span_context = thread_context.last_span_context
root_span_context = reqs_context[rid].root_span_context
trace_context = TracePropagateContext(root_span_context, prev_span_context)
return trace_context.to_dict()
def trace_set_proc_propagate_context(rid, trace_context: Optional[Dict[str, Any]], ts: Optional[int] = None):
if not tracing_enabled:
return
if not trace_context:
return
trace_context = TracePropagateContext.instance_from_dict(trace_context)
if not trace_context:
return
rid = str(rid)
# Create a copy of the request context
if rid not in reqs_context:
reqs_context[rid] = TraceReqContext(
rid=rid,
start_time_ns=ts or __get_cur_time_ns(),
threads_context={},
root_span_context=trace_context.root_span_context,
is_copy=True,
)
pid = threading.get_native_id()
if pid in reqs_context[rid].threads_context:
return
# Create new thread context.
reqs_context[rid].threads_context[pid] = __create_thread_context(
pid,
trace_context.root_span_context,
reqs_context[rid].start_time_ns,
)
reqs_context[rid].threads_context[pid].last_span_context = trace_context.prev_span_context
def trace_req_start(
rid: str,
trace_content: str,
ts: Optional[int] = None,
role: Optional[str] = "null",
):
if not tracing_enabled:
return
rid = str(rid)
ts = ts or __get_cur_time_ns()
pid = threading.get_native_id()
if pid not in threads_info:
return
tracer = threads_info[pid].tracer
upstream_context = trace_content
# 1. Check if there is already an active Span (from FastAPI Instrumentor)
active_span = trace.get_current_span()
if active_span is not None and active_span.is_recording():
active_span.set_attribute("rid", rid)
new_span_name = active_span.name + f" (Req: {rid})"
active_span.update_name(new_span_name)
active_span_context = active_span.get_span_context()
if active_span_context.is_valid and active_span_context.trace_id != 0:
# Scenario: FastAPIInstrumentor has created the top-level Span
if rid in reqs_context:
return
logger.info(f"Using existing active span from context as root for RID: {rid}")
# Inject the FastAPI Span Context as the root Span Context into the internal structure
reqs_context[rid] = TraceReqContext(
rid=rid,
start_time_ns=ts,
threads_context={},
root_span=active_span,
root_span_context=context.get_current(),
is_copy=True,
)
# Thread context is necessary so that trace_slice_start can find the tracer
if pid not in reqs_context[rid].threads_context:
reqs_context[rid].threads_context[pid] = __create_thread_context(
pid,
context.get_current(),
ts,
)
# No need to manually end req/bootstrap room span, this is handled by FastAPIInstrumentor
return
parent_context = None
use_upstream = False
if upstream_context:
ctx_span = trace.get_current_span(upstream_context)
if ctx_span.get_span_context().is_valid:
use_upstream = True
if use_upstream:
logger.info(f"Continuing upstream trace for RID={rid}")
parent_context = upstream_context
reqs_context[rid] = TraceReqContext(
rid=rid,
start_time_ns=ts,
threads_context={},
is_copy=True,
)
else:
reqs_context[rid] = TraceReqContext(
rid=rid,
start_time_ns=ts,
threads_context={},
is_copy=False,
)
orig_rid = rid.split("_")[0]
role = "" if role == "null" else role
attrs = {"rid": orig_rid}
root_span = tracer.start_span(
name=f"{role} Req {orig_rid}".strip(),
start_time=ts,
context=parent_context,
kind=trace.SpanKind.SERVER,
attributes=attrs,
)
root_span.set_attributes(
{
"rid": rid,
}
)
# Consistently populate the Root Span information in reqs_context
reqs_context[rid].root_span = root_span
reqs_context[rid].root_span_context = trace.set_span_in_context(root_span)
# create thread context and thread span
reqs_context[rid].threads_context[pid] = __create_thread_context(
pid,
reqs_context[rid].root_span_context,
ts,
)
def trace_req_finish(rid: str, ts: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None):
if not tracing_enabled:
return
rid = str(rid)
if rid not in reqs_context:
return
req_context = reqs_context[rid]
ts = ts or __get_cur_time_ns()
# End all unclosed thread spans.
for thread_context in req_context.threads_context.values():
thread_context.thread_span.end(end_time=ts)
# Only end the root_span if it was manually created
if req_context.root_span:
if attrs:
req_context.root_span.set_attributes(attrs)
req_context.root_span.end(end_time=ts)
del reqs_context[rid]
def trace_slice_start(
name: str,
rid: str,
ts: Optional[int] = None,
anonymous: bool = False,
):
if not tracing_enabled:
return
rid = str(rid)
if rid not in reqs_context:
return
pid = threading.get_native_id()
if pid not in reqs_context[rid].threads_context:
return
thread_context = reqs_context[rid].threads_context[pid]
ts = ts or __get_cur_time_ns()
slice_info = TraceSliceContext(
slice_name=name,
anonymous=anonymous,
)
    # Find the previous slice (for span linking) and the parent span.
    prev_span_context = None
    parent_span = thread_context.thread_span
    if thread_context.cur_slice_stack:
        parent_span = thread_context.cur_slice_stack[-1].span
    elif thread_context.last_span_context:
        prev_span_context = thread_context.last_span_context
parent_span_context = trace.set_span_in_context(parent_span)
span = thread_context.thread_info.tracer.start_span(
name=slice_info.slice_name,
start_time=ts,
context=parent_span_context,
)
if prev_span_context:
span.add_link(prev_span_context)
slice_info.span = span
thread_context.cur_slice_stack.append(slice_info)
def trace_slice_end(
name: str,
rid: str,
ts: Optional[int] = None,
attrs: Optional[Dict[str, Any]] = None,
auto_next_anon: bool = False,
thread_finish_flag: bool = False,
):
if not tracing_enabled:
return
rid = str(rid)
if rid not in reqs_context:
return
pid = threading.get_native_id()
if pid not in reqs_context[rid].threads_context:
return
thread_context = reqs_context[rid].threads_context[pid]
if not thread_context.cur_slice_stack:
        logger.warning(f"trace_slice_end('{name}') has no matching trace_slice_start; ignoring.")
return
ts = ts or __get_cur_time_ns()
    slice_info = thread_context.cur_slice_stack[-1]
    span = slice_info.span
    if slice_info.anonymous:
        span.update_name(name)
    elif slice_info.slice_name != name:
        span.set_status(trace.Status(trace.StatusCode.ERROR))
        logger.warning(f"Slice name mismatch: {name} != {slice_info.slice_name}")
if attrs:
span.set_attributes(attrs)
span.end(end_time=ts)
thread_context.cur_slice_stack.pop()
if len(thread_context.cur_slice_stack) == 0:
thread_context.last_span_context = span.get_span_context()
# If this is the last slice in the thread,
# release the thread context and check whether to release the request context.
if thread_finish_flag:
thread_context.thread_span.end(end_time=ts)
del reqs_context[rid].threads_context[pid]
if reqs_context[rid].is_copy and not reqs_context[rid].threads_context:
del reqs_context[rid]
return
if auto_next_anon:
trace_slice_start("", rid, ts, True)
# alias
trace_slice = trace_slice_end
def trace_report_span(
name: str,
rid: str,
start_time_ns: int,
end_time_ns: int,
attrs: Dict[str, Any] = None,
thread_finish_flag: bool = False,
):
if not tracing_enabled:
return
trace_slice_start(name, rid, start_time_ns)
trace_slice_end(name, rid, end_time_ns, attrs, False, thread_finish_flag)
# Add event to the current slice on the same thread with the same rid.
def trace_event(name: str, rid: str, ts: Optional[int] = None, attrs: Dict[str, Any] = None):
if not tracing_enabled:
return
rid = str(rid)
if rid not in reqs_context:
return
pid = threading.get_native_id()
if pid not in reqs_context[rid].threads_context:
return
thread_context = reqs_context[rid].threads_context[pid]
if not thread_context.cur_slice_stack:
logger.warning("No slice is currently being traced.")
return
ts = ts or __get_cur_time_ns()
slice_info = thread_context.cur_slice_stack[-1]
slice_info.span.add_event(name=name, timestamp=ts, attributes=attrs)
# Add attrs to the current slice on the same thread with the same rid.
def trace_slice_add_attr(rid: str, attrs: Dict[str, Any]):
if not tracing_enabled:
return
rid = str(rid)
if rid not in reqs_context:
return
pid = threading.get_native_id()
if pid not in reqs_context[rid].threads_context:
return
thread_context = reqs_context[rid].threads_context[pid]
if not thread_context.cur_slice_stack:
logger.warning("No slice is currently being traced.")
return
slice_info = thread_context.cur_slice_stack[-1]
slice_info.span.set_attributes(attrs)
def trace_span(span_name: str = None):
def decorator(func):
if not tracing_enabled:
return func
pid = threading.get_native_id()
if pid not in threads_info:
trace_set_thread_info("FastDeploy")
tracer = threads_info[pid].tracer
name = span_name or func.__name__
if inspect.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
with tracer.start_as_current_span(name):
return await func(*args, **kwargs)
return async_wrapper
else:
@wraps(func)
def sync_wrapper(*args, **kwargs):
with tracer.start_as_current_span(name):
return func(*args, **kwargs)
return sync_wrapper
return decorator
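The `trace_span` decorator picks its wrapper flavor once, at decoration time, based on whether the target is a coroutine function. The pattern can be demonstrated with a stand-in context manager in place of `tracer.start_as_current_span()` (names below are ours):

```python
import asyncio
import inspect
from contextlib import contextmanager
from functools import wraps

# Stand-in for tracer.start_as_current_span(): records entered span names.
entered_spans = []

@contextmanager
def start_span(name):
    entered_spans.append(name)
    yield

def trace_span(span_name=None):
    def decorator(func):
        name = span_name or func.__name__
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                with start_span(name):
                    return await func(*args, **kwargs)
            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            with start_span(name):
                return func(*args, **kwargs)
        return sync_wrapper
    return decorator

@trace_span("metrics")
def metrics():
    return "ok"

@trace_span()  # span name defaults to the function name
async def rearrange_experts():
    return 42
```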
@unique
class TraceSpanName(str, Enum):
FASTDEPLOY = "FASTDEPLOY"
PREPROCESSING = "PREPROCESSING"
SCHEDULE = "SCHEDULE"
PREFILL = "PREFILL"
DECODE = "DECODE"
POSTPROCESSING = "POSTPROCESSING"
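Putting the module's API together, the serving code in this diff follows a fixed call order per request: register the thread, open the request, report spans for completed stages, and finish the request. A hypothetical driver showing that order (it falls back to no-op stubs when `fastdeploy` is not importable, so the sketch stays runnable):

```python
import time

try:
    import fastdeploy.metrics.trace as tracing
except Exception:
    class _Stub:
        # Every tracing call becomes a no-op when the package is absent.
        def __getattr__(self, name):
            return lambda *args, **kwargs: None
    tracing = _Stub()

def handle_request(rid: str, trace_context=None) -> str:
    tracing.trace_set_thread_info("API Server")  # once per thread
    tracing.trace_req_start(rid=rid, trace_content=trace_context, role="FastDeploy")
    t0 = time.time_ns()
    # ... run the actual request work here ...
    tracing.trace_report_span("POSTPROCESSING", rid, t0, time.time_ns(),
                              thread_finish_flag=True)
    tracing.trace_req_finish(rid)
    return "done"
```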
-262
@@ -1,262 +0,0 @@
import json
import os
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.propagate import extract, inject
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
SpanExporter,
)
from fastdeploy import envs
from fastdeploy.utils import llm_logger
# OpenTelemetry Trace context store in metadata
TRACE_CARRIER = "trace_carrier"
traces_enable = False
tracer = trace.get_tracer(__name__)
class FilteringSpanProcessor(SpanProcessor):
def __init__(self, exporter: SpanExporter):
self._processor = BatchSpanProcessor(exporter)
    # Inherit the "stream" attribute from the parent span onto new child spans.
def on_start(self, span, parent_context=None):
parent_span = trace.get_current_span()
if parent_span and parent_span.is_recording():
stream_attr = parent_span.attributes.get("stream")
if stream_attr is not None:
span.set_attribute("stream", stream_attr)
self._processor.on_start(span, parent_context)
    # Filtering logic applied when spans are exported.
def on_end(self, span):
asgi_event_type = span.attributes.get("asgi.event.type")
stream = span.attributes.get("stream")
span_name = span.name or ""
if stream and asgi_event_type == "http.response.body" and "http send" in span_name:
return
self._processor.on_end(span)
def shutdown(self):
self._processor.shutdown()
def force_flush(self, timeout_millis=None):
self._processor.force_flush(timeout_millis)
# Helper that labels streaming-request spans.
def lable_span(request):
if request.stream:
span = trace.get_current_span()
if span is not None and span.is_recording():
span.set_attribute("stream", "true")
def set_up():
try:
# when TRACES_ENABLED=true start trace
global traces_enable
traces_enable = envs.TRACES_ENABLE.lower() == "true"
if not traces_enable:
llm_logger.warning("Opentelemetry is DISABLED.")
return
llm_logger.info("Opentelemetry is ENABLED, configuring...")
# --- read env ---
service_name = envs.FD_SERVICE_NAME
host_name = envs.FD_HOST_NAME
# --- set attributes (Service Name, Host Name, etc.) ---
resource_attributes = {"service.name": service_name}
if host_name:
resource_attributes["host.name"] = host_name
resource = Resource(attributes=resource_attributes)
# --- set Exporter ---
exporter_type = envs.TRACES_EXPORTER.lower()
if exporter_type == "otlp":
endpoint = envs.EXPORTER_OTLP_ENDPOINT # should be set
headers = envs.EXPORTER_OTLP_HEADERS # e.g., "Authentication=***,k2=v2"
otlp_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=(dict(item.split("=") for item in headers.split(",")) if headers else None),
)
processor = FilteringSpanProcessor(otlp_exporter)
llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
else: # default console
processor = FilteringSpanProcessor(ConsoleSpanExporter())
llm_logger.info("Using Console Exporter.")
# --- set Tracer Provider ---
provider = TracerProvider(resource=resource)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
global tracer
tracer = trace.get_tracer(__name__)
except:
llm_logger.error("set_up failed")
pass
def instrument(app: FastAPI):
try:
set_up()
if traces_enable:
llm_logger.info("Applying instrumentors...")
FastAPIInstrumentor.instrument_app(app)
try:
LoggingInstrumentor().instrument(set_logging_format=True)
except Exception:
pass
except:
llm_logger.info("instrument failed")
pass
def inject_to_metadata(request, metadata_attr="metadata"):
"""
Inject OpenTelemetry trace context into the metadata field of the request.
Parameters:
request: can be a dict or object, with metadata attributes or fields.
metadata_attr: the field name of metadata, default is 'metadata'.
Operation:
- If metadata does not exist, create a new one and mount it on the request.
- Inject the current trace context as a JSON string and store it in metadata.
- Use the key TRACE_CARRIER to store the injected content.
Note:
- This function is a non-blocking operation, and errors are silently ignored.
- If there is no metadata attribute in the request, an empty dict will be created for it as its attribute
"""
try:
if request is None or not traces_enable:
return
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
if metadata is None:
metadata = {}
if isinstance(request, dict):
request[metadata_attr] = metadata
else:
setattr(request, metadata_attr, metadata)
trace_carrier = {}
inject(trace_carrier)
trace_carrier_json_string = json.dumps(trace_carrier)
metadata[TRACE_CARRIER] = trace_carrier_json_string
except:
pass
def extract_from_metadata(request, metadata_attr="metadata"):
"""
Extract trace context from metadata of request object (dict or class instance).
Parameters:
request: can be a dictionary or any object, containing metadata attributes or fields.
metadata_attr: metadata field name, default is 'metadata'.
Returns:
- Extraction success: returns OpenTelemetry context object (Context)
- Extraction failure or exception: returns None
"""
try:
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
if metadata is None:
return None
trace_carrier_json_string = metadata.get(TRACE_CARRIER)
if trace_carrier_json_string is None:
return None
trace_carrier = json.loads(trace_carrier_json_string)
ctx = extract(trace_carrier)
return ctx
except:
return None
def extract_from_request(request):
    """
    Extract the trace context from the trace_carrier attribute of a request object.

    Parameters:
        request: an object carrying a trace_carrier attribute (a JSON string).

    Returns:
        - On success: an OpenTelemetry Context object.
        - On failure or exception: None.
    """
    try:
        trace_carrier_info = getattr(request, TRACE_CARRIER, None)
        if trace_carrier_info is None:
            return None
        trace_carrier = json.loads(trace_carrier_info)
        return extract(trace_carrier)
    except Exception:
        return None
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Start a new span in the trace context carried by the request's metadata.
    """
    try:
        if not traces_enable:
            return
        # extract the trace context from request.metadata[TRACE_CARRIER]
        ctx = extract_from_metadata(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass
def fd_start_span(span_name, kind=trace.SpanKind.CLIENT):
    """
    Start a new span at FastDeploy startup to signal a successful launch.
    """
    try:
        if not traces_enable:
            return
        with tracer.start_as_current_span(span_name, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
    """
    Start a new span in the trace context carried directly on the request.
    """
    try:
        if not traces_enable:
            return
        # extract the trace context from request.trace_carrier
        ctx = extract_from_request(request)
        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
    except Exception:
        pass
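The carrier filled by `inject()` normally holds a single W3C `traceparent` entry. A small sketch of what that header value encodes, per the W3C Trace Context format (the parser below is illustrative, not part of the module):

```python
def parse_traceparent(value):
    """Split a W3C traceparent header value into its four dash-separated fields."""
    version, trace_id, span_id, flags = value.split("-")
    return {
        "version": version,    # "00" for the current spec version
        "trace_id": trace_id,  # 32 hex chars identifying the whole trace
        "span_id": span_id,    # 16 hex chars identifying the parent span
        "flags": flags,        # "01" means the trace is sampled
    }


parsed = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
```

Because the whole context fits in one short string, it survives JSON serialization into request metadata unchanged.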
@@ -27,6 +27,7 @@ import numpy as np
import paddle
import zmq
import fastdeploy.metrics.trace as tracing
from fastdeploy import envs
from fastdeploy.engine.request import (
CompletionOutput,
@@ -361,6 +362,7 @@ class TokenProcessor:
"""
read tokens from paddle inference engine and process
"""
tracing.trace_set_thread_info("Token Processor")
if current_platform.is_xpu():
from fastdeploy.model_executor.ops.xpu import (
@@ -704,6 +706,12 @@ class TokenProcessor:
is_prefill = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "prefill"
is_decode = task.disaggregate_info is not None and self.cfg.scheduler_config.splitwise_role == "decode"
rid = task_id.split("_")[0]
trace_carrier = task.trace_carrier
metrics = task.metrics
t = metrics.inference_start_time
ts = int(t * 1_000_000_000) if t is not None else 0
tracing.trace_set_proc_propagate_context(rid, trace_carrier, ts)
if self.cfg.speculative_config.method:
self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i])
if accept_num[i] == -3:
@@ -748,11 +756,21 @@ class TokenProcessor:
self.total_step += 1
current_time = time.time()
trace_carrier = None
if self.tokens_counter[task_id] == 0:
task.metrics.record_recv_first_token()
task.metrics.cal_cost_time()
metrics = copy.copy(task.metrics)
self._record_first_token_metrics(task, current_time)
tracing.trace_report_span(
name=tracing.TraceSpanName.PREFILL,
rid=rid,
start_time_ns=int(task.metrics.inference_start_time * 1e9),
end_time_ns=int(time.time() * 1e9),
thread_finish_flag=False,
)
else:
task.metrics.record_recv_token()
if self.tokens_counter[task_id] == 1 and self.cfg.scheduler_config.splitwise_role == "decode":
@@ -774,6 +792,7 @@ class TokenProcessor:
metrics=metrics,
ic_req_data=task.ic_req_data,
prompt_token_ids_len=task.prompt_token_ids_len,
trace_carrier=trace_carrier,
)
if self.tokens_counter[task_id] == 0:
if task.messages is not None:
@@ -830,6 +849,15 @@ class TokenProcessor:
if token_id in task.eos_token_ids or is_prefill or recovery_stop:
result.finished = True
trace_carrier = tracing.trace_get_proc_propagate_context(rid=rid)
result.trace_carrier = trace_carrier
tracing.trace_report_span(
name=tracing.TraceSpanName.DECODE,
rid=rid,
start_time_ns=int(task.metrics.inference_start_time * 1e9),
end_time_ns=int(time.time() * 1e9),
thread_finish_flag=True,
)
if recovery_stop:
result.error_msg = "Recover is not supported, the result is incomplete!"
llm_logger.info(
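The hunks above convert float-second timestamps to integer nanoseconds with `int(t * 1_000_000_000)` for span start/end times. A brief sketch of that conversion and the precision trade-off it implies:

```python
import time


def to_ns(seconds: float) -> int:
    """Convert a float-second timestamp (e.g. from time.time()) to integer nanoseconds."""
    return int(seconds * 1_000_000_000)


start_ns = to_ns(time.time())
# time.time() returns a float, so sub-microsecond detail can be lost in the
# multiplication; time.time_ns() yields integer nanoseconds directly.
direct_ns = time.time_ns()
```

For span durations measured in milliseconds, the float rounding is negligible; it only matters when diffing very close timestamps.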
@@ -66,6 +66,8 @@ plugins:
Scheduler: 调度器
Graceful Shutdown: 服务优雅关闭
Offline Inference: 离线推理
Observability: 可观测性
Trace: Trace服务
CLI: CLI 使用说明
Chat: Chat命令
Complete: Complete命令
@@ -173,3 +175,5 @@ nav:
- Bench: cli/bench.md
- Run Batch: cli/run-batch.md
- Tokenizer: cli/tokenizer.md
- Observability:
- Trace: observability/trace.md
File diff suppressed because it is too large
@@ -1,193 +0,0 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import unittest
from unittest.mock import MagicMock, patch

from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

from fastdeploy.metrics.trace_util import FilteringSpanProcessor, lable_span


class TestFilteringSpanProcessor(unittest.TestCase):
    """Test cases for FilteringSpanProcessor class"""

    def setUp(self):
        """Set up test fixtures"""
        self.exporter = ConsoleSpanExporter()
        self.processor = FilteringSpanProcessor(self.exporter)

    def test_initialization(self):
        """Test that FilteringSpanProcessor is properly initialized"""
        self.assertIsInstance(self.processor._processor, BatchSpanProcessor)
        self.assertEqual(self.processor._processor.span_exporter, self.exporter)

    def test_on_start_with_parent_span(self):
        """Test on_start method with parent span containing stream attribute"""
        # Mock span and parent context
        mock_span = MagicMock()
        mock_parent_span = MagicMock()
        mock_parent_span.is_recording.return_value = True
        mock_parent_span.attributes.get.return_value = "test_stream"

        # Mock trace.get_current_span to return parent span
        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=mock_parent_span):
            with patch.object(self.processor._processor, "on_start") as mock_parent_on_start:
                self.processor.on_start(mock_span, parent_context=None)

                # Verify stream attribute is set on child span
                mock_span.set_attribute.assert_called_once_with("stream", "test_stream")
                mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_start_without_parent_span(self):
        """Test on_start method without parent span"""
        mock_span = MagicMock()

        # Mock trace.get_current_span to return None
        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=None):
            with patch.object(self.processor._processor, "on_start") as mock_parent_on_start:
                self.processor.on_start(mock_span, parent_context=None)

                # Verify no attributes are set
                mock_span.set_attribute.assert_not_called()
                mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_start_with_non_recording_parent_span(self):
        """Test on_start method with non-recording parent span"""
        mock_span = MagicMock()
        mock_parent_span = MagicMock()
        mock_parent_span.is_recording.return_value = False

        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=mock_parent_span):
            with patch.object(self.processor._processor, "on_start") as mock_parent_on_start:
                self.processor.on_start(mock_span, parent_context=None)

                # Verify no attributes are set
                mock_span.set_attribute.assert_not_called()
                mock_parent_on_start.assert_called_once_with(mock_span, None)

    def test_on_end_filter_stream_http_response(self):
        """Test on_end method filters out stream http response spans"""
        mock_span = MagicMock()
        mock_span.attributes.get.side_effect = lambda key: {
            "asgi.event.type": "http.response.body",
            "stream": "true",
        }.get(key)
        mock_span.name = "http send request"

        with patch.object(self.processor._processor, "on_end") as mock_parent_on_end:
            self.processor.on_end(mock_span)

            # Verify parent on_end is NOT called (span is filtered out)
            mock_parent_on_end.assert_not_called()

    def test_on_end_keep_non_stream_spans(self):
        """Test on_end method keeps non-stream spans"""
        mock_span = MagicMock()
        mock_span.attributes.get.side_effect = lambda key: {"asgi.event.type": "http.request", "stream": None}.get(key)
        mock_span.name = "http receive request"

        with patch.object(self.processor._processor, "on_end") as mock_parent_on_end:
            self.processor.on_end(mock_span)

            # Verify parent on_end is called
            mock_parent_on_end.assert_called_once_with(mock_span)

    def test_on_end_keep_spans_without_http_send(self):
        """Test on_end method keeps spans without 'http send' in name"""
        mock_span = MagicMock()
        mock_span.attributes.get.side_effect = lambda key: {
            "asgi.event.type": "http.response.body",
            "stream": "true",
        }.get(key)
        mock_span.name = "other operation"

        with patch.object(self.processor._processor, "on_end") as mock_parent_on_end:
            self.processor.on_end(mock_span)

            # Verify parent on_end is called
            mock_parent_on_end.assert_called_once_with(mock_span)

    def test_shutdown(self):
        """Test shutdown method"""
        with patch.object(self.processor._processor, "shutdown") as mock_shutdown:
            self.processor.shutdown()
            mock_shutdown.assert_called_once()

    def test_force_flush(self):
        """Test force_flush method"""
        with patch.object(self.processor._processor, "force_flush") as mock_force_flush:
            self.processor.force_flush(timeout_millis=5000)
            mock_force_flush.assert_called_once_with(5000)


class TestLableSpan(unittest.TestCase):
    """Test cases for lable_span function"""

    def test_lable_span_with_stream_request(self):
        """Test lable_span function with streaming request"""
        mock_request = MagicMock()
        mock_request.stream = True
        mock_span = MagicMock()
        mock_span.is_recording.return_value = True

        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=mock_span):
            lable_span(mock_request)

            # Verify stream attribute is set
            mock_span.set_attribute.assert_called_once_with("stream", "true")

    def test_lable_span_without_stream_request(self):
        """Test lable_span function with non-streaming request"""
        mock_request = MagicMock()
        mock_request.stream = False
        mock_span = MagicMock()
        mock_span.is_recording.return_value = True

        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=mock_span):
            lable_span(mock_request)

            # Verify no attributes are set
            mock_span.set_attribute.assert_not_called()

    def test_lable_span_without_current_span(self):
        """Test lable_span function when no current span exists"""
        mock_request = MagicMock()
        mock_request.stream = True

        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=None):
            # Should not raise any exception
            lable_span(mock_request)

    def test_lable_span_with_non_recording_span(self):
        """Test lable_span function with non-recording span"""
        mock_request = MagicMock()
        mock_request.stream = True
        mock_span = MagicMock()
        mock_span.is_recording.return_value = False

        with patch("fastdeploy.metrics.trace_util.trace.get_current_span", return_value=mock_span):
            lable_span(mock_request)

            # Verify no attributes are set
            mock_span.set_attribute.assert_not_called()


if __name__ == "__main__":
    unittest.main()
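The `FilteringSpanProcessor` exercised above wraps an inner `BatchSpanProcessor` and drops streaming HTTP response-body spans before they reach the exporter. A minimal, dependency-free sketch of that delegation pattern (class and helper names here are illustrative):

```python
class FilteringProcessorSketch:
    """Delegate to an inner span processor, dropping spans that match a predicate."""

    def __init__(self, inner, should_drop):
        self._inner = inner
        self._should_drop = should_drop

    def on_start(self, span, parent_context=None):
        self._inner.on_start(span, parent_context)

    def on_end(self, span):
        if self._should_drop(span):
            return  # filtered out: the span never reaches the exporter
        self._inner.on_end(span)


class RecordingInner:
    """Stand-in for a BatchSpanProcessor that just records ended spans."""

    def __init__(self):
        self.ended = []

    def on_start(self, span, parent_context=None):
        pass

    def on_end(self, span):
        self.ended.append(span)


class Span:
    def __init__(self, name, attrs):
        self.name = name
        self.attributes = attrs


inner = RecordingInner()
proc = FilteringProcessorSketch(
    inner,
    should_drop=lambda s: s.attributes.get("stream") == "true" and "http send" in s.name,
)
proc.on_end(Span("http send request", {"stream": "true"}))  # dropped
proc.on_end(Span("http receive request", {}))               # kept
print(len(inner.ended))  # prints 1
```

Filtering in `on_end` (rather than `on_start`) lets the decision use attributes that are only known once the span completes, which matches the behavior the tests assert.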
@@ -66,6 +66,7 @@ class MockTask:
self.llm_engine_recv_req_timestamp = time.time()
self.ic_req_data = {}
self.prompt_token_ids_len = 0
self.trace_carrier = {}
now = time.time()
self.metrics = RequestMetrics(