[Models] Add Qwen3-VL Moe Model Support (#5913)

* [Model] add Qwen3vl moe model support

* [Model] add Qwen3vl moe model support remove log

* [Model] add Qwen3vl moe model support unittest
This commit is contained in:
CSWYF3634076
2026-01-08 11:36:42 +08:00
committed by GitHub
parent d8c6ba61f3
commit d8fcb7c07d
10 changed files with 2633 additions and 5 deletions
+1
View File
@@ -64,3 +64,4 @@ repos:
- id: detect-private-key
- id: check-symlinks
- id: check-added-large-files
args: ["--maxkb=1024"]
@@ -632,7 +632,7 @@ class DataProcessor(MMBaseDataProcessor):
min_frames=min_frames,
max_frames=max_frames,
metadata=meta,
fps=fps,
fps=-1 if num_frames > 0 else fps, # num_frames first
num_frames=num_frames,
)
@@ -549,7 +549,7 @@ class DataProcessor(MMBaseDataProcessor):
min_frames=min_frames,
max_frames=max_frames,
metadata=meta,
fps=fps,
fps=-1 if num_frames > 0 else fps, # num_frames first,
num_frames=num_frames,
)
@@ -1,3 +1,19 @@
"""
# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import annotations
import re
@@ -0,0 +1,278 @@
"""
# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from __future__ import annotations
import re
from typing import List, Optional
import paddle
from paddle import nn
from paddleformers.utils.log import logger
from fastdeploy.config import FDConfig
from fastdeploy.model_executor.forward_meta import ForwardMeta
from fastdeploy.model_executor.layers.embeddings import VocabParallelEmbedding
from fastdeploy.model_executor.layers.moe.moe import FusedMoE
from fastdeploy.model_executor.layers.normalization import RMSNorm
from fastdeploy.model_executor.models.model_base import ModelCategory, ModelRegistry
from fastdeploy.model_executor.models.qwen3_vl.qwen3_vl import (
Qwen3VLForConditionalGeneration,
)
from fastdeploy.model_executor.models.qwen3moe import (
Qwen3DecoderLayer as Qwen3MoeDecoderLayer,
)
# from fastdeploy.model_executor.graph_optimization.decorator import support_graph_optimization
# @support_graph_optimization
class Qwen3MoeVLModel(nn.Layer):
    """Language backbone for Qwen3-VL MoE: token embedding, a stack of
    Qwen3-Moe decoder layers, and a final RMSNorm.

    Vision features are injected by the caller through ``input_embeddings``
    and, optionally, through per-layer ``deepstack_inputs``.
    """

    def __init__(self, fd_config: FDConfig) -> None:
        """
        Args:
            fd_config (FDConfig): FastDeploy model/runtime configuration.
        """
        super().__init__()
        self.num_layers = fd_config.model_config.num_hidden_layers
        self.image_token_id = fd_config.model_config.image_token_id
        self.video_token_id = fd_config.model_config.video_token_id
        self._dtype = fd_config.model_config.dtype
        # Language-tower weights in the checkpoint are prefixed "model.".
        fd_config.model_config.pretrained_config.prefix_name = "model"
        self.fd_config = fd_config
        self.embed_tokens = VocabParallelEmbedding(
            fd_config=fd_config,
            num_embeddings=fd_config.model_config.vocab_size,
            embedding_dim=fd_config.model_config.hidden_size,
            # BUG FIX: the function object itself was being passed as the
            # dtype (missing call parentheses); call it to get the dtype.
            params_dtype=paddle.get_default_dtype(),
            prefix=f"{fd_config.model_config.pretrained_config.prefix_name}.embed_tokens",
        )
        self.layers = nn.LayerList(
            [
                Qwen3MoeDecoderLayer(
                    fd_config=fd_config,
                    prefix=f"{fd_config.model_config.pretrained_config.prefix_name}.layers.{i}",
                )
                for i in range(self.num_layers)
            ]
        )
        self.norm = RMSNorm(
            fd_config,
            hidden_size=fd_config.model_config.hidden_size,
            eps=fd_config.model_config.rms_norm_eps,
            prefix=f"{fd_config.model_config.pretrained_config.prefix_name}.norm",
        )

    def load_state_dict(self, state_dict):
        """Load embedding, final-norm, and all decoder-layer weights."""
        self.embed_tokens.load_state_dict(state_dict)
        self.norm.load_state_dict(state_dict)
        for i in range(self.num_layers):
            logger.info(f"Start load layer {i}")
            self.layers[i].load_state_dict(state_dict)

    def get_input_embeddings(self, ids_remove_padding: paddle.Tensor) -> paddle.Tensor:
        """Return token embeddings for the padding-removed input ids."""
        return self.embed_tokens(ids_remove_padding=ids_remove_padding)

    def forward(
        self,
        input_embeddings: paddle.Tensor,
        ids_remove_padding: paddle.Tensor,
        image_features: Optional[paddle.Tensor],
        forward_meta: ForwardMeta,
        deepstack_inputs: Optional[List[paddle.Tensor]] = None,
    ) -> paddle.Tensor:
        """Run the decoder stack over pre-built input embeddings.

        Args:
            input_embeddings: embeddings with vision features already merged.
            ids_remove_padding: token ids (unused here; kept for interface
                parity with the dense model).
            image_features: unused in the backbone; vision fusion happens
                upstream.
            forward_meta: attention/forward metadata.
            deepstack_inputs: optional per-layer visual residuals added to the
                hidden states of the first ``len(deepstack_inputs)`` layers.

        Returns:
            Final hidden states after RMSNorm.
        """
        hidden_states = input_embeddings
        residual = None
        for layer_id, layer in enumerate(self.layers):
            hidden_states, residual = layer(
                forward_meta,
                hidden_states,
                residual,
            )
            # Deepstack: early layers receive extra multi-scale visual input.
            if deepstack_inputs is not None and layer_id < len(deepstack_inputs):
                hidden_states = hidden_states + deepstack_inputs[layer_id]
        hidden_states, _ = self.norm(hidden_states, residual)
        return hidden_states
