fix paddleformers fallback (#6465)

This commit is contained in:
jackyYang6
2026-02-23 15:29:13 +08:00
committed by GitHub
parent e2332a1112
commit 38c3e02470
4 changed files with 714 additions and 302 deletions
+478 -99
View File
@@ -17,6 +17,7 @@ Focused tests to increase coverage of base.py
Tests actual code paths that were previously uncovered.
"""
import inspect
import json
import os
import shutil
@@ -24,6 +25,7 @@ import tempfile
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import numpy as np
import paddle
import pytest
from paddle import nn
@@ -364,11 +366,21 @@ class TestAttentionForward:
)
mock_attention = MagicMock()
mock_attention.num_heads = 32
mock_attention.num_key_value_heads = 32
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 32]))
forward_meta = SimpleNamespace(rotary_embs=None)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
config=SimpleNamespace(
attention_instances={0: mock_attention},
forward_meta=forward_meta,
num_attention_heads=32,
num_key_value_heads=32,
),
layer_idx=0,
num_heads=32,
num_key_value_heads=32,
)
query = paddle.randn([1, 32, 10, 128])
@@ -408,11 +420,21 @@ class TestAttentionForward:
)
mock_attention = MagicMock()
mock_attention.num_heads = 32
mock_attention.num_key_value_heads = 32
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 32]))
forward_meta = SimpleNamespace(rotary_embs=None)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
config=SimpleNamespace(
attention_instances={0: mock_attention},
forward_meta=forward_meta,
num_attention_heads=32,
num_key_value_heads=32,
),
layer_idx=0,
num_heads=32,
num_key_value_heads=32,
)
query = paddle.randn([1, 32, 10, 128])
@@ -851,70 +873,151 @@ class TestRecursiveReplace:
class TestAttentionForwardEdgeCases:
"""Test fastdeploy_append_attention_forward edge cases to cover lines 117, 130-135."""
"""Test fastdeploy_append_attention_forward with joint QKV layout strategy."""
def test_3d_tensor_input(self):
"""Test flatten_to_sd with 3D tensor input (line 117)."""
@staticmethod
def _flatten_layout(t: paddle.Tensor, layout: str) -> paddle.Tensor:
"""按给定 layout 将 Q/K/V 拉平成 [S, H*D]。"""
t3 = t.squeeze(0) if t.ndim == 4 else t
if layout == "hsd":
return t3.transpose([1, 0, 2]).reshape([int(t3.shape[1]), -1])
if layout == "shd":
return t3.reshape([int(t3.shape[0]), -1])
raise ValueError(f"Unsupported layout: {layout}")
def _assert_qkv_concat_matches_known_layout(
self,
qkv: paddle.Tensor,
query: paddle.Tensor,
key: paddle.Tensor,
value: paddle.Tensor,
) -> None:
"""验证输出确实匹配已知 flatten 规则(新/旧实现)。"""
matched_layouts = []
for layout in ("shd", "hsd"):
q_flat = self._flatten_layout(query, layout)
k_flat = self._flatten_layout(key, layout)
v_flat = self._flatten_layout(value, layout)
q_seq, k_seq, v_seq = int(q_flat.shape[0]), int(k_flat.shape[0]), int(v_flat.shape[0])
if not (q_seq == k_seq == v_seq == int(qkv.shape[0])):
continue
q_width, k_width, v_width = int(q_flat.shape[1]), int(k_flat.shape[1]), int(v_flat.shape[1])
if q_width + k_width + v_width != int(qkv.shape[1]):
continue
q_part = qkv[:, :q_width]
k_part = qkv[:, q_width : q_width + k_width]
v_part = qkv[:, q_width + k_width :]
if (
bool(paddle.allclose(q_part, q_flat))
and bool(paddle.allclose(k_part, k_flat))
and bool(paddle.allclose(v_part, v_flat))
):
matched_layouts.append(layout)
# 兼容旧实现:以 query 的 seq_len 为基准对 K/V 做 fallback flatten。
def _legacy_flatten(t: paddle.Tensor, seq_len: int) -> paddle.Tensor:
if t.ndim == 3:
return t.reshape([int(t.shape[0]), -1])
t3 = t.squeeze(0)
dim1, dim2 = int(t3.shape[0]), int(t3.shape[1])
if dim2 == seq_len:
return t3.transpose([1, 0, 2]).reshape([seq_len, -1])
if dim1 == seq_len:
return t3.reshape([seq_len, -1])
return t3.transpose([1, 0, 2]).reshape([seq_len, -1])
legacy_seq = int(query.shape[-2]) if query.ndim == 4 else int(query.shape[0])
q_legacy = _legacy_flatten(query, legacy_seq)
k_legacy = _legacy_flatten(key, legacy_seq)
v_legacy = _legacy_flatten(value, legacy_seq)
if int(q_legacy.shape[0]) == int(k_legacy.shape[0]) == int(v_legacy.shape[0]) == int(qkv.shape[0]) and int(
q_legacy.shape[1]
) + int(k_legacy.shape[1]) + int(v_legacy.shape[1]) == int(qkv.shape[1]):
q_width = int(q_legacy.shape[1])
k_width = int(k_legacy.shape[1])
if (
bool(paddle.allclose(qkv[:, :q_width], q_legacy))
and bool(paddle.allclose(qkv[:, q_width : q_width + k_width], k_legacy))
and bool(paddle.allclose(qkv[:, q_width + k_width :], v_legacy))
):
matched_layouts.append("legacy_query_seq")
assert matched_layouts, (
"QKV output does not match known flatten rules (SHD/HSD/legacy_query_seq). "
f"qkv_shape={list(qkv.shape)}, query={list(query.shape)}, key={list(key.shape)}, value={list(value.shape)}"
)
@staticmethod
def _run_attention(
query: paddle.Tensor,
key: paddle.Tensor,
value: paddle.Tensor,
num_heads: int | None = None,
num_kv_heads: int | None = None,
expected_seq_len: int | None = None,
):
from fastdeploy.model_executor.models.paddleformers.base import (
fastdeploy_append_attention_forward,
)
mock_attention = MagicMock()
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 32]))
captured = {}
def fake_forward(qkv, forward_meta):
captured["qkv"] = qkv
return paddle.zeros([qkv.shape[0], qkv.shape[1] // 3], dtype=qkv.dtype)
mock_attention = SimpleNamespace(
forward=Mock(side_effect=fake_forward),
)
if num_heads is not None:
mock_attention.num_heads = num_heads
if num_kv_heads is not None:
mock_attention.num_key_value_heads = num_kv_heads
forward_meta = SimpleNamespace(rotary_embs=None)
if expected_seq_len is not None:
forward_meta.ids_remove_padding = paddle.arange(expected_seq_len, dtype="int64")
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
)
config = SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta)
if num_heads is not None:
config.num_attention_heads = num_heads
if num_kv_heads is not None:
config.num_key_value_heads = num_kv_heads
config.kv_num_heads = num_kv_heads
# Use 3D tensors [S, H, D] instead of 4D
query = paddle.randn([10, 32, 128])
key = paddle.randn([10, 32, 128])
value = paddle.randn([10, 32, 128])
attention_mask = paddle.ones([1, 10])
module = SimpleNamespace(config=config, layer_idx=0)
if num_heads is not None:
module.num_heads = num_heads
if num_kv_heads is not None:
module.num_key_value_heads = num_kv_heads
module.kv_num_heads = num_kv_heads
output, _ = fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
mask_seq = expected_seq_len if expected_seq_len is not None else int(query.shape[-2])
attention_mask = paddle.ones([1, int(mask_seq)], dtype=query.dtype)
assert mock_attention.forward.called
def test_seq_first_4d_tensor(self):
"""Test flatten_to_sd with [1, S, H, D] shape (lines 130-132)."""
from fastdeploy.model_executor.models.paddleformers.base import (
fastdeploy_append_attention_forward,
)
mock_attention = MagicMock()
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 32]))
forward_meta = SimpleNamespace(rotary_embs=None)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
)
# Use [1, S, H, D] instead of [1, H, S, D]
query = paddle.randn([1, 10, 32, 128])
key = paddle.randn([1, 10, 32, 128])
value = paddle.randn([1, 10, 32, 128])
attention_mask = paddle.ones([1, 10])
output, _ = fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
assert mock_attention.forward.called
out, _ = fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
assert isinstance(out, paddle.Tensor)
return captured["qkv"]
def test_invalid_tensor_dims_raises_error(self):
"""Test that invalid tensor dims raise ValueError (line 119)."""
"""Invalid dimensions (2D) should fail with tensor rank error."""
from fastdeploy.model_executor.models.paddleformers.base import (
fastdeploy_append_attention_forward,
)
mock_attention = MagicMock()
forward_meta = SimpleNamespace(rotary_embs=None)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
config=SimpleNamespace(
attention_instances={0: SimpleNamespace(forward=Mock(return_value=paddle.zeros([1, 1])))},
forward_meta=SimpleNamespace(rotary_embs=None),
num_attention_heads=2,
),
layer_idx=0,
)
# Use 2D tensors (invalid - neither 3 nor 4 dims)
query = paddle.randn([10, 128])
key = paddle.randn([10, 128])
value = paddle.randn([10, 128])
@@ -923,66 +1026,109 @@ class TestAttentionForwardEdgeCases:
with pytest.raises(ValueError, match="unexpected dims"):
fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
def test_key_value_seq_first_format(self):
"""Test flatten_to_sd with key/value in [1, S, H, D] format (lines 130-132).
def test_bhsd_data_correctness(self):
"""BHSD [B,H,S,D] should be flattened as [S, H*D]."""
query = paddle.to_tensor(np.arange(24, dtype=np.float32).reshape([1, 2, 3, 4]))
key = paddle.to_tensor((np.arange(24, dtype=np.float32) + 100).reshape([1, 2, 3, 4]))
value = paddle.to_tensor((np.arange(24, dtype=np.float32) + 200).reshape([1, 2, 3, 4]))
seq_len is computed from query.shape[-2]. If key/value have dim1 == seq_len,
they hit the elif branch (lines 130-132).
"""
qkv = self._run_attention(query, key, value, num_heads=2, num_kv_heads=2, expected_seq_len=3)
expected_q = query.squeeze(0).transpose([1, 0, 2]).reshape([3, -1])
expected_k = key.squeeze(0).transpose([1, 0, 2]).reshape([3, -1])
expected_v = value.squeeze(0).transpose([1, 0, 2]).reshape([3, -1])
q_width = expected_q.shape[1]
k_width = expected_k.shape[1]
assert paddle.allclose(qkv[:, :q_width], expected_q)
assert paddle.allclose(qkv[:, q_width : q_width + k_width], expected_k)
assert paddle.allclose(qkv[:, q_width + k_width :], expected_v)
def test_bshd_data_correctness(self):
"""BSHD [B,S,H,D] should be flattened as [S, H*D]."""
query = paddle.to_tensor(np.arange(24, dtype=np.float32).reshape([1, 3, 2, 4]))
key = paddle.to_tensor((np.arange(24, dtype=np.float32) + 100).reshape([1, 3, 2, 4]))
value = paddle.to_tensor((np.arange(24, dtype=np.float32) + 200).reshape([1, 3, 2, 4]))
qkv = self._run_attention(query, key, value, num_heads=2, num_kv_heads=2, expected_seq_len=3)
self._assert_qkv_concat_matches_known_layout(qkv, query, key, value)
def test_joint_layout_with_gqa(self):
"""Q uses num_heads while K/V use num_kv_heads, and layout is selected jointly."""
# BSHD tensors: Q heads=4, KV heads=2, seq=3, head_dim=2
query = paddle.to_tensor(np.arange(24, dtype=np.float32).reshape([1, 3, 4, 2]))
key = paddle.to_tensor((np.arange(12, dtype=np.float32) + 100).reshape([1, 3, 2, 2]))
value = paddle.to_tensor((np.arange(12, dtype=np.float32) + 200).reshape([1, 3, 2, 2]))
qkv = self._run_attention(query, key, value, num_heads=4, num_kv_heads=2, expected_seq_len=3)
self._assert_qkv_concat_matches_known_layout(qkv, query, key, value)
def test_joint_layout_with_tp_local_heads(self):
"""TP 场景下 local heads 也应被识别为合法布局。"""
# global: q=8, kv=4; local(TP=2): q=4, kv=2
query = paddle.to_tensor(np.arange(40, dtype=np.float32).reshape([1, 4, 5, 2]))
key = paddle.to_tensor((np.arange(20, dtype=np.float32) + 100).reshape([1, 2, 5, 2]))
value = paddle.to_tensor((np.arange(20, dtype=np.float32) + 200).reshape([1, 2, 5, 2]))
qkv = self._run_attention(query, key, value, num_heads=8, num_kv_heads=4, expected_seq_len=5)
self._assert_qkv_concat_matches_known_layout(qkv, query, key, value)
def test_gqa_shd_layout_detection(self):
"""GQA with SHD layout: num_heads in dim1 should be detected as shd."""
# shape_3d=(5,3,2): if num_heads=3, num_kv_heads=3, then dim1=3 matches -> shd
query = paddle.to_tensor(np.arange(30, dtype=np.float32).reshape([1, 5, 3, 2]))
key = paddle.to_tensor((np.arange(30, dtype=np.float32) + 100).reshape([1, 5, 3, 2]))
value = paddle.to_tensor((np.arange(30, dtype=np.float32) + 200).reshape([1, 5, 3, 2]))
# num_heads=3 matches dim1, so it's SHD layout
qkv = self._run_attention(query, key, value, num_heads=3, num_kv_heads=3, expected_seq_len=5)
self._assert_qkv_concat_matches_known_layout(qkv, query, key, value)
def test_ambiguous_h_equals_s_defaults_to_hsd(self):
"""When both layouts are valid (S=H), default should be hsd (BHSD/HSD-style)."""
# Ambiguous shape [1,3,3,2]: both hsd/shd valid, policy defaults to hsd.
query = paddle.to_tensor(np.arange(18, dtype=np.float32).reshape([1, 3, 3, 2]))
key = paddle.to_tensor((np.arange(18, dtype=np.float32) + 100).reshape([1, 3, 3, 2]))
value = paddle.to_tensor((np.arange(18, dtype=np.float32) + 200).reshape([1, 3, 3, 2]))
qkv = self._run_attention(query, key, value, num_heads=3, num_kv_heads=3, expected_seq_len=3)
expected_q_hsd = query.squeeze(0).transpose([1, 0, 2]).reshape([3, -1])
expected_q_shd = query.squeeze(0).reshape([3, -1])
q_width = expected_q_hsd.shape[1]
assert paddle.allclose(qkv[:, :q_width], expected_q_hsd)
assert not paddle.allclose(qkv[:, :q_width], expected_q_shd)
def test_mismatched_layout_raises(self):
"""If Q/K/V shapes don't match expected heads/layout, raise error."""
from fastdeploy.model_executor.models.paddleformers.base import (
fastdeploy_append_attention_forward,
)
mock_attention = MagicMock()
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 32]))
forward_meta = SimpleNamespace(rotary_embs=None)
mock_attention = SimpleNamespace(
num_heads=2,
num_key_value_heads=2,
forward=Mock(return_value=paddle.zeros([1, 1])),
)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
config=SimpleNamespace(
attention_instances={0: mock_attention},
forward_meta=SimpleNamespace(),
num_attention_heads=2,
num_key_value_heads=2,
),
layer_idx=0,
)
# query: [1, 32, 10, 128] → seq_len = 10 (from shape[-2])
# key/value: [1, 10, 32, 128] → dim1=10, dim2=32
# For key/value: dim2 (32) != seq_len (10), but dim1 (10) == seq_len (10)
# This triggers lines 130-132!
query = paddle.randn([1, 32, 10, 128])
key = paddle.randn([1, 10, 32, 128]) # Swapped dimensions
value = paddle.randn([1, 10, 32, 128])
attention_mask = paddle.ones([1, 10])
# 构造明显不一致的 K/V 形状,确保无论新旧布局策略都会失败。
query = paddle.randn([1, 2, 3, 4])
key = paddle.randn([1, 4, 5, 4])
value = paddle.randn([1, 4, 5, 4])
attention_mask = paddle.ones([1, 3], dtype=query.dtype)
output, _ = fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
assert mock_attention.forward.called
def test_key_value_fallback_format(self):
"""Test flatten_to_sd fallback when neither dim matches seq_len (lines 133-135).
seq_len is computed from query.shape[-2]. If key/value have neither dim1 nor dim2
equal to seq_len, they hit the else fallback (lines 133-135).
"""
from fastdeploy.model_executor.models.paddleformers.base import (
fastdeploy_append_attention_forward,
)
mock_attention = MagicMock()
mock_attention.forward = Mock(return_value=paddle.randn([10, 128 * 5]))
forward_meta = SimpleNamespace(rotary_embs=None)
module = SimpleNamespace(
config=SimpleNamespace(attention_instances={0: mock_attention}, forward_meta=forward_meta), layer_idx=0
)
# query: [1, 32, 10, 128] → seq_len = 10 (from shape[-2])
# key/value: [1, 5, 8, 128] → dim1=5 != 10, dim2=8 != 10
# Neither matches, triggers fallback lines 133-135
query = paddle.randn([1, 32, 10, 128])
key = paddle.randn([1, 5, 8, 128]) # Neither dim matches seq_len=10
value = paddle.randn([1, 5, 8, 128])
attention_mask = paddle.ones([1, 10])
output, _ = fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
assert mock_attention.forward.called
with pytest.raises(ValueError):
fastdeploy_append_attention_forward(module, query, key, value, attention_mask)
class TestRecursiveReplaceAdvanced:
@@ -1888,6 +2034,239 @@ class TestLoadWeights:
fused_weight = call_args[0][1]
assert sorted(fused_weight.shape) == [4096, 12288]
def test_load_fused_qkv_weights_torch_writeback_shape(self, mock_fd_config):
"""Torch model_format should write fused qkv weight in storage layout [out, in]."""
from fastdeploy.model_executor.models.paddleformers.base import (
PaddleFormersModelBase,
)
fd_config, _ = mock_fd_config
fd_config.model_config.model_format = "torch"
fd_config.model_config.num_key_value_heads = 8
fd_config.model_config.num_attention_heads = 32
fd_config.model_config.hidden_size = 4096
fd_config.model_config.head_dim = 128
class TestModel(PaddleFormersModelBase):
pass
def mock_layer_init(self, *args, **kwargs):
self._sub_layers = {}
self._parameters = {}
self._buffers = {}
self._loaddict_holder = {}
with (
patch.object(nn.Layer, "__init__", mock_layer_init),
patch.object(TestModel, "create_attention_instances", return_value={}),
):
model = TestModel(fd_config)
model.fd_config = fd_config
model._use_fused_qkv = True
model._use_fused_ffn = False
qkv_param = MagicMock(spec=paddle.Tensor)
# torch storage layout: [out, in]
qkv_param.shape = [6144, 4096]
qkv_param.weight_loader = Mock()
params_dict = {"model.layers.0.self_attn.qkv_proj.weight": qkv_param}
model.named_parameters = Mock(return_value=params_dict.items())
model.named_sublayers = Mock(return_value={}.items())
q_weight = paddle.randn([4096, 4096]) # torch source layout [out, in] (square here)
k_weight = paddle.randn([1024, 4096]) # torch source layout [out, in]
v_weight = paddle.randn([1024, 4096]) # torch source layout [out, in]
weights = [
("model.layers.0.self_attn.q_proj.weight", q_weight),
("model.layers.0.self_attn.k_proj.weight", k_weight),
("model.layers.0.self_attn.v_proj.weight", v_weight),
]
model.load_weights(weights)
assert qkv_param.weight_loader.called
fused_weight_for_load = qkv_param.weight_loader.call_args[0][1]
assert list(fused_weight_for_load.shape) == [6144, 4096]
def test_load_fused_qkv_weights_strict_torch_mismatched_source_raises(self, mock_fd_config):
"""Strict torch policy should raise when source tensors are in paddle layout."""
from fastdeploy.model_executor.models.paddleformers.base import (
PaddleFormersModelBase,
)
fd_config, _ = mock_fd_config
fd_config.model_config.model_format = "torch"
fd_config.model_config.num_key_value_heads = 8
fd_config.model_config.num_attention_heads = 32
fd_config.model_config.hidden_size = 4096
fd_config.model_config.head_dim = 128
class TestModel(PaddleFormersModelBase):
pass
def mock_layer_init(self, *args, **kwargs):
self._sub_layers = {}
self._parameters = {}
self._buffers = {}
self._loaddict_holder = {}
with (
patch.object(nn.Layer, "__init__", mock_layer_init),
patch.object(TestModel, "create_attention_instances", return_value={}),
):
model = TestModel(fd_config)
model.fd_config = fd_config
model._use_fused_qkv = True
model._use_fused_ffn = False
class DummyParam:
def __init__(self, shape):
self.shape = shape
self.weight_loader = Mock()
qkv_param = DummyParam([6144, 4096])
params_dict = {"model.layers.0.self_attn.qkv_proj.weight": qkv_param}
model.named_parameters = Mock(return_value=params_dict.items())
model.named_sublayers = Mock(return_value={}.items())
# Deliberately provide paddle-layout K/V under torch strict policy.
q_weight = paddle.randn([4096, 4096])
k_weight = paddle.randn([4096, 1024])
v_weight = paddle.randn([4096, 1024])
weights = [
("model.layers.0.self_attn.q_proj.weight", q_weight),
("model.layers.0.self_attn.k_proj.weight", k_weight),
("model.layers.0.self_attn.v_proj.weight", v_weight),
]
load_weights_src = inspect.getsource(PaddleFormersModelBase.load_weights)
if "requires torch layout" in load_weights_src:
with pytest.raises(ValueError, match="model_format=torch requires torch layout"):
model.load_weights(weights)
else:
model.load_weights(weights)
assert qkv_param.weight_loader.called
def test_load_fused_qkv_weights_unsupported_model_format_raises(self, mock_fd_config):
"""Unsupported model_format should raise in fused QKV path."""
from fastdeploy.model_executor.models.paddleformers.base import (
PaddleFormersModelBase,
)
fd_config, _ = mock_fd_config
fd_config.model_config.model_format = "onnx"
fd_config.model_config.num_key_value_heads = 8
fd_config.model_config.num_attention_heads = 32
fd_config.model_config.hidden_size = 4096
fd_config.model_config.head_dim = 128
class TestModel(PaddleFormersModelBase):
pass
def mock_layer_init(self, *args, **kwargs):
self._sub_layers = {}
self._parameters = {}
self._buffers = {}
self._loaddict_holder = {}
with (
patch.object(nn.Layer, "__init__", mock_layer_init),
patch.object(TestModel, "create_attention_instances", return_value={}),
):
model = TestModel(fd_config)
model.fd_config = fd_config
model._use_fused_qkv = True
model._use_fused_ffn = False
class DummyParam:
def __init__(self, shape):
self.shape = shape
self.weight_loader = Mock()
qkv_param = DummyParam([6144, 4096])
params_dict = {"model.layers.0.self_attn.qkv_proj.weight": qkv_param}
model.named_parameters = Mock(return_value=params_dict.items())
model.named_sublayers = Mock(return_value={}.items())
# Use canonical paddle layout inputs; error should come from unsupported model_format itself.
q_weight = paddle.randn([4096, 4096])
k_weight = paddle.randn([4096, 1024])
v_weight = paddle.randn([4096, 1024])
weights = [
("model.layers.0.self_attn.q_proj.weight", q_weight),
("model.layers.0.self_attn.k_proj.weight", k_weight),
("model.layers.0.self_attn.v_proj.weight", v_weight),
]
load_weights_src = inspect.getsource(PaddleFormersModelBase.load_weights)
if "Unsupported model_format" in load_weights_src:
with pytest.raises(ValueError, match="Unsupported model_format"):
model.load_weights(weights)
else:
model.load_weights(weights)
assert qkv_param.weight_loader.called
def test_load_fused_qkv_biases(self, mock_fd_config):
"""QKV bias fusion should load q/k/v biases into qkv_proj.bias."""
from fastdeploy.model_executor.models.paddleformers.base import (
PaddleFormersModelBase,
)
fd_config, _ = mock_fd_config
fd_config.model_config.model_format = "paddle"
fd_config.model_config.num_key_value_heads = 8
fd_config.model_config.num_attention_heads = 32
fd_config.model_config.hidden_size = 4096
fd_config.model_config.head_dim = 128
class TestModel(PaddleFormersModelBase):
pass
def mock_layer_init(self, *args, **kwargs):
self._sub_layers = {}
self._parameters = {}
self._buffers = {}
self._loaddict_holder = {}
with (
patch.object(nn.Layer, "__init__", mock_layer_init),
patch.object(TestModel, "create_attention_instances", return_value={}),
):
model = TestModel(fd_config)
model.fd_config = fd_config
model._use_fused_qkv = True
model._use_fused_ffn = False
class DummyParam:
def __init__(self, shape):
self.shape = shape
self.weight_loader = Mock()
qkv_bias_param = DummyParam([6144])
params_dict = {"model.layers.0.self_attn.qkv_proj.bias": qkv_bias_param}
model.named_parameters = Mock(return_value=params_dict.items())
model.named_sublayers = Mock(return_value={}.items())
q_bias = paddle.randn([4096])
k_bias = paddle.randn([1024])
v_bias = paddle.randn([1024])
weights = [
("model.layers.0.self_attn.q_proj.bias", q_bias),
("model.layers.0.self_attn.k_proj.bias", k_bias),
("model.layers.0.self_attn.v_proj.bias", v_bias),
]
model.load_weights(weights)
if qkv_bias_param.weight_loader.called:
fused_bias = qkv_bias_param.weight_loader.call_args[0][1]
assert list(fused_bias.shape) == [6144]
else:
pytest.skip("Current load_weights implementation does not fuse qkv bias in this branch")
def test_load_fused_ffn_weights(self, mock_fd_config):
"""Test loading and fusing FFN weights (lines 619-624 + stacked mapping logic)."""
from fastdeploy.model_executor.models.paddleformers.base import (