[Processor]add qwen3vl prompt_token_ids support (#6764)

* [Processor]add qwen3vl prompt_token_ids support

* [Processor]add qwen3vl prompt_token_ids support unittest

* [Processor]add qwen3vl prompt_token_ids support precommit
This commit is contained in:
CSWYF3634076
2026-03-11 15:08:56 +08:00
committed by GitHub
parent cffa8c246c
commit 97a4b3631e
6 changed files with 1133 additions and 30 deletions
+390 -2
View File
@@ -14,6 +14,7 @@
# limitations under the License.
"""
import copy
import unittest
from unittest.mock import MagicMock, patch
@@ -379,7 +380,6 @@ class TestQwen3VLProcessor(unittest.TestCase):
# mock vision position computation
# -----------------------
dp = self.processor.processor
dp.image_patch_id = dp.image_token_id
dp._compute_vision_positions = MagicMock(return_value=np.array([[10, 11, 12]], dtype=np.int64))
dp._add_processed_image(img_cache, outputs, uuid)
@@ -388,7 +388,7 @@ class TestQwen3VLProcessor(unittest.TestCase):
self.assertEqual(len(outputs["input_ids"]), num_tokens)
self.assertEqual(
outputs["input_ids"],
[self.processor.image_patch_id] * num_tokens,
[dp.image_token_id] * num_tokens,
)
# ---- mm_positions ----
@@ -416,6 +416,270 @@ class TestQwen3VLProcessor(unittest.TestCase):
self.assertEqual(outputs["image_type_ids"], [0])
self.assertEqual(outputs["fps"], [0])
def test_multimodal_token_len_validation(self):
    """Each ingestion path (raw/processed image and video) must raise
    ValueError when the supplied ``token_len`` disagrees with the media size."""
    data_processor = self.processor.processor
    merge = data_processor.image_processor.merge_size

    def fresh_outputs(image=False, video=False):
        # Minimal output accumulator matching what the processor mutates.
        out = {key: [] for key in ("mm_positions", "input_ids", "token_type_ids", "position_ids")}
        out["cur_position"] = 0
        for key in ("images", "mm_hashes", "grid_thw", "image_type_ids", "fps"):
            out[key] = []
        if image:
            out["num_input_image_tokens"] = 0
        if video:
            out["num_input_video_tokens"] = 0
        return out

    def raw_preprocess_result():
        # Fresh dict per subtest so no state leaks between patched calls.
        return {
            "grid_thw": np.array([1, merge * 2, merge * 2]),
            "pixel_values": np.zeros((1, 3, 3), dtype=np.float32),
        }

    ready_image = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (1, 8, 8)},
    )
    ready_video = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (2, 8, 8), "fps": 5},
    )

    with self.subTest("add_image"):
        with patch.object(
            data_processor.image_processor, "preprocess", return_value=raw_preprocess_result()
        ), self.assertRaisesRegex(ValueError, "image tokens num not match the size"):
            data_processor._add_image(mock_pil_image(32, 32), fresh_outputs(image=True), None, token_len=3)

    with self.subTest("add_processed_image"):
        with self.assertRaisesRegex(ValueError, "image tokens num not match the size"):
            data_processor._add_processed_image(ready_image, fresh_outputs(), "uuid", token_len=2)

    with self.subTest("add_video"):
        with patch.object(
            data_processor.image_processor, "preprocess", return_value=raw_preprocess_result()
        ), self.assertRaisesRegex(ValueError, "video tokens num not match the size"):
            data_processor._add_video(
                np.zeros((2, 4, 4, 3), dtype=np.uint8),
                {"fps": 4},
                fresh_outputs(video=True),
                None,
                token_len=3,
            )

    with self.subTest("add_processed_video"):
        with self.assertRaisesRegex(ValueError, "video tokens num not match the size"):
            data_processor._add_processed_video(ready_video, fresh_outputs(), "uuid", token_len=2)
def test_prompt_token_ids2outputs_error_branches(self):
    """Cover the error branches of prompt_token_ids2outputs: a missing item
    without processor cache, a missing item absent from the cache, and an
    unsupported multimodal type."""
    data_processor = self.processor.processor
    request = {
        "prompt_token_ids": [data_processor.image_token_id],
        "messages": [{"role": "user", "content": [{"type": "image_url", "uuid": "missing-image"}]}],
    }
    parsed = [{"role": "user", "content": {"type": "image", "data": None, "uuid": "missing-image"}}]

    with self.subTest("missing_without_cache"):
        with patch(
            "fastdeploy.input.qwen3_vl_processor.process.parse_chat_messages", return_value=parsed
        ), self.assertRaisesRegex(ValueError, "Missing items cannot be retrieved without processor cache."):
            data_processor.prompt_token_ids2outputs(request)

    with self.subTest("missing_cache_item_not_found"):
        saved_cache_flag = data_processor.enable_processor_cache
        data_processor.enable_processor_cache = True
        zmq_context = MagicMock()
        zmq_context.socket.return_value = MagicMock()
        try:
            with patch(
                "fastdeploy.input.qwen3_vl_processor.process.parse_chat_messages", return_value=parsed
            ), patch(
                "fastdeploy.input.qwen3_vl_processor.process.zmq.Context", return_value=zmq_context
            ), patch.object(
                data_processor, "get_processor_cache", return_value=[None]
            ), self.assertRaisesRegex(ValueError, "Missing item 0 not found in processor cache"):
                data_processor.prompt_token_ids2outputs(request)
        finally:
            data_processor.enable_processor_cache = saved_cache_flag

    with self.subTest("unexpected_multimodal_type"):

        class TypeSwitchingItem:
            # Reports "image" on the first type lookup so parsing proceeds,
            # then "audio" to drive execution into the unsupported-type branch.
            def __init__(self):
                self._type_reads = 0

            def get(self, key, default=None):
                if key == "type":
                    self._type_reads += 1
                    return "image" if self._type_reads == 1 else "audio"
                if key == "data":
                    return "bad-data"
                if key == "uuid":
                    return "bad-uuid"
                return default

        parsed = [{"role": "user", "content": TypeSwitchingItem()}]
        with patch(
            "fastdeploy.input.qwen3_vl_processor.process.parse_chat_messages", return_value=parsed
        ), self.assertRaisesRegex(ValueError, "Unsupported multimodal type: audio"):
            data_processor.prompt_token_ids2outputs(request)
def test_prompt_token_ids2outputs_cache_update_paths(self):
    """Exercise prompt_token_ids2outputs across the cache paths: a missing
    image fetched from the processor cache, a raw video (preprocessed on the
    fly) and an already-processed video, then verify the cache update call."""
    data_processor = self.processor.processor
    merge = data_processor.image_processor.merge_size
    saved_cache_flag = data_processor.enable_processor_cache
    data_processor.enable_processor_cache = True

    cached_image = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (1, 8, 8)},
    )
    ready_video = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (2, 8, 8), "fps": 6},
    )
    chat_items = [
        {
            "role": "user",
            "content": [
                {"type": "image", "data": None, "uuid": "missing-image"},
                {"type": "video", "data": {"video": "raw-video", "fps": 4}, "uuid": "raw-video"},
                {"type": "video", "data": ready_video, "uuid": "processed-video"},
            ],
        }
    ]
    request = {
        "prompt_token_ids": [
            data_processor.image_token_id,
            99,
            data_processor.image_token_id,
            98,
            data_processor.image_token_id,
        ],
        "messages": [{"role": "user", "content": [{"type": "text", "text": "unused"}]}],
    }
    zmq_socket = MagicMock()
    zmq_context = MagicMock()
    zmq_context.socket.return_value = zmq_socket
    try:
        # One chained `with` keeps the patch entry order of the original
        # deeply nested version while staying readable.
        with patch(
            "fastdeploy.input.qwen3_vl_processor.process.parse_chat_messages", return_value=chat_items
        ), patch(
            "fastdeploy.input.qwen3_vl_processor.process.zmq.Context", return_value=zmq_context
        ), patch.object(
            data_processor, "_compute_vision_positions", return_value=np.array([[0]], dtype=np.int64)
        ), patch.object(
            data_processor.image_processor,
            "preprocess",
            return_value={
                "grid_thw": np.array([1, merge, merge]),
                "pixel_values": np.zeros((1, 3, 3), dtype=np.float32),
            },
        ), patch.object(
            data_processor, "_load_and_process_video", return_value=mock_read_frames(4, 4, 2, 4)
        ), patch.object(
            data_processor, "get_processor_cache", return_value=[cached_image]
        ) as cache_get, patch.object(
            data_processor, "update_processor_cache"
        ) as cache_update:
            outputs = data_processor.prompt_token_ids2outputs(request)
            cache_get.assert_called_once_with(zmq_socket, ["missing-image"])
            cache_update.assert_called_once()
            _, stored_hashes, stored_items = cache_update.call_args.args
            self.assertEqual(stored_hashes, ["raw-video", "processed-video"])
            self.assertEqual(stored_items[0][1]["thw"], (1, merge, merge))
            self.assertEqual(stored_items[1][1]["thw"], (2, 8, 8))
            self.assertEqual(outputs["mm_hashes"], ["missing-image", "raw-video", "processed-video"])
            self.assertEqual(outputs["input_ids"][-1], data_processor.image_token_id)
    finally:
        data_processor.enable_processor_cache = saved_cache_flag
def test_request2ids_cache_update_paths(self):
    """Exercise request2ids cache handling: a missing image fetched from the
    processor cache plus processed-image and raw-video items pushed back via
    update_processor_cache."""
    data_processor = self.processor.processor
    merge = data_processor.image_processor.merge_size
    saved_cache_flag = data_processor.enable_processor_cache
    data_processor.enable_processor_cache = True

    cached_image = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (1, 8, 8)},
    )
    ready_image = (
        np.zeros((merge * merge, 3, 3), dtype=np.float32),
        {"thw": (1, 8, 8)},
    )
    chat_items = [
        {
            "role": "user",
            "content": [
                {"type": "image", "data": None, "uuid": "missing-image"},
                {"type": "image", "data": ready_image, "uuid": "processed-image"},
                {"type": "video", "data": {"video": "raw-video", "fps": 4}, "uuid": "raw-video"},
            ],
        }
    ]
    request = {
        "messages": [{"role": "user", "content": [{"type": "text", "text": "unused"}]}],
        "add_generation_prompt": True,
    }
    zmq_socket = MagicMock()
    zmq_context = MagicMock()
    zmq_context.socket.return_value = zmq_socket
    try:
        # Single chained `with` mirrors the original nesting order.
        with patch(
            "fastdeploy.input.qwen3_vl_processor.process.parse_chat_messages", return_value=chat_items
        ), patch(
            "fastdeploy.input.qwen3_vl_processor.process.zmq.Context", return_value=zmq_context
        ), patch.object(
            data_processor, "_compute_vision_positions", return_value=np.array([[0]], dtype=np.int64)
        ), patch.object(
            data_processor.image_processor,
            "preprocess",
            return_value={
                "grid_thw": np.array([1, merge, merge]),
                "pixel_values": np.zeros((1, 3, 3), dtype=np.float32),
            },
        ), patch.object(
            data_processor, "_load_and_process_video", return_value=mock_read_frames(4, 4, 2, 4)
        ), patch.object(
            data_processor, "get_processor_cache", return_value=[cached_image]
        ) as cache_get, patch.object(
            data_processor, "update_processor_cache"
        ) as cache_update, patch.object(
            self.processor.tokenizer,
            "apply_chat_template",
            return_value="<|image_pad|>a<|image_pad|>b<|video_pad|>",
        ):
            outputs = data_processor.request2ids(request)
            cache_get.assert_called_once_with(zmq_socket, ["missing-image"])
            cache_update.assert_called_once()
            _, stored_hashes, stored_items = cache_update.call_args.args
            self.assertEqual(stored_hashes, ["processed-image", "raw-video"])
            self.assertEqual(stored_items[0][1]["thw"], (1, 8, 8))
            self.assertEqual(stored_items[1][1]["thw"], (1, merge, merge))
            self.assertEqual(outputs["mm_hashes"], ["missing-image", "processed-image", "raw-video"])
    finally:
        data_processor.enable_processor_cache = saved_cache_flag
def test_parse_processor_kwargs_valid(self):
"""Test _parse_processor_kwargs with valid input"""
valid_kwargs = {"video_max_frames": 10, "video_min_frames": 1}
@@ -545,6 +809,130 @@ class TestQwen3VLProcessor(unittest.TestCase):
self.assertIn("prompt_token_ids", result)
self.assertIn("multimodal_inputs", result)
def test_process_request_dict_with_prompt_token_ids_only(self):
    """A request carrying only prompt_token_ids passes them through untouched
    and yields empty multimodal inputs with all-text token_type_ids."""
    payload = {
        "request_id": "12345",
        "prompt_token_ids": [1, 2, 3],
    }
    processed = self.processor.process_request_dict(payload, 1024)
    self.assertEqual(processed["prompt_token_ids"], [1, 2, 3])
    self.assertEqual(processed["prompt_token_ids_len"], 3)
    multimodal = processed["multimodal_inputs"]
    self.assertIsNone(multimodal["images"])
    self.assertEqual(multimodal["token_type_ids"].tolist(), [0, 0, 0])
def test_process_request_dict_with_prompt_token_ids_and_messages(self):
    """Re-feeding the prompt_token_ids produced from a multimodal request,
    together with the same messages, must reproduce identical token ids and
    multimodal tensors."""
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": "file://demo.jpeg"}},
                {"type": "video_url", "video_url": {"url": "file://3_frame_video.mp4"}},
                {"type": "text", "text": "Describe image and video."},
            ],
        }
    ]
    baseline = self.processor.process_request_dict(
        {"request_id": "12345", "messages": messages}, 1024 * 100
    )
    replayed = self.processor.process_request_dict(
        {
            "request_id": "12345",
            "prompt_token_ids": list(baseline["prompt_token_ids"]),
            "messages": copy.deepcopy(messages),
        },
        1024 * 100,
    )
    self.assertEqual(replayed["prompt_token_ids"], baseline["prompt_token_ids"])
    # Every multimodal tensor must match the baseline element-wise.
    for field in ("grid_thw", "position_ids", "image_type_ids"):
        self.assertTrue(
            np.equal(
                replayed["multimodal_inputs"][field],
                baseline["multimodal_inputs"][field],
            ).all()
        )
def test_process_request_dict_prompt_token_ids_more_multimodal_segments_than_messages(self):
    """If prompt_token_ids embed more multimodal placeholder segments than
    the accompanying messages supply, processing must raise ValueError."""
    full_messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": "file://demo.jpeg"}},
                {"type": "video_url", "video_url": {"url": "file://3_frame_video.mp4"}},
                {"type": "text", "text": "Describe image and video."},
            ],
        }
    ]
    baseline = self.processor.process_request_dict(
        {"request_id": "12345", "messages": full_messages}, 1024 * 100
    )
    # Replay the image+video token ids, but drop the video from the messages.
    replay = {
        "request_id": "12345",
        "prompt_token_ids": list(baseline["prompt_token_ids"]),
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": "file://demo.jpeg"}},
                    {"type": "text", "text": "Describe image and video."},
                ],
            }
        ],
    }
    with self.assertRaisesRegex(ValueError, "more multimodal placeholder"):
        self.processor.process_request_dict(replay, 1024 * 100)
def test_process_request_dict_prompt_token_ids_unused_multimodal_messages(self):
    """If the messages contain more multimodal items than the prompt_token_ids
    consume, processing must raise ValueError."""
    image_only_messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": "file://demo.jpeg"}},
                {"type": "text", "text": "Describe image."},
            ],
        }
    ]
    baseline = self.processor.process_request_dict(
        {"request_id": "12345", "messages": image_only_messages}, 1024 * 100
    )
    # Replay image-only token ids alongside messages that add an extra video.
    replay = {
        "request_id": "12345",
        "prompt_token_ids": list(baseline["prompt_token_ids"]),
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": "file://demo.jpeg"}},
                    {"type": "video_url", "video_url": {"url": "file://3_frame_video.mp4"}},
                    {"type": "text", "text": "Describe image."},
                ],
            }
        ],
    }
    with self.assertRaisesRegex(ValueError, "number of multimodal items does not match"):
        self.processor.process_request_dict(replay, 1024 * 100)
def test_process_request_dict_invalid_format(self):
"""Test process_request_dict with invalid format"""
request = {"request_id": "12345"}