[Optimization] The pre- and post-processing pipelines no longer perform dict conversion (#5494)

* to_request_for_infer initial commit

* refactor to from_chat_completion_request

* make preprocess use request, initial commit

* bugfix

* refactor processors to use request

* bug fix

* refactor Request.from_generic_request

* post process initial commit

* bugfix

* postprocess second commit

* bugfix

* serving_embedding initial commit

* serving_reward initial commit

* bugfix

* replace function name

* async_llm initial commit

* offline initial commit and fix bug

* bugfix

* fix async_llm

* stop adding speculate_metrics into data

* fix logprobs bug

* fix echo bug

* fix bug

* fix reasoning_max_tokens

* bugfix

* bugfix and modify unittest

* bugfix and modify unit test

* bugfix

* bugfix

* bugfix

* modify unittest

* fix error when reasoning_content is None for text_processor

* remove some unnecessary logic

* revert removed logic

* implement add and set methods for RequestOutput and refactor code

* modify unit test

* modify unit test

* unify process_request and process_request_obj

* remove a unit test

* unify process_response and process_response_obj

* support qwen3_vl_processor

* modify unittest and remove comments

* fix prompt_logprobs

* fix codestyle

* add v1

* v1

* fix unit test

* fix unit test

* fix pre-commit

* fix

* add process request

* add process request

* fix

* fix

* fix unit test

* fix unit test

* fix unit test

* fix unit test

* fix unit test

* remove file

* add unit test

* add unit test

* add unit test

* fix unit test

* fix unit test

* fix

* fix

---------

Co-authored-by: Jiaxin Sui <95567040+plusNew001@users.noreply.github.com>
Co-authored-by: luukunn <981429396@qq.com>
Co-authored-by: luukunn <83932082+luukunn@users.noreply.github.com>
Co-authored-by: Zhang Yulong <35552275+ZhangYulongg@users.noreply.github.com>
Author: kxz2002
Date: 2026-01-22 00:50:52 +08:00
Committed by: GitHub
Parent: fe5ba4b509
Commit: 6e416c62dd
66 changed files with 16614 additions and 739 deletions
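
Every pre-processing hunk below follows the same envs.ENABLE_V1_DATA_PROCESSOR dispatch: the legacy branch keeps building a plain dict via to_dict_for_infer, the V1 branch constructs a Request via Request.from_generic_request, and the arrival timestamp moves into a nested "metrics" mapping. A minimal, self-contained sketch of that pattern, using toy stand-ins for FastDeploy's types (only the names to_dict_for_infer, from_generic_request, and metrics come from the diff; everything else is illustrative):

    import time

    class ToyApiRequest:
        """Stands in for the pydantic ChatCompletionRequest / CompletionRequest."""

        def __init__(self, prompt: str):
            self.prompt = prompt

        def to_dict_for_infer(self, request_id: str) -> dict:
            # Legacy path: flatten the model into a plain dict. The first two
            # hunks add the empty "metrics" sub-dict seen here, so timing no
            # longer sits at the top level of the dict.
            return {"request_id": request_id, "prompt": self.prompt, "metrics": {}}

    class Request(dict):
        """Toy engine-side request; dict-compatible so both paths share call sites."""

        @classmethod
        def from_generic_request(cls, req: ToyApiRequest, request_id: str) -> "Request":
            # V1 path: build the engine object directly, skipping dict conversion.
            return cls(request_id=request_id, prompt=req.prompt, metrics={})

    def build_infer_request(req: ToyApiRequest, request_id: str, enable_v1: bool):
        if not enable_v1:
            current_req = req.to_dict_for_infer(request_id)  # old: plain dict
        else:
            current_req = Request.from_generic_request(req, request_id=request_id)
        current_req["metrics"]["arrival_time"] = time.time()  # new metrics home
        return current_req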
@@ -556,6 +556,7 @@ class CompletionRequest(BaseModel):
             dict: request parameters in dict format
         """

         req_dict = {}
+        req_dict["metrics"] = {}
         # parse request model into dict
         if self.suffix is not None:
@@ -739,6 +740,7 @@ class ChatCompletionRequest(BaseModel):
         req_dict["prompt_logprobs"] = self.prompt_logprobs
         req_dict["temp_scaled_logprobs"] = self.temp_scaled_logprobs
         req_dict["top_p_normalized_logprobs"] = self.top_p_normalized_logprobs
+        req_dict["metrics"] = {}

         # parse request model into dict, priority: request params > metadata params
         if self.metadata is not None:
@@ -26,6 +26,7 @@ import numpy as np

 import fastdeploy.envs as envs
 import fastdeploy.metrics.trace as tracing
+from fastdeploy.engine.request import Request, RequestOutput
 from fastdeploy.entrypoints.openai.protocol import (
     ChatCompletionRequest,
     ChatCompletionResponse,
@@ -144,10 +145,13 @@ class OpenAIServingChat:
         prompt_tokens = None
         max_tokens = None
         try:
-            current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
+            if not envs.ENABLE_V1_DATA_PROCESSOR:
+                current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
+            else:
+                current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
             if "chat_template" not in current_req_dict:
                 current_req_dict["chat_template"] = self.chat_template
-            current_req_dict["arrival_time"] = time.time()
+            current_req_dict["metrics"]["arrival_time"] = time.time()
             # preprocess the req_dict
             prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)
             prompt_tokens = current_req_dict.get("prompt_tokens")
@@ -165,7 +169,6 @@ class OpenAIServingChat:
             api_server_logger.error(error_msg)
             self.engine_client.semaphore.release()
             return ErrorResponse(error=ErrorInfo(message=error_msg, type=ErrorType.INVALID_REQUEST_ERROR))
-        del current_req_dict

         if request.stream:
             return self.chat_completion_stream_generator(
@@ -416,7 +419,7 @@ class OpenAIServingChat:
                 if output.get("audio_content", None) is not None:
                     delta_message.audio_content = output["audio_content"]

-                if not res["finished"] and "delta_message" in output:
+                if not res["finished"] and output["enable_parser"]:
                     delta_message_output = output["delta_message"]
                     if delta_message_output is None:
                         continue
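
This hunk and its twin in the completion handler below replace a key-membership test with an explicit flag. One plausible reading (my inference, not author commentary), assuming streamed outputs now always carry a delta_message slot:

    # Toy illustration: when every streamed chunk carries a "delta_message"
    # slot (possibly None), key membership stops discriminating, so an
    # explicit boolean has to gate the parser path instead.
    output = {"delta_message": None, "enable_parser": False}

    assert "delta_message" in output     # old test: passes even with no parser
    assert not output["enable_parser"]   # new test: correctly skips the parser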
@@ -728,7 +731,7 @@ class OpenAIServingChat:

     async def _create_chat_completion_choice(
         self,
-        data: dict,
+        data: RequestOutput | dict,
         request: ChatCompletionRequest,
         prompt_token_ids: list,
         prompt_tokens: str,
@@ -782,16 +785,16 @@ class OpenAIServingChat:
         num_cached_tokens[idx] = data.get("num_cached_tokens", 0)
         num_input_image_tokens[idx] = data.get("num_input_image_tokens", 0)
         num_input_video_tokens[idx] = data.get("num_input_video_tokens", 0)
-        num_image_tokens[idx] = output.get("num_image_tokens", 0)
+        num_image_tokens[idx] = output.get("num_image_tokens", 0) or 0

         finish_reason = "stop"
         if previous_num_tokens != max_tokens:
             finish_reason = "stop"
-            if output.get("tool_call"):
+            if output.get("tool_call", None):
                 finish_reason = "tool_calls"
         else:
             finish_reason = "length"
-        if data.get("error_msg") is not None and "Recover" in data["error_msg"]:
+        if data.get("error_msg", None) is not None and "Recover" in data["error_msg"]:
             finish_reason = "recover_stop"

         return ChatCompletionResponseChoice(
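
The widened annotation `data: RequestOutput | dict` works because the surrounding code only reaches into data via .get(...) and [...]; together with the "implement add and set methods for RequestOutput" commit above, this suggests dict-style accessors on RequestOutput, roughly as in this illustrative stand-in (a guess at the shape, not the real class):

    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class RequestOutput:
        fields: dict[str, Any] = field(default_factory=dict)

        def get(self, key: str, default: Any = None) -> Any:
            # Mirrors dict.get so call sites need no isinstance() branching.
            return self.fields.get(key, default)

        def __getitem__(self, key: str) -> Any:
            return self.fields[key]

    data: RequestOutput | dict = RequestOutput({"num_cached_tokens": 3})
    assert data.get("num_cached_tokens", 0) == 3
    assert data.get("error_msg", None) is None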
@@ -27,7 +27,7 @@ import numpy as np

 import fastdeploy.envs as envs
 import fastdeploy.metrics.trace as tracing
-from fastdeploy.engine.request import RequestOutput
+from fastdeploy.engine.request import Request, RequestOutput
 from fastdeploy.entrypoints.openai.protocol import (
     CompletionLogprobs,
     CompletionRequest,
@@ -178,8 +178,11 @@ class OpenAIServingCompletion:
         try:
             for idx, prompt in enumerate(request_prompts):
                 request_id_idx = f"{request_id}_{idx}"
-                current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
-                current_req_dict["arrival_time"] = time.time()
+                if not envs.ENABLE_V1_DATA_PROCESSOR:
+                    current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
+                else:
+                    current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
+                current_req_dict["metrics"]["arrival_time"] = time.time()
                 prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)  # tokenize
                 if isinstance(prompt_token_ids, np.ndarray):
                     prompt_token_ids = prompt_token_ids.tolist()
@@ -562,7 +565,7 @@ class OpenAIServingCompletion:
                             draft_logprobs=draft_logprobs_res,
                             speculate_metrics=output_speculate_metrics,
                         )
-                    if not res["finished"] and "delta_message" in output:
+                    if not res["finished"] and output["enable_parser"]:
                         delta_message_output = output["delta_message"]
                         if delta_message_output is None:
                             continue
@@ -737,7 +740,7 @@ class OpenAIServingCompletion:
                 else None
             ),
             reasoning_content=output.get("reasoning_content"),
-            tool_calls=output.get("tool_call"),
+            tool_calls=output.get("tool_call", None),
             logprobs=aggregated_logprobs,
             draft_logprobs=aggregated_draft_logprobs,
             prompt_logprobs=clamp_prompt_logprobs(prompt_logprobs_res),
@@ -15,17 +15,20 @@
 """

 import base64
+import time
 from collections.abc import AsyncGenerator
 from typing import Literal, Union

 import numpy as np
 from typing_extensions import assert_never, override

+import fastdeploy.envs as envs
 from fastdeploy.engine.pooling_params import PoolingParams
 from fastdeploy.engine.request import (
     EmbeddingOutput,
     EmbeddingRequestOutput,
     PoolingRequestOutput,
+    Request,
 )
 from fastdeploy.entrypoints.openai.protocol import (
     EmbeddingCompletionRequest,
@@ -66,12 +69,25 @@ class OpenAIServingEmbedding(ZmqOpenAIServing):
     @override
     def _request_to_dict(self, ctx: ServeContext):
         request: EmbeddingRequest = ctx.request
-        request_dict = super()._request_to_dict(ctx)
-        if hasattr(request, "to_pooling_params"):
-            pooling_params: PoolingParams = request.to_pooling_params()
-            pooling_params.verify("embed", self.cfg.model_config)
-            request_dict["pooling_params"] = pooling_params.to_dict()
-        return request_dict
+        if not envs.ENABLE_V1_DATA_PROCESSOR:
+            request_dict = super()._request_to_dict(ctx)
+            if hasattr(request, "to_pooling_params"):
+                pooling_params: PoolingParams = request.to_pooling_params()
+                pooling_params.verify("embed", self.cfg.model_config)
+                request_dict["pooling_params"] = pooling_params.to_dict()
+            request_dict["metrics"] = {}
+            return request_dict
+        else:
+            request_obj = None
+            if hasattr(request, "to_pooling_params"):
+                pooling_params: PoolingParams = request.to_pooling_params()
+                pooling_params.verify("embed", self.cfg.model_config)
+                request_obj = Request.from_generic_request(
+                    req=request, request_id=ctx.request_id, pooling_params=pooling_params
+                )
+            request_obj.metrics.arrival_time = time.time()
+            super()._process_chat_template_kwargs(request_obj)
+            return request_obj

     @override
     def _request_to_batch_dicts(self, ctx: ServeContext):
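
The embedding handler above and the reward handler below share the same split; a toy sketch of the V1 branch with stand-in types (FastDeploy's real PoolingParams/Request differ), highlighting that metrics becomes an attribute on the object rather than a dict key:

    import time
    from dataclasses import dataclass, field

    @dataclass
    class PoolingParams:
        task: str

        def verify(self, expected: str, model_config=None) -> None:
            assert self.task == expected, f"unexpected pooling task {self.task!r}"

    @dataclass
    class Metrics:
        arrival_time: float | None = None

    @dataclass
    class Request:
        request_id: str
        pooling_params: PoolingParams | None = None
        metrics: Metrics = field(default_factory=Metrics)

        @classmethod
        def from_generic_request(cls, req, request_id, pooling_params=None):
            return cls(request_id=request_id, pooling_params=pooling_params)

    def to_request_obj(api_request, request_id: str, task: str) -> Request:
        request_obj = None
        if hasattr(api_request, "to_pooling_params"):
            pooling_params = api_request.to_pooling_params()
            pooling_params.verify(task)  # "embed" or "reward"
            request_obj = Request.from_generic_request(
                req=api_request, request_id=request_id, pooling_params=pooling_params
            )
        request_obj.metrics.arrival_time = time.time()  # attribute, not dict key
        return request_obj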
@@ -14,12 +14,14 @@
 # limitations under the License.
 """

+import time
 from collections.abc import AsyncGenerator

 from typing_extensions import override

+import fastdeploy.envs as envs
 from fastdeploy.engine.pooling_params import PoolingParams
-from fastdeploy.engine.request import PoolingRequestOutput, RewardRequestOutput
+from fastdeploy.engine.request import PoolingRequestOutput, Request, RewardRequestOutput
 from fastdeploy.entrypoints.openai.protocol import (
     ChatRewardData,
     ChatRewardRequest,
@@ -43,12 +45,25 @@ class OpenAIServingReward(ZmqOpenAIServing):
     @override
     def _request_to_dict(self, ctx: ServeContext):
         request: ChatRewardRequest = ctx.request
-        request_dict = super()._request_to_dict(ctx)
-        if hasattr(request, "to_pooling_params"):
-            pooling_params: PoolingParams = request.to_pooling_params()
-            pooling_params.verify("reward", self.cfg.model_config)
-            request_dict["pooling_params"] = pooling_params.to_dict()
-        return request_dict
+        if not envs.ENABLE_V1_DATA_PROCESSOR:
+            request_dict = super()._request_to_dict(ctx)
+            if hasattr(request, "to_pooling_params"):
+                pooling_params: PoolingParams = request.to_pooling_params()
+                pooling_params.verify("reward", self.cfg.model_config)
+                request_dict["pooling_params"] = pooling_params.to_dict()
+            request_dict["metrics"] = {}
+            return request_dict
+        else:
+            request_obj = None
+            if hasattr(request, "to_pooling_params"):
+                pooling_params: PoolingParams = request.to_pooling_params()
+                pooling_params.verify("reward", self.cfg.model_config)
+                request_obj = Request.from_generic_request(
+                    req=request, request_id=ctx.request_id, pooling_params=pooling_params
+                )
+            request_obj.metrics.arrival_time = time.time()
+            super()._process_chat_template_kwargs(request_obj)
+            return request_obj

     @override
     def _request_to_batch_dicts(self, ctx: ServeContext):