[Optimization] Support multimodal runner for image/video feature processing (#7485)

* [NewFeature] support mm runner

* [NewFeature] support mm runner part1

* support mm runner part2

* support mm runner part3

* support mm runner part4
This commit is contained in:
xiaoxiaohehe001
2026-04-22 11:58:33 +08:00
committed by GitHub
parent 76b960cb5b
commit a09792e085
3 changed files with 135 additions and 14 deletions
@@ -33,7 +33,6 @@ from fastdeploy.cache_manager.multimodal_cache_manager import (
ProcessorCacheManager,
)
from fastdeploy.cache_manager.v1.metadata import CacheSwapMetadata
from fastdeploy.config import ErnieArchitectures
from fastdeploy.engine.request import (
BatchRequest,
ImagePosition,
@@ -785,13 +784,6 @@ class ResourceManagerV1(ResourceManager):
Try to pull a batch of requests from the waiting queue and schedule them.
"""
def get_enough_request(request, batch_request):
return (
ErnieArchitectures.is_ernie5_arch(self.config.model_config.architectures)
and self._is_mm_request(request)
and self.exist_mm_prefill(batch_request)
)
with self.lock:
preempted_reqs: list[Request] = []
error_reqs: list[tuple[str, str]] = []
@@ -930,9 +922,6 @@ class ResourceManagerV1(ResourceManager):
):
req_index += 1
continue
if get_enough_request(request, batch_request):
req_index += 1
continue
num_new_tokens = self._get_num_new_tokens(request, token_budget)
if num_new_tokens == 0:
req_index += 1
@@ -981,8 +970,6 @@ class ResourceManagerV1(ResourceManager):
break
request = self.waiting[0]
if get_enough_request(request, batch_request):
break
if request.status == RequestStatus.WAITING:
result = self.waiting_async_process(request)
if result is None:
+95 -1
View File
@@ -86,6 +86,7 @@ else:
set_data_ipc,
unset_data_ipc,
get_position_ids_and_mask_encoder_batch,
update_attn_mask_offsets,
)
import zmq
@@ -188,6 +189,9 @@ class GPUModelRunner(ModelRunnerBase):
else:
self.encoder_cache = None
# Note(Zhengshifeng) init video cache for VL model
self.video_cache = {}
# Sampler
if not self.speculative_decoding:
self.sampler = Sampler(fd_config)
@@ -519,6 +523,8 @@ class GPUModelRunner(ModelRunnerBase):
"feature_position_list": [],
"grid_thw_lst_batches": [],
"feature_position_list_batches": [],
"image_features": [],
"image_grid_thws": [],
}
for request in request_list:
if request.task_type.value != RequestType.PREFILL.value:
@@ -531,10 +537,10 @@ class GPUModelRunner(ModelRunnerBase):
self.encoder_cache.pop(mm_hash, None)
idx = self.share_inputs.get_index_by_batch_id(request.idx)
req_idx_img_index_map[idx] = -1
inputs = request.multimodal_inputs
if request.with_image:
req_idx_img_index_map[idx] = img_index
img_index = img_index + 1
inputs = request.multimodal_inputs
if self.encoder_cache is not None:
if envs.FD_ENABLE_MAX_PREFILL:
if "vit_seqlen" in inputs:
@@ -640,6 +646,42 @@ class GPUModelRunner(ModelRunnerBase):
prefill_end_index=request.prefill_end_index,
)
)
if (
inputs is not None
and inputs.get("image_feature_urls", None) is not None
and len(inputs["image_feature_urls"]) > 0
):
multi_vision_inputs["image_grid_thws"].extend(
inputs["image_grid_thws"][request.image_start : request.image_end]
)
image_feature = inputs["image_features"][request.image_start : request.image_end]
if len(image_feature) > 0:
if isinstance(image_feature[0], paddle.Tensor) and len(image_feature[0].shape) == 2:
# Enable encode vision_embedding
for image_feature_tensor in image_feature:
if image_feature_tensor.shape[1] != self.fd_config.model_config.hidden_size:
logger.error(
f"Shape mismatch: expected shape={self.fd_config.model_config.hidden_size}, \
but got {image_feature_tensor.shape}"
)
image_features_gpu = [vf.cuda() for vf in image_feature]
image_embeds = paddle.concat(image_features_gpu, axis=0)
multi_vision_inputs["image_features"].append(image_embeds)
logger.info("Enable Encode image embedding.")
else:
multi_vision_inputs["image_features"].extend(image_feature)
logger.info("Disable Encode image embedding.")
self.share_inputs["image_features"] = multi_vision_inputs["image_features"]
if len(multi_vision_inputs["image_features"]) > 0:
if (
isinstance(multi_vision_inputs["image_features"][0], paddle.Tensor)
and len(multi_vision_inputs["image_features"][0].shape) == 2
):
self.share_inputs["image_features"] = paddle.concat(multi_vision_inputs["image_features"], axis=0)
self.share_inputs["image_grid_thws"] = multi_vision_inputs["image_grid_thws"]
if self.encoder_cache is not None:
if len(multi_vision_inputs["images_lst"]) > 0 or len(multi_vision_inputs["encoder_cache_info"]) > 0:
image_features_output = None
@@ -756,6 +798,9 @@ class GPUModelRunner(ModelRunnerBase):
"position_ids_offset": [0],
"max_tokens_lst": [],
}
if self.enable_mm:
# Sort by idx to ensure attention mask offsets are filled in order during mm prefill
req_dicts = sorted(req_dicts, key=lambda r: r.idx)
if self.enable_cache_manager_v1:
# submit_swap_tasks handles:
# 1. Waiting for pending evict handlers before submitting new evict
@@ -812,6 +857,23 @@ class GPUModelRunner(ModelRunnerBase):
prefill_start_index = request.prefill_start_index
prefill_end_index = request.prefill_end_index
length = prefill_end_index - prefill_start_index
if self.enable_mm:
self.share_inputs["decode_states"][idx, 0] = 0
inputs = request.multimodal_inputs
# mm attention_mask
attn_offset_len = prefill_end_index - prefill_start_index
if inputs.get("attention_mask_offset", None) is None:
attention_mask_offset_slice = np.arange(prefill_start_index, prefill_end_index, dtype=np.int32)
else:
attention_mask_offset_slice = np.asarray(
inputs["attention_mask_offset"][prefill_start_index:prefill_end_index], dtype=np.int32
)
async_set_value(
self.share_inputs["attn_mask_offsets_full"][idx : idx + 1, 0:attn_offset_len],
attention_mask_offset_slice,
)
if not self.is_pooling_model:
if request.get("enable_thinking") is not None:
enable_thinking = bool(request.get("enable_thinking"))
@@ -1260,6 +1322,19 @@ class GPUModelRunner(ModelRunnerBase):
self._real_output_token_num_host.copy_(real_output_token_num, False)
self.output_token_num_event.record()
if self.enable_mm:
attn_mask_offsets = update_attn_mask_offsets(
self.share_inputs["ids_remove_padding"],
self.share_inputs["seq_lens_this_time"],
self.share_inputs["seq_lens_encoder"],
self.share_inputs["seq_lens_decoder"],
self.share_inputs["cu_seqlens_q"],
self.share_inputs["attn_mask_offsets_full"],
self.share_inputs["is_block_step"],
self.share_inputs["decode_states"],
)
self.share_inputs["attn_mask_offsets"].copy_(attn_mask_offsets, False)
# Initialize forward meta data
self.initialize_forward_meta(is_dummy_or_profile_run=is_dummy_or_profile_run)
self.forward_meta.real_bsz = real_bsz
@@ -1396,6 +1471,7 @@ class GPUModelRunner(ModelRunnerBase):
kv_batch_ids=self.share_inputs["kv_batch_ids"],
kv_tile_ids_per_batch=self.share_inputs["kv_tile_ids_per_batch"],
kv_num_blocks_x_cpu=self.share_inputs["kv_num_blocks_x_cpu"],
attn_mask_offsets=self.share_inputs["attn_mask_offsets"] if self.enable_mm else None,
routing_replay_table=routing_replay_table,
)
@@ -2280,6 +2356,24 @@ class GPUModelRunner(ModelRunnerBase):
model_inputs["generated_modality"] = self.share_inputs["generated_modality"]
if self.enable_mm:
model_inputs["image_features"] = self.share_inputs["image_features"]
model_inputs["decode_states"] = self.share_inputs["decode_states"]
model_inputs["image_grid_thws"] = self.share_inputs.get("image_grid_thws", None)
video_features = self.share_inputs.get("video_features", None)
video_grid_thws = self.share_inputs.get("video_grid_thws", None)
video_infinity_scales = self.share_inputs.get("video_infinity_scales", None)
if video_features is not None:
model_inputs["video_features"] = video_features
if video_grid_thws is not None:
model_inputs["video_grid_thws"] = video_grid_thws
if video_infinity_scales is not None:
model_inputs["video_infinity_scales"] = video_infinity_scales
# init features and grid_thws
self.share_inputs["image_features"] = None
self.share_inputs["image_grid_thws"] = None
self.share_inputs["video_features"] = None
self.share_inputs["video_grid_thws"] = None
self.share_inputs["video_infinity_scales"] = None
return model_inputs, p_done_idxs, token_num_event
+40
View File
@@ -233,7 +233,11 @@ class InputBatch:
)
if self.is_mm_model:
self.image_features = None
self.image_grid_thws = None
self.image_features_list = None
self.video_features = None
self.video_grid_thws = None
self.video_infinity_scales = None
# Set block tables
pre_max_block_num = (
@@ -342,7 +346,26 @@ class InputBatch:
dtype="float32",
)
self.image_features = None # Built before the forward
self.image_grid_thws = None
self.image_features_list = None
self.video_features = None
self.video_grid_thws = None
self.video_infinity_scales = None
decode_states_len = self.speculative_config.num_speculative_tokens + 1 if self.speculative_decoding else 1
self.decode_states = paddle.full(
[self.scheduler_config.max_num_seqs, decode_states_len],
-1,
dtype="int32",
)
self.attn_mask_offsets = paddle.full(
shape=[self.scheduler_config.max_num_seqs * self.model_config.max_model_len],
fill_value=-1,
dtype="int32",
)
self.attn_mask_offsets_full = paddle.full(
[self.scheduler_config.max_num_seqs, self.model_config.max_model_len], -1, dtype="int32"
)
# For logits processors
self.logits_processors = build_logits_processors(self.fd_config)
@@ -409,6 +432,7 @@ class InputBatch:
swap_data(self.ori_seq_lens_encoder, i1, i2)
swap_data(self.system_lens, i1, i2)
swap_data(self.system_ids, i1, i2)
swap_data(self.generated_modality, i1, i2)
swap_data(self.enable_thinking, i1, i2)
swap_data(self.max_think_lens, i1, i2)
swap_data(self.limit_think_status, i1, i2)
@@ -451,6 +475,8 @@ class InputBatch:
self.image_features_list[i1],
)
swap_data(self.share_inputs["rope_emb"], i1, i2)
swap_data(self.decode_states, i1, i2)
swap_data(self.attn_mask_offsets_full, i1, i2)
# Swap mask rollback
swap_data(self.mask_rollback, i1, i2)
@@ -578,6 +604,7 @@ class InputBatch:
fill_paddle_tensor(self, "ori_seq_lens_encoder", 0)
fill_paddle_tensor(self, "system_lens", 0)
fill_paddle_tensor(self, "system_ids", -1)
fill_paddle_tensor(self, "generated_modality", -1)
fill_paddle_tensor(self, "ids_remove_padding", 0)
fill_paddle_tensor(self, "batch_id_per_token", 0)
@@ -662,7 +689,14 @@ class InputBatch:
dtype="float32",
)
self.image_features = None
self.image_grid_thws = None
self.image_features_list = None
self.video_features = None
self.video_grid_thws = None
self.video_infinity_scales = None
fill_paddle_tensor(self, "decode_states", -1)
fill_paddle_tensor(self, "attn_mask_offsets", -1)
fill_paddle_tensor(self, "attn_mask_offsets_full", -1)
else:
# Reset non-multimodal rope_emb
self.rope_emb = get_rope(
@@ -674,7 +708,11 @@ class InputBatch:
)
if self.is_mm_model:
self.image_features = None
self.image_grid_thws = None
self.image_features_list = None
self.video_features = None
self.video_grid_thws = None
self.video_infinity_scales = None
# Reset other miscellaneous tensors
fill_paddle_tensor(self, "mask_rollback", 0)
@@ -892,6 +930,8 @@ class ProposerInputBatch(InputBatch):
swap_data(self.mask_rollback, i1, i2)
swap_data(self.recompute_token_num, i1, i2)
if self.enable_mm:
swap_data(self.decode_states, i1, i2)
swap_data(self.attn_mask_offsets, i1, i2)
swap_data(self.attn_mask_offsets_full, i1, i2)
swap_data(self.attn_mask_offsets_decoder, i1, i2)