refactor pt loading (#4532)
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
Publish Job / publish_pre_check (push) Has been cancelled
Publish Job / print_publish_pre_check_outputs (push) Has been cancelled
Publish Job / FD-Clone-Linux (push) Has been cancelled
Publish Job / Show Code Archive Output (push) Has been cancelled
Publish Job / BUILD_SM8090 (push) Has been cancelled
Publish Job / BUILD_SM8689 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8090 (push) Has been cancelled
Publish Job / PADDLE_PYPI_UPLOAD_8689 (push) Has been cancelled
Publish Job / Run FD Image Build (push) Has been cancelled
Publish Job / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
Publish Job / Run FastDeploy LogProb Tests (push) Has been cancelled
Publish Job / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
Publish Job / Run Base Tests (push) Has been cancelled
Publish Job / Run Accuracy Tests (push) Has been cancelled
Publish Job / Run Stable Tests (push) Has been cancelled
CI Images Build / FD-Clone-Linux (push) Has been cancelled
CI Images Build / Show Code Archive Output (push) Has been cancelled
CI Images Build / CI Images Build (push) Has been cancelled
CI Images Build / BUILD_SM8090 (push) Has been cancelled
CI Images Build / Run FastDeploy Unit Tests and Coverage (push) Has been cancelled
CI Images Build / Run FastDeploy LogProb Tests (push) Has been cancelled
CI Images Build / Extracted partial CE model tasks to run in CI. (push) Has been cancelled
CI Images Build / Run Base Tests (push) Has been cancelled
CI Images Build / Publish Docker Images Pre Check (push) Has been cancelled

This commit is contained in:
bukejiyu
2025-11-11 21:30:39 +08:00
committed by GitHub
parent 4c911ecb74
commit b09ebb2813
35 changed files with 1094 additions and 797 deletions
+56 -65
View File
@@ -16,14 +16,13 @@
from typing import Optional
import numpy as np
import paddle
from paddle import nn
from paddleformers.utils.log import logger
from fastdeploy import envs
from fastdeploy.model_executor.layers.utils import get_tensor
from fastdeploy.model_executor.utils import slice_fn
from fastdeploy.model_executor.utils import h2d_copy, slice_fn
from fastdeploy.platforms import current_platform
from fastdeploy.worker.experts_manager import RedundantExpertManger
@@ -31,6 +30,7 @@ try:
from fastdeploy.model_executor.ops.gpu import noaux_tc
except:
logger.warning("import noaux_tc Failed!")
import numpy as np
def get_moe_method():
@@ -118,6 +118,7 @@ class FusedMoE(nn.Layer):
weight_key_map: dict = {},
with_bias: bool = False,
activation="swiglu",
model_format: Optional[str] = None,
):
"""
Initialize the Moe layer with given parameters.
@@ -201,7 +202,7 @@ class FusedMoE(nn.Layer):
self.quant_method.create_weights(
self,
weight_loader=self.weight_loader,
model_format=fd_config.model_config.model_format,
model_format=fd_config.model_config.model_format if model_format is None else model_format,
num_experts=self.num_local_experts if self.ep_size > 1 else self.num_experts,
hidden_size=self.hidden_size,
moe_intermediate_size=self.moe_intermediate_size,
@@ -214,72 +215,68 @@ class FusedMoE(nn.Layer):
tp_size={self.tp_size}."
)
def weight_loader(self, param, loaded_weight, expert_id, shard_id: Optional[str] = None):
def weight_loader(
self, param, loaded_weight, expert_id, shard_id: Optional[str] = None, source: Optional[str] = None
):
"""
source:Avoid redundant transpose of fused weights when weight_loader is called iteratively
"""
if expert_id is None and shard_id is None:
# MoE experts has been fused in disk
self._load_fused_experts_weight(param, loaded_weight)
return
if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"):
SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM
elif current_platform.is_cuda() or current_platform.is_iluvatar():
SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1}
else:
SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0}
if expert_id - self.expert_id_offset >= 0 and expert_id - self.expert_id_offset < self.num_local_experts:
if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"):
SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM
elif current_platform.is_cuda() or current_platform.is_iluvatar():
SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1}
else:
SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0}
if not param._is_initialized():
param.initialize()
if not (expert_id - self.expert_id_offset >= 0 and expert_id - self.expert_id_offset < self.num_local_experts):
return
weight_need_transpose = getattr(param, "weight_need_transpose", False)
if shard_id is None:
# 1.gate up fused in disk
if weight_need_transpose:
loaded_weight = get_tensor(loaded_weight)
loaded_weight = loaded_weight.transpose([1, 0])
output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]]
shard_offsets = [
# (shard_id, shard_offset, shard_size)
("gate", 0, output_size // 2 * self.tp_size),
("up", output_size // 2 * self.tp_size, output_size // 2 * self.tp_size),
]
if not param._is_initialized():
param.initialize()
if shard_id is None:
# 1.gate up fused in disk
weight_need_transpose = getattr(param, "weight_need_transpose", False)
output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]]
per_rank = output_size // 2
start = self.tp_rank * per_rank
loaded_weight_shard_gate = slice_fn(
loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["gate"], start, start + per_rank
)
self._load_gate_up_weight(
param,
expert_id,
loaded_weight_shard_gate,
"gate",
SHARD_ID_TO_SHARDED_DIM["gate"],
is_sharded=True,
)
start_up = output_size // 2 * self.tp_size + self.tp_rank * per_rank
loaded_weight_shard_up = slice_fn(
loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["up"], start_up, start_up + per_rank
)
self._load_gate_up_weight(
param, expert_id, loaded_weight_shard_up, "up", SHARD_ID_TO_SHARDED_DIM["up"], is_sharded=True
)
else:
# 2.gate up splited in disk
assert shard_id in ["gate", "down", "up"]
self._load_expert_weight(
param=param,
expert_id=expert_id,
loaded_weight=loaded_weight,
shard_id=shard_id,
shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id],
for shard_id, shard_offset, shard_size in shard_offsets:
loaded_weight_shard = slice_fn(
loaded_weight, SHARD_ID_TO_SHARDED_DIM[shard_id], shard_offset, shard_offset + shard_size
)
self.weight_loader(param, loaded_weight_shard, expert_id, shard_id, "fused")
else:
if weight_need_transpose and source != "fused":
loaded_weight = get_tensor(loaded_weight)
loaded_weight = loaded_weight.transpose([1, 0])
# 2.gate up splited in disk
assert shard_id in ["gate", "down", "up"]
self._load_expert_weight(
param=param,
expert_id=expert_id,
loaded_weight=loaded_weight,
shard_id=shard_id,
shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id],
)
def _load_gate_up_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None, is_sharded=False):
weight_need_transpose = getattr(param, "weight_need_transpose", False)
if self.tp_size > 1 and not is_sharded:
tp_shard_dim = weight_need_transpose ^ shard_dim
tp_shard_dim = shard_dim
weight_dim = -1 if tp_shard_dim else 0
if isinstance(loaded_weight, (np.ndarray, paddle.Tensor)):
size = loaded_weight.shape[weight_dim]
else:
size = loaded_weight.get_shape()[weight_dim]
size = loaded_weight.shape[weight_dim]
block_size = size // self.tp_size
shard_offset = self.tp_rank * block_size
shard_size = (self.tp_rank + 1) * block_size
loaded_weight = slice_fn(loaded_weight, tp_shard_dim, shard_offset, shard_size)
loaded_weight = get_tensor(loaded_weight)
expert_param = param[expert_id - self.expert_id_offset]
dim = -1 if shard_dim else 0
param_shard_size = expert_param.shape[dim] // 2
@@ -310,22 +307,17 @@ class FusedMoE(nn.Layer):
loaded_weight = loaded_weight.view(expert_param.dtype)
else:
loaded_weight = loaded_weight.cast(expert_param.dtype)
expert_param.copy_(loaded_weight, False)
h2d_copy(dst=expert_param, src=loaded_weight)
def _load_down_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None):
weight_need_transpose = getattr(param, "weight_need_transpose", False)
if self.tp_size > 1 and shard_dim is not None:
tp_shard_dim = weight_need_transpose ^ shard_dim
tp_shard_dim = shard_dim
dim = -1 if tp_shard_dim else 0
if isinstance(loaded_weight, paddle.Tensor):
size = loaded_weight.shape[dim]
else:
size = loaded_weight.get_shape()[dim]
size = loaded_weight.shape[dim]
block_size = size // self.tp_size
shard_offset = self.tp_rank * block_size
shard_size = (self.tp_rank + 1) * block_size
loaded_weight = slice_fn(loaded_weight, tp_shard_dim, shard_offset, shard_size)
loaded_weight = get_tensor(loaded_weight)
expert_param = param[expert_id - self.expert_id_offset]
if hasattr(param, "tensor_track"):
# for dyn quant
@@ -341,7 +333,7 @@ class FusedMoE(nn.Layer):
loaded_weight = loaded_weight.view(expert_param.dtype)
else:
loaded_weight = loaded_weight.cast(expert_param.dtype)
expert_param.copy_(loaded_weight, False)
h2d_copy(dst=expert_param, src=loaded_weight)
def _load_fused_experts_weight(self, param, loaded_weight):
if self.tp_size > 1:
@@ -357,8 +349,7 @@ class FusedMoE(nn.Layer):
assert param.shape == loaded_weight.shape, (
f"Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})"
)
loaded_weight = get_tensor(loaded_weight)
param.copy_(loaded_weight, False)
h2d_copy(dst=param, src=loaded_weight)
if hasattr(param, "tensor_track"):
for i in range(self.num_local_experts):