@ModelRegistry.register_model_class(
    architecture="Qwen3VLMoeForConditionalGeneration",
    module_name="qwen3_vl.qwen3_vl_moe",
    category=ModelCategory.MULTIMODAL,
    primary_use=ModelCategory.MULTIMODAL,
)
class Qwen3VLMoeForConditionalGeneration(Qwen3VLForConditionalGeneration):
    """Qwen3-VL with a Mixture-of-Experts (Qwen3-Moe) language tower.

    Reuses the vision encoder and generation path of
    ``Qwen3VLForConditionalGeneration``; overrides the language backbone
    and the checkpoint-loading logic to handle per-expert and fused-expert
    weight layouts.
    """

    def __init__(self, fd_config: FDConfig) -> None:
        super().__init__(fd_config)
        self.config = fd_config.model_config
        # Replace the dense language backbone built by the parent class
        # with the MoE backbone.
        self.model = Qwen3MoeVLModel(fd_config=fd_config)

    @classmethod
    def name(cls) -> str:
        """Architecture name used by the model registry."""
        return "Qwen3VLMoeForConditionalGeneration"

    def get_expert_mapping(
        self,
    ) -> list[tuple[str, str, int, str]]:
        """Return FusedMoE mappings for per-expert (non-fused) checkpoints.

        Each entry is (param_name, weight_name, expert_id, shard_id).
        """
        # (param_name, weight_name, expert_id, shard_id)
        return FusedMoE.make_expert_params_mapping(
            num_experts=self.fd_config.model_config.num_experts,
            ckpt_gate_proj_name="gate_proj",
            ckpt_down_proj_name="down_proj",
            ckpt_up_proj_name="up_proj",
            param_gate_up_proj_name="experts.up_gate_proj_",
            param_down_proj_name="experts.down_proj_",
        )

    def load_fused_expert_weights(
        self,
        name: str,
        params_dict: dict,
        loaded_weight: paddle.Tensor,
        shard_id: str,
        num_experts: int,
    ):
        """Feed a fused expert tensor to a parameter expert-by-expert.

        Args:
            name: target parameter name in ``params_dict``.
            params_dict: model parameters keyed by name.
            loaded_weight: stacked expert weight; indexed on dim 0 per
                expert (assumes shape [num_experts, ...] — TODO confirm
                against the fused checkpoint layout).
            shard_id: one of "gate", "up", "down".
            num_experts: number of expert slices to load.
        """
        param = params_dict[name]
        weight_loader = param.weight_loader
        for expert_id in range(num_experts):
            curr_expert_weight = loaded_weight[expert_id]
            weight_loader(
                param,
                curr_expert_weight,
                shard_id=shard_id,
                expert_id=expert_id,
            )

    @paddle.no_grad()
    def load_weights(self, weights_iterator) -> None:
        """Load model parameters from a given weights iterator.

        Routes each checkpoint tensor through one of three paths:
        1. stacked projections (q/k/v -> qkv_proj, gate/up -> up_gate_proj,
           plus embedding / lm_head / visual renames),
        2. expert weights — either fused [num_experts, ...] tensors
           ("experts.gate_up_proj" / "experts.down_proj") or per-expert
           tensors via ``get_expert_mapping``,
        3. everything else by direct name match.
        """
        # Imported lazily to avoid a circular import at module load time
        # (assumed — TODO confirm).
        from fastdeploy.model_executor.utils import (
            default_weight_loader,
            process_weights_after_loading,
        )

        stacked_params_mapping = [
            # (param_name, weight_name, shard_id)
            ("qkv_proj", "q_proj", "q"),
            ("qkv_proj", "k_proj", "k"),
            ("qkv_proj", "v_proj", "v"),
            ("up_gate_proj", "gate_proj", "gate"),
            ("up_gate_proj", "up_proj", "up"),
            ("embed_tokens.embeddings", "embed_tokens", None),
            ("lm_head.linear", "lm_head", None),
            ("visual", "model.visual", None),
        ]
        # Default per-expert mapping; replaced by the fused mapping below as
        # soon as a fused-format tensor is seen in the stream.
        expert_params_mapping = self.get_expert_mapping()  # Not actually used
        params_dict = dict(self.named_parameters())
        is_fused_expert = False
        fused_expert_params_mapping = [
            ("experts.up_gate_proj_weight", "experts.gate_up_proj", 0, "gate"),
            ("experts.down_proj_weight", "experts.down_proj", 0, "down"),
        ]
        num_experts = self.config.num_experts
        # Example rename performed below:
        #   params_name  model.embed_tokens.embeddings.weight
        #   weight_name  model.language_model.embed_tokens.weight
        process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config)
        logger.info(f"[Qwen3Moe-VL] params_dict names: {list(params_dict.keys())} ")
        for loaded_weight_name, loaded_weight in weights_iterator:
            # Strip the HF ".language_model" scope so names line up with
            # this model's parameter names.
            loaded_weight_name = loaded_weight_name.replace(".language_model", "")
            for param_name, weight_name, shard_id in stacked_params_mapping:
                # Detect a fused-expert checkpoint and switch mappings.
                if "experts.gate_up_proj" in loaded_weight_name or "experts.down_proj" in loaded_weight_name:
                    is_fused_expert = True
                    expert_params_mapping = fused_expert_params_mapping
                if weight_name not in loaded_weight_name:
                    continue
                # Expert tensors are handled by the expert path below.
                if "mlp.experts" in loaded_weight_name:
                    continue
                model_param_name = loaded_weight_name.replace(weight_name, param_name)
                if model_param_name not in params_dict:
                    continue
                param = params_dict[model_param_name]
                weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config))
                weight_loader(param, loaded_weight, shard_id)
                break
            else:
                is_expert_weight = False
                # Note: mlp.experts.gate_up_proj in qwen3moe_vl is merged and should be processed separately when loading weights
                for mapping in expert_params_mapping:
                    param_name, weight_name, expert_id, shard_id = mapping
                    if weight_name not in loaded_weight_name:
                        continue
                    # Anyway, this is an expert weight and should not be
                    # attempted to load as other weights later
                    is_expert_weight = True
                    model_param_name = loaded_weight_name.replace(weight_name, param_name)
                    if is_fused_expert:
                        # NOTE(review): torch-style `transpose(-1, -2)` and
                        # `chunk(2, dim=-2)` — paddle's native Tensor API uses
                        # a `perm` list / `axis` keyword; confirm these calls
                        # are valid in this environment.
                        loaded_weight = loaded_weight.transpose(-1, -2)
                        if "experts.gate_up_proj" in loaded_weight_name:
                            # Fused gate|up tensor: split and load each half.
                            gate_weight, up_weight = loaded_weight.chunk(2, dim=-2)
                            self.load_fused_expert_weights(
                                model_param_name,
                                params_dict,
                                gate_weight,
                                "gate",
                                num_experts,
                            )
                            self.load_fused_expert_weights(
                                model_param_name,
                                params_dict,
                                up_weight,
                                "up",
                                num_experts,
                            )
                        else:
                            # down_proj
                            self.load_fused_expert_weights(
                                model_param_name,
                                params_dict,
                                loaded_weight,
                                shard_id,
                                num_experts,
                            )
                        break
                    else:
                        if weight_name not in loaded_weight_name:
                            continue
                        model_param_name = loaded_weight_name.replace(weight_name, param_name)
                        if model_param_name not in params_dict:
                            continue
                        param = params_dict[model_param_name]
                        weight_loader = param.weight_loader
                        weight_loader(param, loaded_weight, shard_id=shard_id, expert_id=expert_id)
                        break
                else:
                    if is_expert_weight:
                        # Expert weight with no local shard on this rank.
                        continue
                    # Fallback: direct name match for all remaining weights.
                    model_param_name = loaded_weight_name
                    if model_param_name not in params_dict:
                        continue
                    param = params_dict[model_param_name]
                    weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config))
                    weight_loader(param, loaded_weight)
            # Post-process the owning sublayer (e.g. quantization repack)
            # once its weight has been loaded.
            model_sublayer_name = re.sub(r"\.(up_gate_proj_weight|down_proj_weight|weight)$", "", model_param_name)
            process_weights_after_loading_fn(model_sublayer_name, param)
        if self.tie_word_embeddings:
            # Share the (transposed) embedding matrix with the output head.
            self.lm_head.linear.weight.set_value(
                self.model.embed_tokens.embeddings.weight.transpose([1, 0]).astype(self.lm_head.linear.weight.dtype)
            )
