FastDeploy/fastdeploy/worker/worker_process.py
kevin 7707be8384 [Feature][KVCache] Implement Cache Manager V1 with GPU + CPU Cache Support (1/n) (#7097)
* [Feature][KVCache] Support cache manager v1 architecture

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Update cache manager and related modules

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: update cache_manager and related modules

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: add node to evictable set in complete_swap_to_device

When a node transitions from SWAP_TO_DEVICE to DEVICE via
complete_swap_to_device, it was not being added to the
_evictable_device set. This caused nodes with ref_count=0 to
become "orphaned" - not appearing in any evictable set despite
having cache_status=DEVICE.
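
A minimal sketch of the intended behavior, assuming hypothetical BlockNode/CacheStatus shapes modeled on the description above (the real radix_tree.py may differ):

```python
from enum import Enum, auto


class CacheStatus(Enum):
    SWAP_TO_DEVICE = auto()
    DEVICE = auto()


class BlockNode:
    def __init__(self, block_id: int):
        self.block_id = block_id
        self.ref_count = 0
        self.cache_status = CacheStatus.SWAP_TO_DEVICE


class RadixTreeSketch:
    def __init__(self):
        self._evictable_device = set()

    def complete_swap_to_device(self, node: BlockNode) -> None:
        node.cache_status = CacheStatus.DEVICE
        # The fix: an unreferenced node that just finished swapping becomes
        # evictable again instead of being orphaned outside every evictable set.
        if node.ref_count == 0:
            self._evictable_device.add(node)
```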

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: update cache manager v1 and related modules

- Add new cache_manager.py with cache management functionality
- Add radix_tree.py for prefix caching
- Update block_pool.py and metadata.py
- Update request.py and resource_manager_v1.py for scheduling
- Update gpu_model_runner.py for GPU model execution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat(cache): add cache controller v1 implementation

- Add CacheController class for cache management
- Update config.py with cache related configurations
- Refactor gpu_model_runner.py for improved cache handling

* feat(cache_manager): update cache manager v1

* fix(cache_manager): fix swap_cache H2D/D2H block_ids selection and clean up ForwardMeta

## Motivation

Fix swap_cache_optimized.cu using the wrong src/dst block_ids in the H2D direction,
and remove the deprecated cache_controller field from ForwardMeta.

## Modifications

- fix: in swap_cache_optimized.cu, pick src/dst block_ids according to the D2H template
  parameter, fixing the swapped src/dst in the H2D direction (both SwapCachePerLayerImpl
  and SwapCacheAllLayersBatchImpl)
- refactor: cache_manager/v1/__init__.py now imports LayerSwapTimeoutError from
  cache_utils (its actual home) instead of cache_controller
- refactor: remove the deprecated cache_controller field from ForwardMeta
- refactor: remove the corresponding cache_controller assignment in gpu_model_runner.py
- test: add unit tests in tests/cache_manager/v1/test_swap_cache_ops.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(cache_manager): refactor cache manager v1 and optimize swap ops

## Motivation

Refactor and optimize cache manager v1: streamline the code structure and improve maintainability.

## Modifications

- Refactor transfer_manager.py and substantially simplify its logic
- Optimize the swap_cache_optimized.cu GPU operator implementation
- Adjust cache_manager.py and cache_controller.py logic and fix the missing free_device_blocks method
- Update block_pool.py, cache_utils.py, metadata.py, radix_tree.py
- Trim the related calls in gpu_model_runner.py, forward_meta.py, attention.py
- Update the corresponding unit tests (test_cache_controller, test_swap_cache_ops, test_transfer_manager)
- Adjust the related config items in config.py

* [KVCache][MTP] support MTP KV Cache initialization and multimodal hashing under cache_manager_v1

## Motivation

On the enable_cache_manager_v1 path, the KV Cache for MTP (speculative decode) should be
managed by CacheController so it can reuse the swap/transfer machinery; this change also
fixes block hashes not carrying the multimodal extra_keys in multimodal scenarios.

## Modifications

- `cache_controller.py`
  - Add `initialize_mtp_kv_cache`: initialize the MTP KV Cache through CacheController and
    register it in cache_kvs_map so that transfer_manager automatically covers the MTP layers
  - `initialize_host_cache` now counts the extra MTP cache layers in num_layers, so the
    host cache also reserves enough space for MTP
  - Rename `_free_gpu_cache` to `free_gpu_cache` (now callable externally)

- `cache_utils.py`
  - Add `get_block_hash_extra_keys`: extracts the multimodal hash information inside a single
    block, matching PrefixCacheManager's multimodal extra_keys logic
  - `get_request_block_hasher` now passes extra_keys when calling hash_block_tokens, fixing
    inaccurate prefix-cache hit rates in multimodal scenarios (see the sketch after this list)

- `spec_decode/mtp.py`
  - `update_mtp_block_num` gains a `skip_cache_init` parameter to avoid re-initializing the
    MTP KV Cache on the v1 cache manager path

- `gpu_model_runner.py`
  - `initialize_kv_cache` (v1) path: after the main model cache is initialized, call
    `cache_controller.initialize_mtp_kv_cache` to create the MTP cache
  - `clear_cache` / `wakeup` / `reset` paths: respect the `enable_cache_manager_v1`
    flag and skip the duplicate proposer.initialize_kv_cache call
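
A minimal, hypothetical sketch of how a block hash can fold in multimodal extra keys so that two blocks with identical token ids but different images no longer collide. The name hash_block_tokens comes from the commit above, but the signature and hashing scheme here are assumptions for illustration only:

```python
import hashlib
import pickle
from typing import Optional


def hash_block_tokens(parent_hash: Optional[bytes], token_ids: list, extra_keys=None) -> bytes:
    """Sketch: chain the parent hash with this block's tokens and any multimodal extra keys."""
    payload = (parent_hash, tuple(token_ids), tuple(extra_keys or ()))
    return hashlib.sha256(pickle.dumps(payload)).digest()


# Same token ids, different image hash -> different block hash, so the prefix cache keeps them apart.
h_text = hash_block_tokens(None, [101, 102, 103])
h_image = hash_block_tokens(None, [101, 102, 103], extra_keys=["img-sha1-abc"])
assert h_text != h_image
```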

## Usage or Command

```bash
# Launch an inference service with MTP + cache_manager_v1 (example)
bash run.sh
```

* fix(cache_manager): multi-GPU fix, mm hash boundary fix, and remove batch ops

1. Fix CuPy stream/event creation for multi-GPU: wrap all stream operations
   with cp.cuda.Device(device_id) context to ensure streams/events are bound
   to the correct device, preventing cross-device errors in multi-GPU setups.

2. Remove cudaSetDevice from SwapCacheAllLayers (handled by cupy context now).

3. Remove swap_cache_all_layers_batch op: simplified the implementation by
   removing the batch upload variant; all-layer transfers now use the standard
   swap_cache_all_layers with cupy device context.

4. Fix mm hash boundary comparison in get_block_hash_extra_keys: change
   strict less-than (<) to less-than-or-equal (<=) so that multimodal items
   ending exactly at block start are correctly excluded (see the sketch after this list).

5. Extract config fields to KVCacheBase: model_config, cache_config,
   quant_config, parallel_config are now set in the base class __init__ to
   avoid duplication in CacheController and CacheManager subclasses.

6. Translate metadata.py docstrings from Chinese to English for broader
   contributor accessibility.

7. Add test_cache_utils.py: comprehensive unit tests for
   get_block_hash_extra_keys covering all boundary and overlap scenarios.

8. Expand test suite: test_request.py cache fields tests, test_radix_tree.py
   backup candidate tests, test_transfer_manager.py and test_cache_manager.py
   multi-GPU and concurrent operation tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix List import and move write_policy normalization to CacheManager

## Motivation

Fix two issues:
1. `List` is not imported in `fastdeploy/engine/request.py`, causing a pre-commit F821 error
2. The `write_policy` normalization (`write_through` → `write_through_selective`) does not belong in `FDConfig`; move it into `CacheManager.__init__` so it only affects the internals of Cache Manager V1

## Modifications

- `fastdeploy/engine/request.py`: add `List` to the `typing` imports and drop the duplicate `CacheSwapMetadata` TYPE_CHECKING import, fixing F821/F811
- `fastdeploy/config.py`: remove the `write_policy` normalization logic
- `fastdeploy/cache_manager/v1/cache_manager.py`: move the normalization into `CacheManager.__init__`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix pre-commit code style issues

## Motivation

Fix the failing CI pre-commit code style checks.

## Modifications

- `fastdeploy/engine/common_engine.py`: black formatting
- `fastdeploy/worker/worker_process.py`: black formatting + isort fix
- `fastdeploy/cache_manager/v1/storage/__init__.py`: isort fix
- `fastdeploy/worker/gpu_worker.py`: isort fix

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [Feature][KVCache] update cache_manager_v1 modules

## Motivation

Update the Cache Manager V1 modules: complete the copyright headers and improve module structure and maintainability.

## Modifications

- `fastdeploy/cache_manager/v1/` modules: add copyright headers and tidy up the code structure
- `fastdeploy/config.py`: config item updates
- `fastdeploy/engine/sched/resource_manager_v1.py`: scheduling-related updates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [Feature][KVCache] add BatchRequest.from_tasks and refactor worker task parsing

## Motivation

Consolidate the duplicated task-parsing logic in worker_process into BatchRequest to cut redundancy and improve maintainability.

## Modifications

- `fastdeploy/engine/request.py`: add the `BatchRequest.from_tasks()` classmethod, which splits task_queue items into inference requests and control requests (a sketch follows below)
- `fastdeploy/worker/worker_process.py`: replace the inline parsing logic with `BatchRequest.from_tasks()` and remove the duplicated control_reqs handling block
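
A hedged sketch of what such a classmethod might look like, inferred from how worker_process.py consumes its return value (`batch_request, control_reqs, max_occupied_batch_index = BatchRequest.from_tasks(tasks)`); the real implementation in fastdeploy/engine/request.py may differ, and the `is_control` check below is only a placeholder for however control requests are actually distinguished:

```python
class BatchRequestSketch(list):
    """Hypothetical stand-in for BatchRequest, only to show the shape of from_tasks."""

    @classmethod
    def from_tasks(cls, tasks):
        batch, control_reqs, max_occupied_batch_index = cls(), [], 0
        for reqs, real_bsz in tasks:  # task_queue items look like ([req1, ...], real_bsz)
            max_occupied_batch_index = max(max_occupied_batch_index, real_bsz)
            for req in reqs:
                # Placeholder predicate: split control requests from inference requests.
                (control_reqs if getattr(req, "is_control", False) else batch).append(req)
        return batch, control_reqs, max_occupied_batch_index
```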

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [Feature][KVCache] add NUMA affinity for host cache and skip swap cache tests

## Motivation

Improve the NUMA affinity of host cache memory allocation to reduce cross-NUMA access latency;
also skip the swap cache ops tests (not supported in the current environment).

## Modifications

- `fastdeploy/cache_manager/v1/cache_controller.py`:
  - Add `_get_numa_node_for_gpu()`: look up the GPU's NUMA node via nvidia-smi or sysfs (a sketch of the sysfs path follows this list)
  - Add `_bind_to_closest_numa_node()`: bind the current thread to the NUMA node closest to the GPU
  - Call the NUMA binding in `initialize_host_cache()` to improve H2D transfer performance
- `tests/cache_manager/v1/test_swap_cache_ops.py`: skip all test classes (`TestSwapCacheAllLayersCorrectness`, `TestSwapCacheAllLayersPerformance`, `TestSwapCacheRandomBlockIndices`)
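
A rough sketch of the sysfs lookup described above, with hypothetical helper naming; the actual cache_controller.py implementation (and its nvidia-smi/NUMA binding details) may differ:

```python
import subprocess


def get_numa_node_for_gpu(gpu_id: int) -> int:
    """Sketch: resolve the GPU's PCI bus id via nvidia-smi, then read numa_node from sysfs."""
    try:
        out = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=pci.bus_id", "--format=csv,noheader", f"--id={gpu_id}"],
            text=True,
        )
        bus_id = out.strip().lower()  # e.g. "00000000:3b:00.0"
        # sysfs uses a 4-digit PCI domain: /sys/bus/pci/devices/0000:3b:00.0/numa_node
        with open(f"/sys/bus/pci/devices/{bus_id[-12:]}/numa_node") as f:
            node = int(f.read().strip())
        return max(node, 0)  # sysfs reports -1 when NUMA is not exposed
    except (OSError, subprocess.SubprocessError, ValueError):
        return 0  # fall back to node 0 if anything goes wrong
```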

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix unittest failures for cache_manager_v1

Three unit tests fail due to interface changes or how they mock things; fix them.

- tests/distributed/chunked_moe.py: `setup_model_runner` uses `__new__` to skip `__init__`; add `enable_cache_manager_v1 = False` to fix the `AttributeError`
- tests/engine/test_resource_manager.py: `PrefixCacheManager` is imported locally, so the `patch` target changes to its definition site `fastdeploy.cache_manager.prefix_cache_manager.PrefixCacheManager`
- tests/v1/test_resource_manager_v1.py: the fourth argument of `_trigger_preempt` changed from `list` to `BatchRequest`; update the test arguments and assertions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] remove debug logging code

## Modifications

- fastdeploy/engine/request.py: remove the debug logger and the debug logging in prompt_hashes
- fastdeploy/worker/worker_process.py: remove the debug import and print statements from __main__

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix cupy device id caching and pickle for _match_result

## Motivation

Fix two bugs:
1. transfer_manager.py calls `cp.cuda.runtime.getDevice()` on every use, which is fragile; cache it as an instance variable at init time so all later operations use a consistent device ID.
2. `__getstate__` in request.py does not skip `_match_result`, whose BlockNode tree contains parent/child cycles and triggers a `RecursionError` during pickling; also add `__setstate__` so the skipped fields come back as safe defaults after unpickling.

## Modifications

- `transfer_manager.py`: call `cp.cuda.runtime.getDevice()` once during init and cache it in `self._cupy_device_id`; subsequent `with cp.cuda.Device(...)` blocks and log messages use the cached value.
- `request.py` (a sketch of the pickle pattern follows below):
  - Add `_match_result` to the `_SKIP_KEYS` set in `__getstate__` so its circular references can no longer break pickling.
  - Add `__setstate__` that restores `_block_hasher` and `_match_result` to `None` after unpickling.
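
A minimal sketch of the __getstate__/__setstate__ pattern with a placeholder class; the real Request fields and the contents of _SKIP_KEYS live in fastdeploy/engine/request.py:

```python
import pickle


class RequestSketch:
    _SKIP_KEYS = {"_block_hasher", "_match_result"}  # unpicklable / cyclic fields

    def __init__(self):
        self.prompt_token_ids = [1, 2, 3]
        self._block_hasher = lambda tokens: hash(tuple(tokens))  # closures cannot be pickled
        self._match_result = None  # may hold a BlockNode tree with parent/child cycles

    def __getstate__(self):
        # Drop the fields that would recurse or fail to pickle.
        return {k: v for k, v in self.__dict__.items() if k not in self._SKIP_KEYS}

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Restore the skipped fields to safe defaults after unpickling.
        self._block_hasher = None
        self._match_result = None


restored = pickle.loads(pickle.dumps(RequestSketch()))
assert restored._block_hasher is None and restored._match_result is None
```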

## Usage or Command

* fix(test): fix unit test errors for _trigger_preempt and wakeup with MTP

- Use BatchRequest instead of list in test_trigger_preempt_records_tasks
- Add missing enable_cache_manager_v1 attr in TestSleepWakeupBehavior._make_runner

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix gpu_free_block_list returning wrong block IDs

## Motivation

The compatibility property `gpu_free_block_list` mistakenly used `list(range(N))`, treating
the return value of `available_blocks()` as a count passed to `range()`, so it returned a
fake `[0, 1, ..., N-1]` list instead of the real free block IDs.

## Modifications

- `cache_manager/v1/cache_manager.py`: change `list(range(self._device_pool.available_blocks()))` to `list(self._device_pool.available_blocks())`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] fix gpu_free_block_list returning an int and raising TypeError

## Motivation

The gpu_free_block_list property calls BlockPool.available_blocks(), which returns an int
(the number of free blocks); wrapping that int in list() raises
TypeError: 'int' object is not iterable.

## Modifications

Change list(self._device_pool.available_blocks()) to
list(self._device_pool._free_blocks), returning the list of free block indices directly.
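
A tiny sketch of the two consecutive fixes above, with a hypothetical stand-in for BlockPool (the field and method names follow the commits, everything else is illustrative):

```python
class DevicePoolSketch:
    """Stand-in: _free_blocks holds the free block ids, available_blocks() their count."""

    def __init__(self, free_blocks):
        self._free_blocks = list(free_blocks)

    def available_blocks(self) -> int:
        return len(self._free_blocks)


pool = DevicePoolSketch([7, 12, 42])
# list(range(pool.available_blocks()))  -> [0, 1, 2]   (fake ids, the first bug)
# list(pool.available_blocks())         -> TypeError   (int is not iterable, the second bug)
free_ids = list(pool._free_blocks)      #  [7, 12, 42]  (the fix: the real free block ids)
assert free_ids == [7, 12, 42]
```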

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [KVCache][CacheManager] adapt pause/sleep/free_cache operations to the V1 CacheManager

## Motivation

The V1 CacheManager introduces a new reset_cache() interface; the pause and sleep operations
need to be adapted, and free_cache needs an optional clear_storage parameter.

## Modifications

- cache_controller.py: free_cache gains a clear_storage parameter (default False);
  _clear_storage() is only called when clear_storage=True, avoiding needless storage clearing
- common_engine.py: in the pause and sleep operations, use cache_manager.reset_cache() instead
  of the old reset() and pause_transfer logic when ENABLE_V1_KVCACHE_MANAGER is set
- gpu_model_runner.py: on sleep, only clear the MTP cache when the V1 cache manager is not in use

## Usage or Command

```bash
# Launch the service (V1 CacheManager)
python -m fastdeploy.entrypoints.openai.api_server \
  --enable-v1-kvcache-manager \
  ...
```

* [BugFix][KVCache] fix missing enable_cache_manager_v1 in test mocks and remove unused select_blocks_for_backup

- Remove unused `select_blocks_for_backup` method from radix_tree.py
- Fix `match_prefix` default param `skip_storage=True` and log order in cache_manager.py
- Sync test_gpu_model_runner.py with upstream/develop (add TestInsertTasksV1SplitwiseSuffix)
- Add `enable_cache_manager_v1=False` to all mock runners to fix AttributeError in CI

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [BugFix][KVCache] simplify _free_blocks in ResourceManagerV1 for non-v1 path

Remove redundant prefix_caching branch in else path; always call
recycle_gpu_blocks with full block_tables for non-cache-manager-v1 case.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [KVCache][Optimization][BugFix] fix and optimize block_pool, cache_manager, transfer_manager, request

## Motivation

Fix several code-quality issues in cache_manager v1, improving performance and removing a latent type-mismatch bug.

## Modifications

1. **block_pool.py**: `BlockPool.allocate` replaces the element-by-element pop loop with a slice plus a batched set.update, removing the Python-loop overhead: O(n) interpreter work becomes O(k) batched C-level operations
2. **cache_manager.py**: `match_prefix` writes an empty `MatchResult()` before its early return when prefix caching is disabled, so callers no longer crash dereferencing `_match_result=None`
3. **transfer_manager.py**: `_build_device_layer_indices` resets all four layer-index lists even when `_cache_kvs_map` is empty, so stale tensors are never handed to the swap operators
4. **request.py**: `BatchRequest.append_swap_metadata` / `append_evict_metadata` now construct `CacheSwapMetadata` with `src_type`/`dst_type` as `CacheLevel` enums instead of strings, matching the declared field types; add the `CacheLevel` import; fix the return type annotation of the `match_result` property to `Optional[MatchResult]`
5. **resource_manager_v1.py**: downgrade the `_allocate_gpu_blocks` log from `INFO` to `DEBUG` to cut log noise on the hot scheduling path
6. **tests/engine/test_request.py**: update the `src_type`/`dst_type` assertions to `CacheLevel` enum values and add the `CacheLevel` import

## Usage or Command

Unit tests:
```bash
source .venv/py310/bin/activate
cd baidu/FastDeploy
python -m pytest tests/cache_manager/v1/test_cache_manager.py -v
python -m pytest tests/cache_manager/v1/test_transfer_manager.py -v
python -m pytest tests/engine/test_request.py -v
```

* [BugFix][KVCache] Fix BlockPool.allocate returns all blocks when num_blocks=0

## Motivation

When `allocate(num_blocks=0)` is called, Python's negative-index pitfall causes a serious bug:
since `-0 == 0`, `self._free_blocks[-0:]` is equivalent to `self._free_blocks[0:]`, which
returns (and drains) the entire free-block list instead of returning an empty list.

## Modifications

Add an early check in `BlockPool.allocate` that returns `[]` when `num_blocks == 0`,
so the negative-index pitfall is never hit.
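
A minimal sketch of the guarded allocate, assuming a tail-slice allocation strategy like the one described in the earlier optimization commit (the exact field names and slicing direction in block_pool.py are assumptions here):

```python
class BlockPoolSketch:
    def __init__(self, num_blocks: int):
        self._free_blocks = list(range(num_blocks))
        self._allocated = set()

    def allocate(self, num_blocks: int) -> list:
        if num_blocks == 0:
            return []  # guard: self._free_blocks[-0:] would otherwise return *all* blocks
        blocks = self._free_blocks[-num_blocks:]  # take a slice from the tail
        del self._free_blocks[-num_blocks:]
        self._allocated.update(blocks)            # batched C-level set update
        return blocks


pool = BlockPoolSketch(8)
assert pool.allocate(0) == []        # without the guard this call drained the whole pool
assert len(pool.allocate(3)) == 3
assert len(pool._free_blocks) == 5
```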

## Usage or Command

```bash
# Run the related unit tests to verify the fix
python -m pytest tests/cache_manager/v1/test_cache_manager.py -vv -s
```

* [KVCache][Test] add unit tests for cache_manager v1 modules

## Motivation

Fill out the unit-test coverage of the cache_manager/v1 modules so the core methods have full test protection.

## Modifications

Add or extend the following test files; all 326 cases pass:

- tests/cache_manager/v1/test_block_pool.py (new)
  Covers BlockPool.get_metadata/set_metadata/resize, DeviceBlockPool/HostBlockPool
- tests/cache_manager/v1/test_metadata.py (new)
  Covers BlockNode, RadixTreeStats, MatchResult, CacheSwapMetadata, AsyncTaskHandler
- tests/cache_manager/v1/test_cache_utils.py (extended)
  Adds hash_block_tokens, get_request_block_hasher, LayerDoneCounter time tracking, and internal helper methods
- tests/cache_manager/v1/test_radix_tree.py (extended)
  Adds a dedicated TestCompleteSwapToDevice test class (6 cases)
- tests/cache_manager/v1/test_cache_manager.py (extended)
  Adds offload_to_host, load_from_host, the pending-backup series, prepare_prefetch_metadata
- tests/cache_manager/v1/test_transfer_manager.py (extended)
  Adds the _swap_single_layer validation paths, sync_input/output_stream, record_input_stream_event

## Usage or Command

```bash
# Run all newly added unit tests
source .venv/py310/bin/activate
python -m pytest tests/cache_manager/v1/test_block_pool.py \
  tests/cache_manager/v1/test_metadata.py \
  tests/cache_manager/v1/test_cache_utils.py \
  tests/cache_manager/v1/test_radix_tree.py \
  tests/cache_manager/v1/test_cache_manager.py \
  tests/cache_manager/v1/test_transfer_manager.py -v
# Expected result: 326 passed
```

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
2026-04-21 14:39:00 +08:00


"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import argparse
import asyncio
import json
import os
import time
import traceback
from typing import Tuple
import numpy as np
from fastdeploy.logger.logger import intercept_paddle_loggers
with intercept_paddle_loggers():
import paddle
import paddle.distributed as dist
from paddle.distributed import fleet
from fastdeploy import envs
from fastdeploy.config import (
CacheConfig,
DeployModality,
DeviceConfig,
EarlyStopConfig,
EPLBConfig,
ErnieArchitectures,
FDConfig,
GraphOptimizationConfig,
LoadConfig,
ModelConfig,
ParallelConfig,
PlasAttentionConfig,
RoutingReplayConfig,
SpeculativeConfig,
StructuredOutputsConfig,
)
from fastdeploy.engine.request import (
BatchRequest,
ControlRequest,
ControlResponse,
RequestType,
)
from fastdeploy.eplb.async_expert_loader import (
MODEL_MAIN_NAME,
REARRANGE_EXPERT_MAGIC_NUM,
create_mmap,
load_tensor_from_shm_mem,
)
from fastdeploy.eplb.experts_manager import RedundantExpertManager
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
from fastdeploy.inter_communicator import (
ExistTaskStatus,
IPCSignal,
ModelWeightsStatus,
RearrangeExpertStatus,
)
from fastdeploy.inter_communicator.fmq import FMQ
from fastdeploy.model_executor.layers.quantization import parse_quant_config
from fastdeploy.model_executor.utils import v1_loader_support
from fastdeploy.platforms import current_platform
from fastdeploy.scheduler import SchedulerConfig
from fastdeploy.utils import all_gather_values, get_logger, optional_type
from fastdeploy.worker.worker_base import WorkerBase
logger = get_logger("worker_process", "worker_process.log")
def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase:
"""
get worker of different device
"""
if fd_config.model_config.enable_logprob and not current_platform.is_cuda() and not current_platform.is_xpu():
raise NotImplementedError("Only CUDA and XPU platforms support logprob.")
if current_platform.is_dcu():
from fastdeploy.worker.dcu_worker import DcuWorker
return DcuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_cuda():
from fastdeploy.worker.gpu_worker import GpuWorker
return GpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_xpu():
from fastdeploy.worker.xpu_worker import XpuWorker
return XpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_iluvatar():
from fastdeploy.worker.iluvatar_worker import IluvatarWorker
return IluvatarWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_gcu():
from fastdeploy.worker.gcu_worker import GcuWorker
return GcuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_maca():
from fastdeploy.worker.metax_worker import MetaxWorker
return MetaxWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
if current_platform.is_intel_hpu():
from fastdeploy.worker.hpu_worker import HpuWorker
return HpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
def init_distributed_environment(seed: int = 20) -> Tuple[int, int]:
"""Initialize Paddle Fleet and get rank of worker"""
# Global rank
ranks = dist.get_world_size()
dist_strategy = fleet.DistributedStrategy()
if ranks > 0:
dist_strategy.hybrid_configs = {
"dp_degree": 1,
"mp_degree": ranks,
"pp_degree": 1,
"sharding_degree": 1,
}
# Set control in tensor parallel
dist_strategy.tensor_parallel_configs = {"tensor_init_seed": seed}
fleet.init(is_collective=True, strategy=dist_strategy)
# Local rank
local_rank = fleet.worker_index()
else:
local_rank = 0
return ranks, local_rank
def update_fd_config_for_mm(fd_config: FDConfig) -> None:
architectures = fd_config.model_config.architectures
if fd_config.enable_mm_runtime and ErnieArchitectures.contains_ernie_arch(architectures):
fd_config.model_config.tensor_model_parallel_size = fd_config.parallel_config.tensor_parallel_size
fd_config.model_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank
fd_config.model_config.vision_config.dtype = fd_config.model_config.dtype
class PaddleDisWorkerProc:
"""
Paddle Distributed wrapper for fastdeploy.worker.Worker,
for handling single-node multi-GPU tensor parallel.
The wrapper internally executes an event loop that continuously executes requests
in the task queue. Control flow is transmitted by IPC.
"""
def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> None:
"""
Initialize a distributed worker and task queue for single-node multi-GPU setup.
Args:
fd_config (FDConfig): Arguments related to inference, containing
attributes such as weight_dtype, act_dtype, mp_size, hidden_size, head_dim,
num_attention_heads, and ffn_hidden_size.
"""
self.ranks = ranks
self.local_rank = local_rank
self.fd_config = fd_config
self.parallel_config = fd_config.parallel_config
self.cache_config = fd_config.cache_config
self.scheduler_config = fd_config.scheduler_config
self.eplb_config = fd_config.eplb_config
# TODO(gongshaotian): Use worker factory to get worker
self.worker = get_worker(fd_config=fd_config, local_rank=self.local_rank, rank=self.ranks)
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule
self.cached_control_reqs = []
def init_control(self):
engine_worker_queue_port = self.parallel_config.local_engine_worker_queue_port
queue_name = f"ctrl_w2e_rank{self.local_rank}_{engine_worker_queue_port}"
logger.info(f"Init Control Output Queue: {queue_name}(producer)")
self._ctrl_output = FMQ().queue(queue_name, "producer")
def init_health_status(self) -> None:
"""
Initialize the health status of the worker.
Worker Status:
worker_ready_signal:
worker_healthy_live_signal:
exist_task_signal:
exist_swapped_task_signal:
model_weights_status:
"""
if self.parallel_config.data_parallel_size > 1 and not envs.FD_ENABLE_MULTI_API_SERVER:
launched_expert_service_signal_data = np.zeros(
shape=[self.parallel_config.data_parallel_size // self.fd_config.nnode], dtype=np.int32
)
self.launched_expert_service_signal = IPCSignal(
name="launched_expert_service_signal",
array=launched_expert_service_signal_data,
dtype=np.int32,
suffix=self.parallel_config.engine_pid,
create=False,
)
while (
self.launched_expert_service_signal.value[
self.parallel_config.local_data_parallel_id % self.max_chips_per_node
]
== 0
):
pass
# init worker_ready_signal
array_size = min(
self.max_chips_per_node,
self.parallel_config.tensor_parallel_size * self.parallel_config.data_parallel_size,
)
workers_ready = np.zeros(shape=[array_size], dtype=np.int32)
self.worker_ready_signal = IPCSignal(
name="worker_ready_signal",
array=workers_ready,
dtype=np.int32,
suffix=self.parallel_config.engine_pid,
create=False,
)
self.worker_ready_signal.value[self.local_rank % self.max_chips_per_node] = 1
# init worker_healthy_live_signal
workers_alive = np.zeros(shape=[min(array_size, self.parallel_config.tensor_parallel_size)], dtype=np.int32)
self.worker_healthy_live_signal = IPCSignal(
name="worker_healthy_live_signal",
array=workers_alive,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
self.worker_healthy_live_signal.value[local_rank % self.max_chips_per_node] = int(time.time())
# init model_weights_status
workers_model_weights = np.zeros(shape=[1], dtype=np.int32)
self.model_weights_status = IPCSignal(
name="model_weights_status",
array=workers_model_weights,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
# init kv_cache_status
kv_cache_status_data = np.zeros(shape=[1], dtype=np.int32)
self.kv_cache_status = IPCSignal(
name="kv_cache_status",
array=kv_cache_status_data,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
# init exist_task_signal
workers_exist_task = np.zeros([1], dtype=np.int32)
self.exist_task_signal = IPCSignal(
name="exist_task_signal",
array=workers_exist_task,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
# init exist_swapped_task_signal
workers_swapped_task = np.zeros(shape=[1], dtype=np.int32)
self.exist_swapped_task_signal = IPCSignal(
name="exist_swapped_task_signal",
array=workers_swapped_task,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
# init exist_prefill_task_signal
exist_prefill_task_signal_data = np.zeros([1], dtype=np.int32)
self.exist_prefill_task_signal = IPCSignal(
name="exist_prefill_task_signal",
array=exist_prefill_task_signal_data,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
# init engine forward signal
# If the engine is running a forward pass, engine_forward_signal_data should be 1.
# If the engine is out of forward, engine_forward_signal_data should be 0.
# In PD disaggregation + EP parallel, the scheduler only sends the next batch to the worker when the engine is out of forward,
# so engine_forward_signal_data must be 0 at that point, otherwise the scheduler will not schedule the next batch.
engine_forward_signal_data = np.zeros([1], dtype=np.int32)
self.engine_forward_signal = IPCSignal(
name="engine_forward_signal",
array=engine_forward_signal_data,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)
def update_weights_from_tensor(self, mmap_infos):
"""
update_weights_from_tensor
"""
import time
while True:
if self.experts_manager.tensor_infos is None:
time.sleep(0.1)
else:
break
state_dicts = load_tensor_from_shm_mem(self.experts_manager.tensor_infos, mmap_infos[MODEL_MAIN_NAME], logger)
rank_expert_list, logical_to_physical_map, expert_count = self.experts_manager.get_ep_rank_to_expert_id_list()
self.worker.get_model().redundant_table_manger.update_expert_rank_table(
rank_expert_list, logical_to_physical_map, expert_count
)
# TO BE FIXED
self.worker.get_model().update_state_dict(state_dicts)
self.experts_manager.tensor_infos = None
def _broadcast_model_weights_signal(self, src: int, group) -> int:
model_weights_signal_tensor = paddle.full(shape=[1], fill_value=self.model_weights_signal[0], dtype="int32")
paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group)
value = model_weights_signal_tensor.numpy()[0]
return int(value)
def _tp_barrier_wait(self):
if current_platform.is_xpu() or self.enable_overlap_schedule:
self.task_queue.worker_process_tp_barrier.wait()
else:
paddle.distributed.barrier(self.parallel_config.tp_group)
def _init_eplb_signal(self):
if not self.eplb_config.enable_eplb:
return
local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
self.last_dump_expert_workload_ts = 0
self.experts_manager = RedundantExpertManager(
rank=self.local_rank,
ep_size=self.ranks,
fd_config=self.fd_config,
ipc_signal_suffix=self.parallel_config.local_engine_worker_queue_port,
)
dp_ipc_signal_suffix = (
f"{self.parallel_config.local_engine_worker_queue_port}_dp{self.parallel_config.local_data_parallel_id}"
)
if local_rank == 0: # master rank0
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
self.signal_update_weight_from_tensor_array = IPCSignal(
name="signal_update_weight_from_tensor",
array=signal_update_weight_from_tensor,
dtype=np.int32,
suffix=dp_ipc_signal_suffix,
create=False,
)
rearrange_experts_status = np.zeros([1], dtype=np.int32)
self.rearrange_experts_signal = IPCSignal(
name="rearrange_experts_status",
array=rearrange_experts_status,
dtype=np.int32,
suffix=dp_ipc_signal_suffix,
create=False,
)
tp_ipc_signal_suffix = f"{dp_ipc_signal_suffix}_tp{local_rank}"
experts_token_stats = np.zeros(
(self.fd_config.model_config.num_hidden_layers, self.fd_config.model_config.moe_num_experts),
dtype=np.int32,
)
self.local_experts_token_stats_array = IPCSignal(
name="local_experts_token_stats",
array=experts_token_stats,
dtype=np.int32,
suffix=tp_ipc_signal_suffix,
create=False,
)
clear_experts_token_stats = np.zeros([1], dtype=np.int32)
self.signal_clear_experts_token_stats = IPCSignal(
name="signal_clear_experts_token_stats",
array=clear_experts_token_stats,
dtype=np.int32,
suffix=tp_ipc_signal_suffix,
create=False,
)
self.mmap_infos = create_mmap(
[MODEL_MAIN_NAME],
self.local_rank,
self.ranks,
shm_uuid=self.parallel_config.local_engine_worker_queue_port,
eplb_config=self.eplb_config,
logger=logger,
)
def _run_eplb(self, tp_rank):
"""internal call to run eplb"""
if not self.eplb_config.enable_eplb:
return
rearrange_time = time.time()
# Get expert load
if self.local_experts_token_stats_array.value is not None and (
int(rearrange_time) - self.last_dump_expert_workload_ts
> self.eplb_config.redundant_expert_dump_workload_interval
):
self.last_dump_expert_workload_ts = int(rearrange_time)
clear_stat = False
if self.signal_clear_experts_token_stats.value[0] == 1:
clear_stat = True
self.signal_clear_experts_token_stats.value[0] = 0
(
new_stats_array,
_,
_,
_,
) = self.worker.get_model().redundant_table_manger.get_expert_tokens_stats(clear_stat=clear_stat)
self.local_experts_token_stats_array.value[:] = new_stats_array[:]
elif self.local_experts_token_stats_array.value is None:
logger.warning("redundant_expert: local_experts_token_stats not init")
# All DP synchronously update weights
broadcast_value = 0
if tp_rank == 0 and self.signal_update_weight_from_tensor_array.value[0] == 1:
logger.info("redundant_expert: update_weight_from_tensor broadcast signal")
self.signal_update_weight_from_tensor_array.value[0] = 0
broadcast_value = REARRANGE_EXPERT_MAGIC_NUM
data = paddle.to_tensor([broadcast_value])
paddle.distributed.broadcast(data, 0)
if data[0] == REARRANGE_EXPERT_MAGIC_NUM:
self.update_weights_from_tensor(self.mmap_infos)
logger.info(
f"redundant_expert: update_weight_from_tensor success, cost {(time.time() - rearrange_time)*1000}ms"
)
paddle.distributed.barrier()
if tp_rank == 0:
self.rearrange_experts_signal.value[0] = RearrangeExpertStatus.DONE.value
logger.info("redundant_expert: done")
def event_loop_normal(self) -> None:
"""Main event loop for Paddle Distributed Workers.
TODO(gongshaotian): support remote calling of functions that control worker.
"""
# init eplb signal
self._init_eplb_signal()
tp_size = self.parallel_config.tensor_parallel_size
# Currently, only support single node
self.nnode = (tp_size + self.max_chips_per_node) // self.max_chips_per_node
max_occupied_batch_index = 0
tp_rank = self.local_rank % tp_size
# TODO: Unify status variables model_weights_status (shared memory) and model_weights_signal (numpy array) to one
self.model_weights_signal = np.zeros([1], dtype=np.int32)
while True:
if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
self.model_weights_signal[0] = int(self.model_weights_status.value[0])
if self.ranks > 1:
self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=None)
req_dicts = None
self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time())
# The first worker detects whether there are tasks in the task queue
if tp_rank == 0:
if self.task_queue.exist_tasks():
if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
self.fd_config.enable_mm_runtime and self.worker.exist_prefill()
):
if self.nnode > 1:
self.task_queue.read_finish_flag.set(1)
else:
self.exist_task_signal.value[0] = ExistTaskStatus.EXIST
# Synchronize the signal set by tp_rank0 so it is visible to other workers
self._tp_barrier_wait() if tp_size > 1 else None
if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
if self.ranks > 1:
paddle.distributed.barrier()
if self.model_weights_signal[0] != ModelWeightsStatus.NORMAL:
logger.info(
f"Rank: {self.local_rank} to update or clear parameters, signal is {self.model_weights_signal[0]}, [-1:clear, 1:update]"
)
from fastdeploy.rl.dynamic_weight_manager import (
DynamicWeightManager,
)
self.model_weights_status.value[0] = self.model_weights_signal[0]
self.kv_cache_status.value[0] = self.model_weights_signal[0]
cache_flag = (
self.fd_config.cache_config.num_cpu_blocks > 0
or self.fd_config.cache_config.kvcache_storage_backend is not None
)
DynamicWeightManager.check_model_weights_status(
self.model_weights_status,
self.kv_cache_status if cache_flag else None,
# model_weights_signal
self.worker.model_runner,
self.parallel_config.local_engine_worker_queue_port,
self.parallel_config.shutdown_comm_group_if_worker_idle,
)
logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
self.task_queue.clear_data()
if self.model_weights_signal[0] == ModelWeightsStatus.UPDATING:
logger.info(
f"Rank: {self.local_rank} has updated parameters. {self.model_weights_status.value[0]}"
)
self.model_weights_signal[0] = ModelWeightsStatus.NORMAL
elif self.model_weights_signal[0] == ModelWeightsStatus.CLEARING:
logger.info(
f"Rank: {self.local_rank} has cleared parameters. {self.model_weights_status.value[0]}"
)
# If the comm group is not shut down after clearing weights, block all inference processes in the loop below
# until the next weight update wakes them up; otherwise the signals may get out of sync.
if not self.fd_config.parallel_config.shutdown_comm_group_if_worker_idle:
if self.ranks > 1:  # all ranks go to sleep together and listen for the next update signal
paddle.distributed.barrier()
while self.model_weights_signal[0] != ModelWeightsStatus.UPDATING:
self.model_weights_signal[0] = self.model_weights_status.value[0]
if self.ranks > 1:
self.model_weights_signal[0] = self._broadcast_model_weights_signal(
src=0, group=None
)
time.sleep(1)
self.model_weights_status.value[0] = (
ModelWeightsStatus.UPDATING
)  # all ranks are awake and in sync; start the weight update flow
continue
if self.exist_task_signal.value[0] == ExistTaskStatus.EXIST or self.task_queue.read_finish_flag.get() == 1:
logger.debug(f"Rank: {self.local_rank} Detected new requests.")
self.engine_forward_signal.value[0] = 1
tasks, read_finish = self.task_queue.get_tasks()
# Only one of all tp_size client will get read_finish == True.
if read_finish:
# Reset the two signal.
if self.nnode > 1:
self.task_queue.read_finish_flag.set(0)
else:
self.exist_task_signal.value[0] = ExistTaskStatus.EMPTY
# In EP parallel (corresponding to dp attention), we need to barrier for prefill to prevent data imbalance due to inconsistent data arrival.
# Only EP + DP prefill should barrier for data arrival.
# In mixed mode, and for the decoder in D, we should not barrier, so decoding is not slowed down.
if self.parallel_config.use_ep and self.scheduler_config.splitwise_role == "prefill":
paddle.distributed.barrier(self.parallel_config.ep_group)
assert (
len(tasks) > 0
), f"task_queue.get_tasks() should contain at least one tuple, [([req1, ...] ,real_bsz)], but got len(tasks)={len(tasks)}"
batch_request, control_reqs, max_occupied_batch_index = BatchRequest.from_tasks(tasks)
if len(control_reqs) > 0:
logger.info(f"Rank: {self.local_rank} received {len(control_reqs)} control request.")
for control_req in control_reqs:
if self.parallel_config.use_ep:
self.cached_control_reqs.append(control_req)
logger.info(f"Rank: {self.local_rank} cached ep control request: {control_req}")
else:
self.run_control_method(control_req)
self._tp_barrier_wait() if tp_size > 1 else None
if len(batch_request) > 0:
# Count prefill requests in current batch
num_prefill_requests = sum(1 for req in batch_request if req.task_type == RequestType.PREFILL)
num_scheduled_requests = len(batch_request)
scheduled_request_ids = [req.request_id for req in batch_request]
logger.info(
f"Rank: {self.local_rank}, num_prefill_requests: {num_prefill_requests}, "
f"max_occupied_batch_index: {max_occupied_batch_index}, "
f"num_scheduled_requests: {num_scheduled_requests}, "
f"scheduled_request_ids: {scheduled_request_ids}"
)
# Process prefill inputs
self.worker.preprocess_new_task(batch_request, max_occupied_batch_index)
else:
if self.scheduler_config.splitwise_role == "prefill":
if tp_size > 1:
# Synchronize the signal for other workers
self._tp_barrier_wait()
continue
# Let the ep group run control method synchronically
if envs.FD_ENABLE_V1_UPDATE_WEIGHTS and self.parallel_config.use_ep:
pendings = all_gather_values(len(self.cached_control_reqs), self.parallel_config.ep_group)
if all([p > 0 for p in pendings]):
logger.info(f"Rank: {self.local_rank} Detected all ep ranks have pending control tasks.")
self.run_control_method(self.cached_control_reqs.pop(0))
if (
not self.parallel_config.use_ep
and hasattr(self.worker.model_runner, "not_need_stop")
and not self.worker.model_runner.not_need_stop()
):
self._tp_barrier_wait() if tp_size > 1 else None
self.engine_forward_signal.value[0] = 0
time.sleep(0.001)
continue
# Check if worker is paused (V1 update weights flow)
if (
self.fd_config.load_config.dynamic_load_weight
and hasattr(self.worker.model_runner, "is_sleeping")
and self.worker.model_runner.is_sleeping
):
self._tp_barrier_wait() if tp_size > 1 else None
continue
# Execute model to generate token. The generated token will be written to the buffer.
# These generated tokens can be obtained through get_output op.
start_execute_time = time.time()
self.worker.execute_model(req_dicts, max_occupied_batch_index)
# Only v0 use this signal
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()
logger.debug(f"execute model cost: {time.time()-start_execute_time:.5f} s")
# run eplb
self._run_eplb(tp_rank)
self.engine_forward_signal.value[0] = 0
if (
not self.parallel_config.use_ep
and hasattr(self.worker.model_runner, "current_launch_token_num")
and self.worker.model_runner.current_launch_token_num == 0
):
self._tp_barrier_wait() if tp_size > 1 else None
time.sleep(0.001)
def initialize_kv_cache(self) -> None:
"""Profiles the peak memory usage of the model to determine how many
KV blocks may be allocated without OOMs.
The engine will first conduct a profiling of the existing memory usage.
Then, it calculates the maximum possible number of GPU and CPU blocks
that can be allocated with the remaining free memory.
.. tip::
You may limit the usage of GPU memory
by adjusting the `gpu_memory_utilization` parameter.
"""
if self.fd_config.parallel_config.do_profile:
# 1. Get available memory(bytes)
available_kv_cache_memory = self.worker.determine_available_memory()
logger.info(f"------- available_kv_cache_memory:{available_kv_cache_memory / 1024**3} GB --------")
# 2. Calculate the appropriate number of blocks
model_block_memory_used = self.worker.cal_theortical_kvcache()
num_blocks_local = int(available_kv_cache_memory // model_block_memory_used)
logger.info(f"------- model_block_memory_used:{model_block_memory_used / 1024**3} GB --------")
logger.info(f"------- num_blocks_local:{num_blocks_local} --------")
if num_blocks_local <= 0:
raise ValueError(
f"The total number of blocks must be greater than zero, but got {num_blocks_local}. "
"Please increase gpu_memory_utilization "
"or decrease max_num_batched_tokens (max model length)."
)
if self.ranks > 1:
num_blocks_local = paddle.full(shape=[1], fill_value=num_blocks_local, dtype="int32")
dist.all_reduce(num_blocks_local, op=dist.ReduceOp.MIN)
num_blocks_local = num_blocks_local.item()
if self.local_rank % self.max_chips_per_node == 0:
# 3. Send IPCSignal
get_profile_block_num = np.zeros(shape=[1], dtype=np.int32)
self.get_profile_block_num_signal = IPCSignal(
name="get_profile_block_num",
array=get_profile_block_num,
dtype=np.int32,
suffix=self.parallel_config.engine_pid,
create=False,
)
self.get_profile_block_num_signal.value[0] = num_blocks_local
else:
num_blocks_local = self.fd_config.cache_config.total_block_num
logger.info(f"------- num_blocks_global: {num_blocks_local} --------")
# 4. init kv_cache with accurate num_blocks
self.worker.initialize_cache(num_gpu_blocks=num_blocks_local)
def graph_optimize_and_warm_up_model(self) -> None:
self.worker.graph_optimize_and_warm_up_model()
# reset cache_messager prefilled_step signal
if not envs.ENABLE_V1_KVCACHE_SCHEDULER and self.scheduler_config.splitwise_role == "prefill":
gpu_id = self.worker.model_runner.device_id
prefilled_step_name = f"splitwise_complete_prefilled_step_{self.local_rank}"
prefilled_step_idx_data = np.zeros(shape=[1], dtype=np.int32)
step_shm_value = IPCSignal(
name=prefilled_step_name, array=prefilled_step_idx_data, dtype=np.int32, suffix=gpu_id, create=False
)
step_shm_value.value[0] = -1
def init_device(self) -> None:
"""Initialize device and Construct model runner"""
self.worker.init_device()
def start_task_queue_service(self):
# Initialize task queue
if not envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
task_address = (
self.parallel_config.pod_ip,
self.parallel_config.local_engine_worker_queue_port,
)
else:
task_address = f"/dev/shm/fd_task_queue_{self.parallel_config.local_engine_worker_queue_port}.sock"
logger.info(f"connect task queue address {task_address}")
self.task_queue = TaskQueue(
address=task_address,
is_server=False,
num_client=self.parallel_config.tensor_parallel_size,
client_id=self.parallel_config.tensor_parallel_rank,
local_data_parallel_id=self.parallel_config.local_data_parallel_id,
)
def load_model(self) -> None:
"""Load weights and create model"""
self.worker.load_model()
loaded_model_signal_data = np.zeros(shape=[1], dtype=np.int32)
self.loaded_model_signal = IPCSignal(
name="loaded_model_signal",
array=loaded_model_signal_data,
dtype=np.int32,
suffix=self.parallel_config.engine_pid,
create=False,
)
if self.ranks > 1:
paddle.distributed.barrier()
self.loaded_model_signal.value[0] = 1
def run_control_method(self, control_request: ControlRequest) -> None:
logger.info(f"Rank: {self.local_rank} Start to run control request: {control_request}")
request_id = control_request.request_id
method = control_request.method
kwargs = control_request.args
handler = getattr(self.worker, method, None)
if handler is None or not callable(handler):
error_msg = f"Rank: {self.local_rank} Unknown control method {method}"
error_result = ControlResponse(request_id, 400, error_msg)
asyncio.run(self._ctrl_output.put(error_result))
return
try:
result = handler(**kwargs)
succ_result = ControlResponse(request_id, 200, "Success", result)
logger.info(
f"Rank: {self.local_rank} Successfully run control request: {control_request}, response: {succ_result}"
)
asyncio.run(self._ctrl_output.put(succ_result, shm_threshold=100 * 1024 * 1024))
except Exception as e:
error_msg = f"Rank: {self.local_rank} Failed to run control method {method}: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
error_result = ControlResponse(request_id, 500, error_msg)
asyncio.run(self._ctrl_output.put(error_result))
def parse_args():
"""
Parse args from command line
"""
parser = argparse.ArgumentParser("FastDeploy LLM Inference")
parser.add_argument(
"-m",
"--model",
type=str,
default="./output",
help="model dir",
)
parser.add_argument("-mbs", "--max_num_seqs", type=int, default=34, help="max batch size")
parser.add_argument("--num_gpu_blocks_override", type=int, default=None)
parser.add_argument("--block_size", type=int, default=64)
parser.add_argument("--pod_ip", type=str, default="127.0.0.1")
parser.add_argument("--engine_worker_queue_port", type=str, default="9923")
parser.add_argument("--max_model_len", type=int, default=3072, help="max model len")
parser.add_argument("--device_ids", type=str, default="0", help="cuda visible devices")
parser.add_argument("--dtype", type=str, default="bfloat16", help="input dtype")
parser.add_argument("--enc_dec_block_num", type=int, default=1, help="encoder's decoder num")
parser.add_argument(
"--kv_cache_ratio",
type=float,
default=0.7,
help="kv cache ratio for input",
)
parser.add_argument("--first_token_id", type=int, default=1, help="first token id")
parser.add_argument(
"--gpu_memory_utilization",
type=float,
default=0.9,
help="gpu memory utilization",
)
parser.add_argument("--engine_pid", type=int, default=None, help="Process ID of engine")
parser.add_argument("--do_profile", action="store_true", help="do profile or not")
parser.add_argument("--pad_token_id", type=int, default=-1, help="pad token id")
parser.add_argument("--eos_tokens_lens", type=int, default=2, help="eos token lens")
parser.add_argument(
"--enable_chunked_prefill",
action="store_true",
help="enable chunked prefill",
)
parser.add_argument(
"--use_internode_ll_two_stage",
action="store_true",
help="enable internode_ll_two_stage",
)
parser.add_argument(
"--speculative_config",
type=json.loads,
default=None,
help="Configuration of SpeculativeConfig.",
)
parser.add_argument(
"--enable_flashinfer_allreduce_fusion",
action="store_true",
default=False,
help="Flag to enable all reduce fusion kernel in flashinfer.",
)
parser.add_argument(
"--max_num_batched_tokens",
type=int,
default=2048,
help="max num batched tokens",
)
parser.add_argument(
"--enable_prefix_caching",
action="store_true",
help="enable prefix cache",
)
parser.add_argument(
"--disable_custom_all_reduce",
action="store_true",
help="disable custom all-reduce",
)
parser.add_argument(
"--disable_sequence_parallel_moe",
action="store_true",
help="disable sequence parallel moe",
)
parser.add_argument("--splitwise_role", type=str, default="mixed", help="splitwise role")
parser.add_argument(
"--tensor_parallel_size",
type=int,
default=1,
help="tensor parallel size",
)
parser.add_argument(
"--expert_parallel_size",
type=int,
default=1,
help="expert parallel size",
)
parser.add_argument(
"--data_parallel_size",
type=int,
default=1,
help="data parallel size",
)
parser.add_argument(
"--enable_expert_parallel",
action="store_true",
help="enable expert parallel",
)
parser.add_argument(
"--enable_chunked_moe",
action="store_true",
help="enable chunked moe",
)
parser.add_argument(
"--chunked_moe_size",
type=int,
default=256,
help="chunk size of moe input",
)
parser.add_argument("--ori_vocab_size", type=int, default=None)
parser.add_argument("--think_start_id", type=int, default=-1)
parser.add_argument("--think_end_id", type=int, default=-1)
parser.add_argument("--image_patch_id", type=int, default=-1)
parser.add_argument("--line_break_id", type=int, default=-1)
parser.add_argument("--think_truncate_prompt_ids", type=json.loads, default=[])
parser.add_argument(
"--quantization",
type=json.loads,
default=None,
help="Quantization name for the model, currently support "
"'wint4', 'wint8',"
"default is None. The priority of this configuration "
"is lower than that of the config file. "
"More complex quantization methods need to be configured via the config file.",
)
parser.add_argument(
"--graph_optimization_config",
type=json.loads,
default=None,
help="Configuration of Graph optimization backend.",
)
parser.add_argument(
"--plas_attention_config",
type=json.loads,
default=None,
help="Configuration of plas attention.",
)
parser.add_argument(
"--guided_decoding_backend",
type=str,
default="off",
help="guided decoding backend",
)
parser.add_argument(
"--disable_any_whitespace",
action="store_true",
help="Disable any whitespace for guided decoding.",
)
parser.add_argument(
"--dynamic_load_weight",
action="store_true",
help="Enable dynamic weight loading strategy",
)
parser.add_argument(
"--load_strategy",
type=str,
choices=["ipc", "ipc_snapshot", "meta", "normal", "rsync"],
default="ipc_snapshot",
help="Weight loading method when dynamic loading is enabled: "
"'ipc': real-time IPC streaming with automatic resharding, "
"'ipc_snapshot': load from disk snapshot of IPC weights.",
)
parser.add_argument(
"--rsync_config",
type=json.loads,
default=None,
help="Rsync weights config",
)
parser.add_argument(
"--enable_logprob",
action="store_true",
help="Enable output of token-level log probabilities.",
)
parser.add_argument(
"--max_logprobs",
type=int,
default=20,
help="Maximum number of log probabilities.",
)
parser.add_argument(
"--logprobs_mode",
type=str,
default="raw_logprobs",
help="Indicates the content returned in the logprobs.",
)
parser.add_argument(
"--reasoning_parser",
type=str,
default=None,
help="Flag specifies the reasoning parser to use for extracting reasoning content from the model output",
)
parser.add_argument(
"--early_stop_config",
type=json.loads,
default=None,
help="Configuration of early stop.",
)
parser.add_argument(
"--load_choices",
type=str,
default="default_v1",
help="The format of the model weights to load. default/default_v1/dummy.",
)
parser.add_argument(
"--model_loader_extra_config",
type=json.loads,
default=None,
help="Additional configuration for model loader (JSON format). "
'e.g., \'{"enable_multithread_load": true, "num_threads": 8}\'',
)
parser.add_argument(
"--ips",
type=str,
default=None,
help="The ips of multinode deployment.",
)
parser.add_argument(
"--lm_head_fp32",
action="store_true",
help="Flag to specify dtype of lm_head as FP32",
)
parser.add_argument(
"--moe_gate_fp32",
action="store_true",
help="Flag to specify dtype of gate as FP32",
)
parser.add_argument(
"--max_encoder_cache",
type=int,
help="Maximum encoder cache tokens(use 0 to disable).",
)
parser.add_argument(
"--model-impl",
type=str,
choices=["auto", "fastdeploy", "paddleformers"],
default="auto",
help="Model implementation backend (auto, fastdeploy, paddleformers)",
)
parser.add_argument(
"--cache-transfer-protocol",
type=str,
default="ipc",
help="supported protocols, comma separated; default is ipc",
)
parser.add_argument(
"--runner",
type=str,
default="auto",
help="The type of model runner to use. Each FD instance only supports one model runner, even if the same model can be used for multiple types.",
)
parser.add_argument(
"--convert",
type=str,
default="auto",
help="Convert the model using adapters. The most common use case is to adapt a text generation model to be used for pooling tasks.",
)
parser.add_argument(
"--override-pooler-config",
type=optional_type(json.loads),
default=None,
help="Override configuration for the pooler.",
)
parser.add_argument(
"--logits-processors",
type=str,
nargs="+",
default=[],
help="FQCNs (Fully Qualified Class Names) of logits processors supported by the service.",
)
parser.add_argument(
"--eplb_config",
type=json.loads,
default=None,
help="EPLB Configuration.",
)
parser.add_argument(
"--routing_replay_config",
type=json.loads,
default=None,
help="Configuration of Rollout Routing Replay.",
)
parser.add_argument(
"--shutdown_comm_group_if_worker_idle",
action="store_true",
help="Shutdown comm group if worker idle.",
)
parser.add_argument(
"--enable_entropy",
action="store_true",
help="Enable output of token-level entropy.",
)
parser.add_argument(
"--mm_max_tokens_per_item",
type=json.loads,
default=None,
help="Maximum tokens per item in mm input.",
)
parser.add_argument(
"--num_cpu_blocks",
type=int,
default=0,
help="Number of cpu blocks.",
)
parser.add_argument(
"--kvcache_storage_backend",
type=str,
help="KVCache storage backend.",
)
parser.add_argument(
"--enable_overlap_schedule",
action="store_true",
help="Enable overlap schedule",
)
parser.add_argument(
"--ep_prefill_use_worst_num_tokens",
action="store_true",
help="enable to avoid cpu sync",
)
parser.add_argument(
"--deploy_modality",
type=str,
default="mixed",
choices=["mixed", "text"],
help="Deploy modality: 'mixed' for multimodal, 'text' for text-only.",
)
args = parser.parse_args()
return args
def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
"""Initialize FDConfig from either RolloutModelConfig or argparse.Namespace
Args:
config: Configuration object containing all parameters (either RolloutModelConfig or argparse.Namespace)
Returns:
FDConfig: Initialized FastDeploy configuration object
"""
# RL rollout
paddle.set_default_dtype(args.dtype)
model_config = ModelConfig(vars(args))
device_config = DeviceConfig(vars(args))
speculative_config = SpeculativeConfig(args.speculative_config)
parallel_config = ParallelConfig(vars(args))
cache_config = CacheConfig(vars(args))
scheduler_config = SchedulerConfig(vars(args))
eplb_config = EPLBConfig(args.eplb_config)
parallel_config.tensor_parallel_rank = local_rank % parallel_config.tensor_parallel_size
parallel_config.data_parallel_rank = local_rank // parallel_config.tensor_parallel_size
# config for DP
if parallel_config.data_parallel_size > 1:
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
parallel_config.local_data_parallel_id = parallel_config.data_parallel_rank % (
max_chips_per_node // parallel_config.tensor_parallel_size
)
# config for EP
if parallel_config.expert_parallel_size > 1:
expert_parallel_rank = int(local_rank % parallel_config.expert_parallel_size)
if isinstance(model_config.moe_num_experts, list):
num_experts = model_config.moe_num_experts[0] + eplb_config.redundant_experts_num
elif hasattr(model_config, "num_local_experts") and model_config.num_local_experts is not None:
num_experts = model_config.num_local_experts + eplb_config.redundant_experts_num
else:
num_experts = model_config.moe_num_experts + eplb_config.redundant_experts_num
num_experts_per_rank = num_experts // parallel_config.expert_parallel_size
num_experts_start_offset = expert_parallel_rank * num_experts_per_rank
parallel_config.expert_parallel_rank = expert_parallel_rank
parallel_config.num_experts_per_rank = num_experts_per_rank
parallel_config.num_experts_start_offset = num_experts_start_offset
parallel_config.set_communicate_group()
load_config = LoadConfig(vars(args))
graph_opt_config = GraphOptimizationConfig(args.graph_optimization_config)
plas_attention_config = PlasAttentionConfig(args.plas_attention_config)
early_stop_config = EarlyStopConfig(args.early_stop_config)
structured_outputs_config: StructuredOutputsConfig = StructuredOutputsConfig(args=vars(args))
routing_replay_config = RoutingReplayConfig(args.routing_replay_config)
# Note(tangbinhan): used for load_checkpoint
model_config.pretrained_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank
model_config.pretrained_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size
model_config.pretrained_config.is_mtp = False
model_config.pretrained_config.head_dim = model_config.head_dim
logger.info(f"parallel_config.use_ep {parallel_config.use_ep}")
logger.info(f"parallel_config.tensor_parallel_size {parallel_config.tensor_parallel_size}")
logger.info(f"parallel_config.tensor_parallel_rank {parallel_config.tensor_parallel_rank}")
logger.info(f"parallel_config.engine_worker_queue_port {parallel_config.engine_worker_queue_port}")
if getattr(model_config, "num_hidden_layers", None) is None:
raise ValueError("num_hidden_layers is None")
quant_config = parse_quant_config(
args,
model_config,
is_ernie=(
ErnieArchitectures.contains_ernie_arch(model_config.architectures)
or ErnieArchitectures.is_ernie5_arch(model_config.architectures)
),
is_v1_loader=load_config.load_choices == "default_v1",
)
# Log quantization info
logger.info("===========quantization_config==============")
if quant_config is not None:
if model_config.is_quantized:
logger.info("Model Status: Offline Quantized (pre-quantized weights loaded)")
else:
logger.info("Model Status: Original (will apply online quantization)")
logger.info(f"{model_config.quantization_config}")
else:
logger.info("No quantization config found and use original weight and act dtype.")
logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
logger.info(f"- Load strategy: {load_config.load_strategy}")
logger.info(f"- Rsync config: {load_config.rsync_config}, {type(load_config.rsync_config)}")
if not (
current_platform.is_cuda()
or current_platform.is_xpu()
or current_platform.is_maca()
or current_platform.is_iluvatar()
or current_platform.is_intel_hpu()
):
logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported.")
envs.ENABLE_V1_KVCACHE_SCHEDULER = 0
if envs.ENABLE_V1_KVCACHE_SCHEDULER and args.splitwise_role == "prefill":
os.environ["PREFILL_NODE_ONE_STEP_STOP_V1"] = "1"
elif envs.ENABLE_V1_KVCACHE_SCHEDULER and args.splitwise_role == "decode":
os.environ["PREFILL_NODE_ONE_STEP_STOP_V1"] = "0"
fd_config = FDConfig(
model_config=model_config,
parallel_config=parallel_config,
speculative_config=speculative_config,
device_config=device_config,
load_config=load_config,
quant_config=quant_config,
graph_opt_config=graph_opt_config,
early_stop_config=early_stop_config,
cache_config=cache_config,
scheduler_config=scheduler_config,
ips=args.ips,
plas_attention_config=plas_attention_config,
structured_outputs_config=structured_outputs_config,
eplb_config=eplb_config,
routing_replay_config=routing_replay_config,
deploy_modality=DeployModality.from_str(getattr(args, "deploy_modality", "mixed")),
)
logger.info(f"parallel_config.local_engine_worker_queue_port {parallel_config.local_engine_worker_queue_port}")
update_fd_config_for_mm(fd_config)
if fd_config.load_config.load_choices == "default_v1" and not v1_loader_support(fd_config):
fd_config.load_config.load_choices = "default"
architecture = fd_config.model_config.architectures[0]
if "PaddleOCR" in architecture:
envs.FD_ENABLE_MAX_PREFILL = 1
fd_config.cache_config.enable_prefix_caching = False
fd_config.cache_config.max_encoder_cache = 0
return fd_config
@paddle.no_grad()
def run_worker_proc() -> None:
"""
start worker process
"""
# Get args from Engine
args = parse_args()
ranks, local_rank = init_distributed_environment()
# Get fd_config
fd_config = initialize_fd_config(args, ranks, local_rank)
# Create worker process
if current_platform.is_iluvatar():
from fastdeploy.worker.iluvatar_worker import IluvatarPaddleDisWorkerProc
worker_proc = IluvatarPaddleDisWorkerProc(fd_config, ranks, local_rank)
else:
worker_proc = PaddleDisWorkerProc(fd_config, ranks, local_rank)
worker_proc.init_control()
# Enable batch-invariant mode for deterministic inference.
# This must happen AFTER worker creation but BEFORE model loading,
# because enable_batch_invariant_mode() calls paddle.enable_compat()
# which makes torch appear available via proxy. If called before worker creation,
# the gpu_model_runner import chain (ernie4_5_vl_processor → paddleformers →
# transformers) will fail when transformers tries to query torch metadata.
if envs.FD_DETERMINISTIC_MODE:
from fastdeploy.model_executor.layers.batch_invariant_ops import (
init_deterministic_mode,
)
init_deterministic_mode()
# Initialize device and create model runner
worker_proc.init_device()
# Load model
worker_proc.load_model()
# Initialize KV Cache
worker_proc.initialize_kv_cache()
# Trigger CUDAGraph capture
worker_proc.graph_optimize_and_warm_up_model()
# Initialize health status
worker_proc.init_health_status()
worker_proc.start_task_queue_service()
# Start event loop
worker_proc.event_loop_normal()
if __name__ == "__main__":
run_worker_proc()