From a09792e08536cfffed9bc7818e1e97b893b71f4d Mon Sep 17 00:00:00 2001 From: xiaoxiaohehe001 <49090790+xiaoxiaohehe001@users.noreply.github.com> Date: Wed, 22 Apr 2026 11:58:33 +0800 Subject: [PATCH] [Optimization] Support multimodal runner for image/video feature processing (#7485) * [NewFeature] support mm runner * [NewFeature] support mm runner part1 * support mm runner part2 * support mm runner part3 * support mm runner part4 --- .../engine/sched/resource_manager_v1.py | 13 --- fastdeploy/worker/gpu_model_runner.py | 96 ++++++++++++++++++- fastdeploy/worker/input_batch.py | 40 ++++++++ 3 files changed, 135 insertions(+), 14 deletions(-) diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 38bd25bed0..d4ce72870b 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -33,7 +33,6 @@ from fastdeploy.cache_manager.multimodal_cache_manager import ( ProcessorCacheManager, ) from fastdeploy.cache_manager.v1.metadata import CacheSwapMetadata -from fastdeploy.config import ErnieArchitectures from fastdeploy.engine.request import ( BatchRequest, ImagePosition, @@ -785,13 +784,6 @@ class ResourceManagerV1(ResourceManager): Try to pull a batch of requests from the waiting queue and schedule them. """ - def get_enough_request(request, batch_request): - return ( - ErnieArchitectures.is_ernie5_arch(self.config.model_config.architectures) - and self._is_mm_request(request) - and self.exist_mm_prefill(batch_request) - ) - with self.lock: preempted_reqs: list[Request] = [] error_reqs: list[tuple[str, str]] = [] @@ -930,9 +922,6 @@ class ResourceManagerV1(ResourceManager): ): req_index += 1 continue - if get_enough_request(request, batch_request): - req_index += 1 - continue num_new_tokens = self._get_num_new_tokens(request, token_budget) if num_new_tokens == 0: req_index += 1 @@ -981,8 +970,6 @@ class ResourceManagerV1(ResourceManager): break request = self.waiting[0] - if get_enough_request(request, batch_request): - break if request.status == RequestStatus.WAITING: result = self.waiting_async_process(request) if result is None: diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 95e4e6c75e..e3af67a304 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -86,6 +86,7 @@ else: set_data_ipc, unset_data_ipc, get_position_ids_and_mask_encoder_batch, + update_attn_mask_offsets, ) import zmq @@ -188,6 +189,9 @@ class GPUModelRunner(ModelRunnerBase): else: self.encoder_cache = None + # Note(Zhengshifeng) init video cache for VL model + self.video_cache = {} + # Sampler if not self.speculative_decoding: self.sampler = Sampler(fd_config) @@ -519,6 +523,8 @@ class GPUModelRunner(ModelRunnerBase): "feature_position_list": [], "grid_thw_lst_batches": [], "feature_position_list_batches": [], + "image_features": [], + "image_grid_thws": [], } for request in request_list: if request.task_type.value != RequestType.PREFILL.value: @@ -531,10 +537,10 @@ class GPUModelRunner(ModelRunnerBase): self.encoder_cache.pop(mm_hash, None) idx = self.share_inputs.get_index_by_batch_id(request.idx) req_idx_img_index_map[idx] = -1 + inputs = request.multimodal_inputs if request.with_image: req_idx_img_index_map[idx] = img_index img_index = img_index + 1 - inputs = request.multimodal_inputs if self.encoder_cache is not None: if envs.FD_ENABLE_MAX_PREFILL: if "vit_seqlen" in inputs: @@ -640,6 +646,42 @@ class GPUModelRunner(ModelRunnerBase): prefill_end_index=request.prefill_end_index, ) ) + if ( + inputs is not None + and inputs.get("image_feature_urls", None) is not None + and len(inputs["image_feature_urls"]) > 0 + ): + multi_vision_inputs["image_grid_thws"].extend( + inputs["image_grid_thws"][request.image_start : request.image_end] + ) + image_feature = inputs["image_features"][request.image_start : request.image_end] + + if len(image_feature) > 0: + if isinstance(image_feature[0], paddle.Tensor) and len(image_feature[0].shape) == 2: + # Enable encode vision_embedding + for image_feature_tensor in image_feature: + if image_feature_tensor.shape[1] != self.fd_config.model_config.hidden_size: + logger.error( + f"Shape mismatch: expected shape={self.fd_config.model_config.hidden_size}, \ + but got {image_feature_tensor.shape}" + ) + image_features_gpu = [vf.cuda() for vf in image_feature] + image_embeds = paddle.concat(image_features_gpu, axis=0) + multi_vision_inputs["image_features"].append(image_embeds) + logger.info("Enable Encode image embedding.") + else: + multi_vision_inputs["image_features"].extend(image_feature) + logger.info("Disable Encode image embedding.") + + self.share_inputs["image_features"] = multi_vision_inputs["image_features"] + if len(multi_vision_inputs["image_features"]) > 0: + if ( + isinstance(multi_vision_inputs["image_features"][0], paddle.Tensor) + and len(multi_vision_inputs["image_features"][0].shape) == 2 + ): + self.share_inputs["image_features"] = paddle.concat(multi_vision_inputs["image_features"], axis=0) + self.share_inputs["image_grid_thws"] = multi_vision_inputs["image_grid_thws"] + if self.encoder_cache is not None: if len(multi_vision_inputs["images_lst"]) > 0 or len(multi_vision_inputs["encoder_cache_info"]) > 0: image_features_output = None @@ -756,6 +798,9 @@ class GPUModelRunner(ModelRunnerBase): "position_ids_offset": [0], "max_tokens_lst": [], } + if self.enable_mm: + # Sort by idx to ensure attention mask offsets are filled in order during mm prefill + req_dicts = sorted(req_dicts, key=lambda r: r.idx) if self.enable_cache_manager_v1: # submit_swap_tasks handles: # 1. Waiting for pending evict handlers before submitting new evict @@ -812,6 +857,23 @@ class GPUModelRunner(ModelRunnerBase): prefill_start_index = request.prefill_start_index prefill_end_index = request.prefill_end_index length = prefill_end_index - prefill_start_index + + if self.enable_mm: + self.share_inputs["decode_states"][idx, 0] = 0 + inputs = request.multimodal_inputs + # mm attention_mask + attn_offset_len = prefill_end_index - prefill_start_index + if inputs.get("attention_mask_offset", None) is None: + attention_mask_offset_slice = np.arange(prefill_start_index, prefill_end_index, dtype=np.int32) + else: + attention_mask_offset_slice = np.asarray( + inputs["attention_mask_offset"][prefill_start_index:prefill_end_index], dtype=np.int32 + ) + async_set_value( + self.share_inputs["attn_mask_offsets_full"][idx : idx + 1, 0:attn_offset_len], + attention_mask_offset_slice, + ) + if not self.is_pooling_model: if request.get("enable_thinking") is not None: enable_thinking = bool(request.get("enable_thinking")) @@ -1260,6 +1322,19 @@ class GPUModelRunner(ModelRunnerBase): self._real_output_token_num_host.copy_(real_output_token_num, False) self.output_token_num_event.record() + if self.enable_mm: + attn_mask_offsets = update_attn_mask_offsets( + self.share_inputs["ids_remove_padding"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["seq_lens_encoder"], + self.share_inputs["seq_lens_decoder"], + self.share_inputs["cu_seqlens_q"], + self.share_inputs["attn_mask_offsets_full"], + self.share_inputs["is_block_step"], + self.share_inputs["decode_states"], + ) + self.share_inputs["attn_mask_offsets"].copy_(attn_mask_offsets, False) + # Initialize forward meta data self.initialize_forward_meta(is_dummy_or_profile_run=is_dummy_or_profile_run) self.forward_meta.real_bsz = real_bsz @@ -1396,6 +1471,7 @@ class GPUModelRunner(ModelRunnerBase): kv_batch_ids=self.share_inputs["kv_batch_ids"], kv_tile_ids_per_batch=self.share_inputs["kv_tile_ids_per_batch"], kv_num_blocks_x_cpu=self.share_inputs["kv_num_blocks_x_cpu"], + attn_mask_offsets=self.share_inputs["attn_mask_offsets"] if self.enable_mm else None, routing_replay_table=routing_replay_table, ) @@ -2280,6 +2356,24 @@ class GPUModelRunner(ModelRunnerBase): model_inputs["generated_modality"] = self.share_inputs["generated_modality"] if self.enable_mm: model_inputs["image_features"] = self.share_inputs["image_features"] + model_inputs["decode_states"] = self.share_inputs["decode_states"] + model_inputs["image_grid_thws"] = self.share_inputs.get("image_grid_thws", None) + video_features = self.share_inputs.get("video_features", None) + video_grid_thws = self.share_inputs.get("video_grid_thws", None) + video_infinity_scales = self.share_inputs.get("video_infinity_scales", None) + if video_features is not None: + model_inputs["video_features"] = video_features + if video_grid_thws is not None: + model_inputs["video_grid_thws"] = video_grid_thws + if video_infinity_scales is not None: + model_inputs["video_infinity_scales"] = video_infinity_scales + + # init features and grid_thws + self.share_inputs["image_features"] = None + self.share_inputs["image_grid_thws"] = None + self.share_inputs["video_features"] = None + self.share_inputs["video_grid_thws"] = None + self.share_inputs["video_infinity_scales"] = None return model_inputs, p_done_idxs, token_num_event diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 6a4489f2bb..f2b5627cbf 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -233,7 +233,11 @@ class InputBatch: ) if self.is_mm_model: self.image_features = None + self.image_grid_thws = None self.image_features_list = None + self.video_features = None + self.video_grid_thws = None + self.video_infinity_scales = None # Set block tables pre_max_block_num = ( @@ -342,7 +346,26 @@ class InputBatch: dtype="float32", ) self.image_features = None # Built before the forward + self.image_grid_thws = None self.image_features_list = None + self.video_features = None + self.video_grid_thws = None + self.video_infinity_scales = None + + decode_states_len = self.speculative_config.num_speculative_tokens + 1 if self.speculative_decoding else 1 + self.decode_states = paddle.full( + [self.scheduler_config.max_num_seqs, decode_states_len], + -1, + dtype="int32", + ) + self.attn_mask_offsets = paddle.full( + shape=[self.scheduler_config.max_num_seqs * self.model_config.max_model_len], + fill_value=-1, + dtype="int32", + ) + self.attn_mask_offsets_full = paddle.full( + [self.scheduler_config.max_num_seqs, self.model_config.max_model_len], -1, dtype="int32" + ) # For logits processors self.logits_processors = build_logits_processors(self.fd_config) @@ -409,6 +432,7 @@ class InputBatch: swap_data(self.ori_seq_lens_encoder, i1, i2) swap_data(self.system_lens, i1, i2) swap_data(self.system_ids, i1, i2) + swap_data(self.generated_modality, i1, i2) swap_data(self.enable_thinking, i1, i2) swap_data(self.max_think_lens, i1, i2) swap_data(self.limit_think_status, i1, i2) @@ -451,6 +475,8 @@ class InputBatch: self.image_features_list[i1], ) swap_data(self.share_inputs["rope_emb"], i1, i2) + swap_data(self.decode_states, i1, i2) + swap_data(self.attn_mask_offsets_full, i1, i2) # Swap mask rollback swap_data(self.mask_rollback, i1, i2) @@ -578,6 +604,7 @@ class InputBatch: fill_paddle_tensor(self, "ori_seq_lens_encoder", 0) fill_paddle_tensor(self, "system_lens", 0) fill_paddle_tensor(self, "system_ids", -1) + fill_paddle_tensor(self, "generated_modality", -1) fill_paddle_tensor(self, "ids_remove_padding", 0) fill_paddle_tensor(self, "batch_id_per_token", 0) @@ -662,7 +689,14 @@ class InputBatch: dtype="float32", ) self.image_features = None + self.image_grid_thws = None self.image_features_list = None + self.video_features = None + self.video_grid_thws = None + self.video_infinity_scales = None + fill_paddle_tensor(self, "decode_states", -1) + fill_paddle_tensor(self, "attn_mask_offsets", -1) + fill_paddle_tensor(self, "attn_mask_offsets_full", -1) else: # Reset non-multimodal rope_emb self.rope_emb = get_rope( @@ -674,7 +708,11 @@ class InputBatch: ) if self.is_mm_model: self.image_features = None + self.image_grid_thws = None self.image_features_list = None + self.video_features = None + self.video_grid_thws = None + self.video_infinity_scales = None # Reset other miscellaneous tensors fill_paddle_tensor(self, "mask_rollback", 0) @@ -892,6 +930,8 @@ class ProposerInputBatch(InputBatch): swap_data(self.mask_rollback, i1, i2) swap_data(self.recompute_token_num, i1, i2) if self.enable_mm: + swap_data(self.decode_states, i1, i2) + swap_data(self.attn_mask_offsets, i1, i2) swap_data(self.attn_mask_offsets_full, i1, i2) swap_data(self.attn_mask_offsets_decoder, i1, i2)