+72
View File
@@ -51,6 +51,9 @@ from fastdeploy.model_executor.models.qwen3_vl.qwen3_vl import (
Qwen3VLForConditionalGeneration,
Qwen3VLPretrainedModel,
)
from fastdeploy.model_executor.models.qwen3_vl.qwen3_vl_moe import (
Qwen3VLMoeForConditionalGeneration,
)
from fastdeploy.model_executor.models.qwen3moe import (
Qwen3MoeForCausalLM,
Qwen3MoePretrainedModel,
@@ -613,6 +616,75 @@ class Qwen3VLForConditionalGenerationRL(Qwen3VLForConditionalGeneration, BaseRLM
return self.infer_to_train_mapping
class Qwen3VLMoeForConditionalGenerationRL(Qwen3VLMoeForConditionalGeneration, BaseRLModel):
    """
    Qwen3VLMoeForConditionalGenerationRL: RL (rollout) wrapper around the
    Qwen3-VL MoE model that also exposes the inference->training
    parameter-name mapping.
    """

    def __init__(self, fd_config: FDConfig):
        """
        Args:
            fd_config (FDConfig): Configurations for the LLM model.
        """
        super(Qwen3VLMoeForConditionalGenerationRL, self).__init__(fd_config)

    @classmethod
    def name(cls) -> str:
        """name"""
        # FIX: the first parameter of a classmethod was named `self`;
        # renamed to `cls` for consistency with the base class.
        return "Qwen3VLMoeForConditionalGenerationRL"

    def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
        """Build (once, cached) the mapping from inference parameter names
        to training parameter names.

        Args:
            trainer_degree: unused here; kept for interface compatibility.

        Returns:
            Dict mapping an inference name either to a single training name
            or, for fused expert tensors, to a list of per-expert names.
        """
        if self._mappings_built:
            return self.infer_to_train_mapping
        self.infer_to_train_mapping = {}
        self._mappings_built = True
        # Prepare placeholders
        place_holders = ["weight"]
        # Initialize mapping dictionary with the shared base mappings.
        self._update_base_mappings("model")
        base_name = "model.layers"

        # Helper function to add the MoE-specific mappings of one layer.
        def _add_layer_mappings(layer_idx: int):
            # Router gate maps 1:1.
            self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.gate.weight"] = (
                f"{base_name}.{layer_idx}.mlp.gate.weight"
            )
            if self.fd_config.model_config.moe_use_aux_free:
                self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.experts.gate_correction_bias"] = (
                    f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
                )
            # Each fused inference tensor maps to the list of per-expert
            # training tensors it was packed from.
            for expert_idx in range(self.fd_config.model_config.num_experts):
                for ph in place_holders:
                    # up_gate_proj (fused gate+up projection)
                    up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.experts.up_gate_proj_weight"
                    if up_gate_proj_key not in self.infer_to_train_mapping:
                        self.infer_to_train_mapping[up_gate_proj_key] = []
                    self.infer_to_train_mapping[up_gate_proj_key].append(
                        f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
                    )
                    # down_proj
                    down_proj_key = f"{base_name}.{layer_idx}.mlp.experts.down_proj_weight"
                    if down_proj_key not in self.infer_to_train_mapping:
                        self.infer_to_train_mapping[down_proj_key] = []
                    self.infer_to_train_mapping[down_proj_key].append(
                        f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
                    )

        for layer_idx in range(self.fd_config.model_config.num_hidden_layers):
            _add_layer_mappings(layer_idx)
        self._complete_missing_mappings()
        return self.infer_to_train_mapping
class Glm4MoeForCausalLMRL(Glm4MoeForCausalLM, BaseRLModel):
"""
Glm4MoeForCausalLMRL
File diff suppressed because one or more lines are too long
@@ -0,0 +1,68 @@
# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import sys
def test_rollout_model_with_distributed_launch():
    """
    Launch utils/rollout_model.py under paddle.distributed.launch on two
    GPUs and require it to finish successfully within the timeout.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    utils_dir = os.path.join(os.path.dirname(current_dir), "utils")
    rollout_script = os.path.join(utils_dir, "rollout_model.py")
    baseline_path = os.path.join(current_dir, "baseline.txt")

    # Resolve the model directory from MODEL_PATH when provided.
    base_path = os.getenv("MODEL_PATH")
    if base_path:
        model_path = os.path.join(base_path, "Qwen3-VL-30B-A3B-Instruct")
    else:
        model_path = "./Qwen3-VL-30B-A3B-Instruct"
    command = [
        sys.executable,
        "-m",
        "paddle.distributed.launch",
        "--gpus",
        "0,1",
        rollout_script,
        "--model_path",
        model_path,
        "--baseline_path",
        baseline_path,
        "--enable_mm",
        "--quantization",
        "wint8",
    ]
    print(f"Executing command: {' '.join(command)}")
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    try:
        stdout, stderr = process.communicate(timeout=300)
        return_code = process.returncode
    except subprocess.TimeoutExpired:
        process.kill()
        stdout, stderr = process.communicate()
        return_code = -1  # treat a timeout as a failure
    print("\n" + "=" * 50 + " STDOUT " + "=" * 50)
    print(stdout)
    print("\n" + "=" * 50 + " STDERR " + "=" * 50)
    print(stderr)
    # BUG FIX: the original asserted `return_code != 1`, which let both the
    # timeout path (-1) and any other non-zero exit pass; require success.
    assert return_code == 0, f"Process exited with code {return_code}"
+430
View File
@@ -0,0 +1,430 @@
# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import signal
import subprocess
import sys
import time
import openai
import pytest
import requests
from utils.serving_utils import (
FD_API_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
    """
    Pytest fixture that runs once per test session:
    - Cleans ports before tests
    - Starts the API server as a subprocess (logging to server.log)
    - Waits for the server port to open (up to 10 minutes)
    - Tears down the server process group after all tests finish
    """
    print("Pre-test port cleanup...")
    clean_ports()
    model_path = "/ModelData/Qwen3-VL-30B-A3B-Instruct"
    log_path = "server.log"
    # Allow up to 100 images and 100 videos per prompt.
    limit_mm_str = json.dumps({"image": 100, "video": 100})
    cmd = [
        sys.executable,
        "-m",
        "fastdeploy.entrypoints.openai.api_server",
        "--model",
        model_path,
        "--port",
        str(FD_API_PORT),
        "--tensor-parallel-size",
        "2",
        "--engine-worker-queue-port",
        str(FD_ENGINE_QUEUE_PORT),
        "--metrics-port",
        str(FD_METRICS_PORT),
        "--max-model-len",
        "32768",
        "--max-num-seqs",
        "128",
        "--limit-mm-per-prompt",
        limit_mm_str,
    ]
    print(cmd)
    # Start subprocess in new process group
    with open(log_path, "w") as logfile:
        process = subprocess.Popen(
            cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing full group via os.killpg
        )
    print(f"Started API server with pid {process.pid}")
    # Wait up to 10 minutes for API server to be ready
    for _ in range(10 * 60):
        if is_port_open("127.0.0.1", FD_API_PORT):
            print(f"API server is up on port {FD_API_PORT}")
            break
        time.sleep(1)
    else:
        # for/else: the loop exhausted without the port opening.
        print("[TIMEOUT] API server failed to start in 10 minutes. Cleaning up...")
        try:
            os.killpg(process.pid, signal.SIGTERM)
        except Exception as e:
            print(f"Failed to kill process group: {e}")
        raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
    yield  # Run tests
    print("\n===== Post-test server cleanup... =====")
    try:
        os.killpg(process.pid, signal.SIGTERM)
        print(f"API server (pid={process.pid}) terminated")
    except Exception as e:
        print(f"Failed to terminate API server: {e}")
@pytest.fixture(scope="session")
def api_url(request):
    """Chat-completions endpoint URL of the locally launched test server."""
    host = "0.0.0.0"
    return "http://{}:{}/v1/chat/completions".format(host, FD_API_PORT)
@pytest.fixture(scope="session")
def metrics_url(request):
    """Prometheus metrics endpoint URL of the locally launched test server."""
    endpoint = "http://{}:{}/metrics".format("0.0.0.0", FD_METRICS_PORT)
    return endpoint
@pytest.fixture
def headers():
    """Common HTTP request headers for JSON payloads."""
    json_headers = {"Content-Type": "application/json"}
    return json_headers
@pytest.fixture
def consistent_payload():
    """
    Fixed chat payload for determinism tests: greedy-ish decoding
    (temperature=0, top_p=1) with a pinned random seed.
    """
    video_part = {
        "type": "video_url",
        "video_url": {
            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_video/example_video.mp4",
            "detail": "high",
        },
    }
    text_part = {"type": "text", "text": "视频中手机支架的颜色是什么?"}
    user_message = {"role": "user", "content": [video_part, text_part]}
    return {
        "messages": [user_message],
        "temperature": 0,
        "top_p": 1,  # fix top_p to reduce randomness
        "seed": 13,  # fixed random seed
        "max_tokens": 32,
    }
# ==========================
# Consistency test for repeated runs with fixed payload
# ==========================
def test_consistency_between_runs(api_url, headers, consistent_payload):
    """
    Test that the result matches the known base result for a fixed,
    deterministic payload (temperature=0, pinned seed).
    """
    print("test_consistency_between_runs")
    # request
    resp1 = requests.post(api_url, headers=headers, json=consistent_payload)
    assert resp1.status_code == 200
    result1 = resp1.json()
    content1 = result1["choices"][0]["message"]["content"]
    # Keep a copy of the generated text for debugging failed runs.
    # NOTE(review): the filename says 4B although this suite serves the 30B
    # model — likely copy-paste; confirm before renaming.
    file_res_temp = "Qwen3-VL-4B-Instruct-temp"
    # FIX: the file handle was previously left unclosed (no context manager),
    # and `writelines` was used on a plain string.
    with open(file_res_temp, "a") as f_o:
        f_o.write(content1)
    # base result
    content2 = "根据您提供的视频帧,我们可以观"
    # Verify that result is same as the base result
    assert content1.startswith(content2), content1
# ==========================
# OpenAI Client Chat Completion Test
# ==========================
@pytest.fixture
def openai_client():
    """OpenAI SDK client pointed at the locally launched server."""
    endpoint = "http://{}:{}/v1".format("0.0.0.0", FD_API_PORT)
    return openai.Client(base_url=endpoint, api_key="EMPTY_API_KEY")
# Non-streaming test
def test_non_streaming_chat(openai_client):
    """Test non-streaming chat functionality with the local service"""
    print("test_non_streaming_chat")
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful AI assistant.",
            },  # the system message is optional
            {
                "role": "user",
                "content": [
                    {
                        "type": "video_url",
                        "video_url": {
                            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_video/example_video.mp4",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "视频中手机支架的颜色是什么?"},
                ],
            },
        ],
        temperature=1,
        max_tokens=53,
        stream=False,
    )
    # A well-formed completion carries at least one choice with message content.
    assert hasattr(response, "choices")
    assert len(response.choices) > 0
    assert hasattr(response.choices[0], "message")
    assert hasattr(response.choices[0].message, "content")
# Streaming test
def test_streaming_chat(openai_client, capsys):
    """Test streaming chat functionality with the local service"""
    print("test_streaming_chat")
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful AI assistant.",
            },  # the system message is optional
            {"role": "user", "content": "List 3 countries and their capitals."},
            {
                "role": "assistant",
                "content": "China(Beijing), France(Paris), Australia(Canberra).",
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "请描述图片内容"},
                ],
            },
        ],
        temperature=1,
        max_tokens=512,
        stream=True,
    )
    # Collect streamed deltas; a real stream yields more than two chunks.
    output = []
    for chunk in response:
        if hasattr(chunk.choices[0], "delta") and hasattr(chunk.choices[0].delta, "content"):
            output.append(chunk.choices[0].delta.content)
    assert len(output) > 2
# ==========================
# OpenAI Client additional chat/completions test
# ==========================
def test_non_streaming_chat_with_return_token_ids(openai_client, capsys):
    """
    Test return_token_ids option in non-streaming chat functionality with the local service
    """
    print("test_non_streaming_chat_with_return_token_ids")
    # with return_token_ids enabled
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "请描述图片内容"},
                ],
            },
        ],
        temperature=1,
        max_tokens=53,
        extra_body={"return_token_ids": True},
        stream=False,
    )
    # Token id lists must be present when the option is enabled.
    assert hasattr(response, "choices")
    assert len(response.choices) > 0
    assert hasattr(response.choices[0], "message")
    assert hasattr(response.choices[0].message, "prompt_token_ids")
    assert isinstance(response.choices[0].message.prompt_token_ids, list)
    assert hasattr(response.choices[0].message, "completion_token_ids")
    assert isinstance(response.choices[0].message.completion_token_ids, list)
    # with return_token_ids disabled
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "请描述图片内容"},
                ],
            },
        ],
        temperature=1,
        max_tokens=53,
        extra_body={"return_token_ids": False},
        stream=False,
    )
    # Token id fields must exist but be None when the option is disabled.
    assert hasattr(response, "choices")
    assert len(response.choices) > 0
    assert hasattr(response.choices[0], "message")
    assert hasattr(response.choices[0].message, "prompt_token_ids")
    assert response.choices[0].message.prompt_token_ids is None
    assert hasattr(response.choices[0].message, "completion_token_ids")
    assert response.choices[0].message.completion_token_ids is None
def test_streaming_chat_with_return_token_ids(openai_client, capsys):
    """
    Test return_token_ids option in streaming chat functionality with the local service
    """
    print("test_streaming_chat_with_return_token_ids")
    # enable return_token_ids
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "请描述图片内容"},
                ],
            },
        ],
        temperature=1,
        max_tokens=53,
        extra_body={"return_token_ids": True},
        stream=True,
    )
    # The first streamed chunk carries the prompt token ids; later chunks
    # carry completion token ids instead.
    is_first_chunk = True
    for chunk in response:
        assert hasattr(chunk, "choices")
        assert len(chunk.choices) > 0
        assert hasattr(chunk.choices[0], "delta")
        assert hasattr(chunk.choices[0].delta, "prompt_token_ids")
        assert hasattr(chunk.choices[0].delta, "completion_token_ids")
        if is_first_chunk:
            is_first_chunk = False
            assert isinstance(chunk.choices[0].delta.prompt_token_ids, list)
            assert chunk.choices[0].delta.completion_token_ids is None
        else:
            assert chunk.choices[0].delta.prompt_token_ids is None
            assert isinstance(chunk.choices[0].delta.completion_token_ids, list)
    # disable return_token_ids
    response = openai_client.chat.completions.create(
        model="default",
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg",
                            "detail": "high",
                        },
                    },
                    {"type": "text", "text": "请描述图片内容"},
                ],
            },
        ],
        temperature=1,
        max_tokens=53,
        extra_body={"return_token_ids": False},
        stream=True,
    )
    # With the option disabled, every chunk must expose both fields as None.
    for chunk in response:
        assert hasattr(chunk, "choices")
        assert len(chunk.choices) > 0
        assert hasattr(chunk.choices[0], "delta")
        assert hasattr(chunk.choices[0].delta, "prompt_token_ids")
        assert chunk.choices[0].delta.prompt_token_ids is None
        assert hasattr(chunk.choices[0].delta, "completion_token_ids")
        assert chunk.choices[0].delta.completion_token_ids is None
+2 -3
View File
@@ -24,8 +24,8 @@ import pytest
import requests
from utils.serving_utils import (
FD_API_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
PORTS_TO_CLEAN,
clean_ports,
is_port_open,
)
@@ -41,8 +41,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
FD_ENGINE_QUEUE_PORT = 8033
clean_ports(PORTS_TO_CLEAN.append(FD_ENGINE_QUEUE_PORT))
clean_ports()
model_path = "/ModelData/Qwen3-VL-4B-Instruct"