mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2026-04-23 00:17:25 +08:00
1210 lines
46 KiB
Python
1210 lines
46 KiB
Python
"""
|
|
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
|
|
# This file is modified from https://github.com/vllm-project/vllm/blob/main/benchmarks/backend_request_func.py
|
|
|
|
|
|
import copy
|
|
import io
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
from tqdm.asyncio import tqdm
|
|
|
|
# Shared aiohttp client timeout for the sessions created in this file: 6 hours in total.
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
|
|
|
|
|
|
@dataclass
class RequestFuncInput:
    """Input for requesting LLMs via API"""

    no: int  # request number, copied to RequestFuncOutput.no
    prompt: str
    # Chat messages sent as the "messages" payload field. The multi-turn
    # driver iterates it as a list of {"role": ..., ...} dicts, so it is a
    # list rather than a dict.
    history_QA: Optional[list]
    hyper_parameters: dict  # merged into the request payload
    api_url: str
    prompt_len: int
    output_len: int
    model: str
    model_name: Optional[str] = None
    logprobs: Optional[int] = None
    extra_body: Optional[dict] = None
    multi_modal_content: Optional[dict] = None
    ignore_eos: bool = False
    language: Optional[str] = None
    debug: bool = False  # print the payload and the final output
    pd_metrics: bool = False  # sent as the "collect_metrics" payload field
    response_format: Optional[dict] = None
    random_flag: bool = False  # force max_tokens / min_tokens to output_len
    json_data: Optional[dict] = None  # per-sample overrides (max_tokens, tools, tool_url, ...)
|
|
|
|
|
|
@dataclass
class RequestFuncOutput:
    """Output for requesting LLMs via API"""

    no: int = 0
    request_id: str = ""
    generated_text: str = ""
    reasoning_content: str = ""
    success: bool = False
    latency: float = 0.0
    end_timestamp: float = 0.0  # timestamp of the last received chunk (seconds, perf_counter base)
    output_tokens: int = 0
    ttft: float = 0.0  # Time to first token
    arrival_time: list = field(default_factory=list)  # arrival_time
    itl: list = field(default_factory=list)  # list of inter-token latencies
    tpot: float = 0.0  # avg next-token latencies
    prompt_len: int = 0
    prompt_tokens: int = 0  # number of input tokens reported by the inference side
    reasoning_tokens: int = 0  # length of the reasoning (thinking) part, in tokens
    # Arrival time of the first chunk that carries answer content (i.e. after
    # any reasoning output). It is assigned a float timestamp, hence float.
    res_ttft: float = 0.0
    error: str = ""
    metrics: dict = field(default_factory=dict)
    tool_calls: list = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class SessionMetrics:
    """Multi-turn conversation (session) metrics"""

    session_no: int  # request number of the session
    session_e2e_time: float  # wall time from session start to session end (perf_counter difference)
    pure_llm_time: float  # accumulated time spent awaiting LLM requests
    input_tokens: int  # accumulated prompt tokens over all rounds
    output_tokens: int  # accumulated completion tokens over all rounds
    tool_calls: int  # number of tool calls performed in the session
|
|
|
|
|
|
def safe_cost(a, b):
    """Return the time difference ``a - b``; ``None`` if either operand is ``None``."""
    if a is not None and b is not None:
        return a - b
    return None
|
|
|
|
|
|
def metrics_summary(metrics, token_timestamps):
    """Summarize metrics

    Derives per-stage durations from the first and the last metrics record
    and from the first two client-side token timestamps. Returns an empty
    dict when there are no metrics or fewer than two timestamps. Any
    duration whose endpoints are missing is reported as ``None``.
    """
    if not metrics:
        return {}
    if len(token_timestamps) < 2:
        return {}

    head = metrics[0]
    tail = metrics[-1]
    first_token_ts = token_timestamps[0]
    second_token_ts = token_timestamps[1]

    arrived = head.get("arrival_time")
    infer_start = head.get("inference_start_time")

    return {
        # total prefill time
        "prefill_cost_time": safe_cost(head.get("send_request_output_to_decode_time"), arrived),
        # total prefill preparation time
        "prefill_prepare_cost_time": safe_cost(infer_start, arrived),
        # preprocessing time
        "preprocess_cost_time": safe_cost(head.get("scheduler_recv_req_time"), arrived),
        # time the request stays cached in the scheduler
        "cache_in_scheduler_cost_time": safe_cost(
            head.get("engine_get_req_time"), head.get("scheduler_recv_req_time")
        ),
        # time spent asking for decode resources
        "ask_decode_resource_cost_time": safe_cost(
            head.get("ask_decode_resource_finish_time"), head.get("ask_decode_resource_start_time")
        ),
        # scheduler scheduling time
        "schedule_cost_time": safe_cost(
            head.get("inference_start_time"), head.get("ask_decode_resource_finish_time")
        ),
        # prefill first-token inference time
        "prefill_first_token_infer_cost_time": safe_cost(head.get("engine_recv_first_token_time"), infer_start),
        # prefill wait for cache transfer
        "wait_sending_cache_cost_time": safe_cost(
            head.get("send_request_output_to_decode_time"), head.get("wait_for_sending_cache_time")
        ),
        # decode resource allocation time
        "decode_preallocate_cost_time": safe_cost(
            tail.get("decode_preallocate_req_time"), tail.get("decode_recv_req_time")
        ),
        # decode preparation time before inference
        "decode_prepare_cost_time": safe_cost(
            tail.get("decode_inference_start_time"), tail.get("decode_recv_first_token_time")
        ),
        # decode second-token inference time
        "decode_second_token_infer_cost_time": safe_cost(
            tail.get("decode_recv_second_token_time"), tail.get("decode_inference_start_time")
        ),
        # transmission time of the first token back to the client
        "first_token_transmission_cost_time": safe_cost(first_token_ts, tail.get("decode_recv_first_token_time")),
        # transmission time of the second token back to the client
        "second_token_transmission_cost_time": safe_cost(
            second_token_ts, tail.get("decode_recv_second_token_time")
        ),
        # MIX mode: scheduler scheduling time
        "mixed_schedule_cost_time": safe_cost(head.get("inference_start_time"), head.get("engine_get_req_time")),
        # MIX mode: transmission time of the first token back to the client
        "mixed_first_token_transmission_cost_time": safe_cost(
            first_token_ts, head.get("engine_recv_first_token_time")
        ),
        # cache statistics are copied through unchanged
        "gpu_cache_token_num": head.get("gpu_cache_token_num"),
        "cpu_cache_token_num": head.get("cpu_cache_token_num"),
        "storage_cache_token_num": head.get("storage_cache_token_num"),
        "cpu_cache_prepare_time": head.get("cpu_cache_prepare_time"),
        "storage_cache_prepare_time": head.get("storage_cache_prepare_time"),
    }
|
|
|
|
|
|
def _cached_prompt_tokens(usage) -> int:
    """Return ``usage["prompt_tokens_details"]["cached_tokens"]``, or 0 if any level is missing."""
    details = (usage or {}).get("prompt_tokens_details") or {}
    return details.get("cached_tokens", 0) or 0


async def async_request_eb_openai_chat_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
    session: aiohttp.ClientSession | None = None,
) -> RequestFuncOutput:
    """Request an LLM using EB OpenAI

    Sends one streaming chat-completions request built from
    ``request_func_input`` and collects the generated text, reasoning
    content, tool calls, token usage and latency figures.

    Args:
        request_func_input: Request description; ``history_QA`` is sent as
            the ``messages`` field.
        pbar: Optional progress bar, advanced by one when the request ends.
        session: Optional session to reuse. When omitted, a session is
            created for this call and closed before returning.

    Returns:
        The populated ``RequestFuncOutput``. Failures are reported through
        ``success``/``error`` instead of being raised; failed or
        zero-token results are also appended to ``error_output.txt``.
    """
    api_url = request_func_input.api_url
    assert api_url.endswith(
        ("completions", "profile")
    ), "OpenAI Chat Completions API URL must end with 'completions' or 'profile'."

    json_data = request_func_input.json_data or {}
    hyper = request_func_input.hyper_parameters or {}

    payload = {
        "model": request_func_input.model,
        "messages": request_func_input.history_QA,
        "stream": True,
        "stream_options": {
            "include_usage": True,
            "continuous_usage_stats": True,
        },
        "max_tokens": request_func_input.output_len,
        "collect_metrics": request_func_input.pd_metrics,
    }
    if json_data.get("max_tokens"):
        payload["max_tokens"] = json_data["max_tokens"]
    if json_data.get("min_tokens"):
        payload["min_tokens"] = json_data["min_tokens"]
    if request_func_input.response_format:
        payload["response_format"] = request_func_input.response_format

    # Hyper-parameters come from the yaml config.
    payload.update(hyper)

    # Tool definitions: the yaml config takes precedence over the dataset.
    tools = None
    tool_choice = None
    if hyper.get("tools"):
        tools = hyper.get("tools")
        tool_choice = hyper.get("tool_choice", "auto")
    elif json_data.get("tools"):
        tools = json_data.get("tools")
        tool_choice = json_data.get("tool_choice", "auto")
    if tools:
        payload["tools"] = tools
        payload["tool_choice"] = tool_choice

    # Random-input switch: pin the output length.
    if request_func_input.random_flag:
        payload["max_tokens"] = request_func_input.output_len
        metadata = payload.get("metadata", {})
        metadata["min_tokens"] = request_func_input.output_len
        payload["metadata"] = metadata

    if request_func_input.ignore_eos:
        payload["ignore_eos"] = request_func_input.ignore_eos

    if request_func_input.debug:
        print(f"payload:{json.dumps(payload, ensure_ascii=False)}")

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
    }

    output = RequestFuncOutput()
    output.no = request_func_input.no
    metrics_list = []
    request_id = "None"
    token_timestamps = []
    tool_call_buffer = {}

    # Create an owned session only once nothing before the try block can
    # raise, so that it is always closed by the finally clause.
    own_session = session is None
    if own_session:
        session = aiohttp.ClientSession(
            trust_env=True,
            read_bufsize=10 * 1024 * 1024,
            timeout=AIOHTTP_TIMEOUT,
        )

    ttft = 0.0
    res_ttft = 0.0
    st = time.perf_counter()
    most_recent_timestamp = st
    try:
        async with session.post(url=api_url, json=payload, headers=headers, read_bufsize=10 * 1024 * 1024) as response:
            if response.status == 200:
                async for chunk_bytes in response.content:
                    chunk_bytes = chunk_bytes.strip()
                    if not chunk_bytes:
                        continue

                    chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                    if chunk == "[DONE]":
                        continue

                    timestamp = time.perf_counter()
                    data = json.loads(chunk)

                    if "metrics" in data:
                        metrics_list.append(data["metrics"])

                    if request_id == "None" and "id" in data:
                        request_id = data["id"]

                    if choices := data.get("choices"):
                        delta = choices[0]["delta"]
                        content = delta.get("content")
                        reason_content = delta.get("reasoning_content")

                        # Tool calls arrive as fragments keyed by index; the
                        # arguments string is accumulated across chunks.
                        for tc in delta.get("tool_calls") or []:
                            idx = tc.get("index", 0)
                            entry = tool_call_buffer.setdefault(
                                idx,
                                {"id": tc.get("id"), "name": "", "arguments": ""},
                            )
                            func = tc.get("function") or {}
                            # Ignore null/empty fragments so they neither wipe
                            # the name nor break the string concatenation.
                            if func.get("name"):
                                entry["name"] = func["name"]
                            entry["arguments"] += func.get("arguments") or ""

                        # First token
                        if ttft == 0.0:
                            ttft = timestamp - st
                            output.ttft = ttft
                            # cached_tokens
                            output.prompt_len = _cached_prompt_tokens(data.get("usage"))
                        # Decoding phase
                        else:
                            output.itl.append(timestamp - most_recent_timestamp)

                        # First chunk that carries answer content.
                        if res_ttft == 0.0 and content:
                            res_ttft = choices[0].get("arrival_time", timestamp)
                            output.res_ttft = res_ttft
                            usage = data.get("usage") or {}
                            output.reasoning_tokens = max(usage.get("completion_tokens", 0) - 1, 0)

                        output.generated_text += content or ""
                        output.reasoning_content += reason_content or ""
                        output.arrival_time.append(choices[0].get("arrival_time", timestamp))
                    elif usage := data.get("usage", {}):
                        output.output_tokens = usage.get("completion_tokens", 0)
                        output.prompt_tokens = usage.get("prompt_tokens", 0)
                        if output.prompt_len == 0:
                            output.prompt_len = _cached_prompt_tokens(usage)

                    most_recent_timestamp = timestamp
                    # Wall-clock time: metrics_summary subtracts server-side
                    # timestamps from it (presumably epoch seconds as well).
                    token_timestamps.append(time.time())

                # At the end of the stream, record when the last chunk arrived.
                output.end_timestamp = most_recent_timestamp

                for tc in tool_call_buffer.values():
                    try:
                        args = json.loads(tc["arguments"]) if tc["arguments"] else {}
                    except ValueError:
                        # Malformed/truncated JSON arguments: fall back to no arguments.
                        args = {}
                    output.tool_calls.append({"id": tc["id"], "name": tc["name"], "arguments": args})

                # Metrics summary; the first packet is skipped as empty when
                # locating the first token.
                output.metrics = metrics_summary(metrics_list, token_timestamps[1:])

                has_text = output.generated_text.strip() or output.reasoning_content.strip()
                has_tool = output.tool_calls

                # Covers the case where over-long reasoning was truncated and
                # the answer is therefore empty.
                if not has_text and not has_tool:
                    output.success = False
                    output.reasoning_tokens = output.output_tokens
                    output.error = "No generated text found!"
                else:
                    output.success = True
                    output.latency = most_recent_timestamp - st
            else:
                error_text = await response.text()
                print(
                    "####error response:",
                    error_text,
                    "####payload:",
                    payload,
                )
                output.error = error_text or ""
                output.success = False
    except Exception:
        # Benchmark boundary: record the failure on the output instead of raising.
        output.success = False
        exc_info = sys.exc_info()
        output.error = "".join(traceback.format_exception(*exc_info))
    finally:
        if own_session:
            await session.close()

    output.request_id = request_id

    # Persist failed requests.
    if not output.success or output.output_tokens == 0:
        # Explicit UTF-8: the output may contain non-ASCII generated text.
        with open("error_output.txt", "a", encoding="utf-8") as f:
            f.write(str(output) + "\n")
    if pbar:
        pbar.update(1)
    if request_func_input.debug:
        print("#####final_output:", output)
    return output
|
|
|
|
|
|
async def simple_tool_call(model_output, tool_url: str, timeout=60):
    """Invoke the tool requested by a model output.

    The tool request is taken from ``model_output.tool_calls[0]`` when
    present; otherwise it is parsed from the first
    ``<tool_call>...</tool_call>`` block in ``model_output.generated_text``
    (first line = tool name, optional single ``<arg_key>``/``<arg_value>``
    pair).

    Args:
        model_output: Object exposing ``tool_calls`` and ``generated_text``.
        tool_url: Endpoint that receives ``{"tool_name", "arguments"}`` as JSON.
        timeout: httpx client timeout.

    Returns:
        ``(result, is_tool_result, tool_name, tool_id)``. When no tool call
        is found the tuple is ``("", False, "", tool_id)``. When the call
        fails, ``result`` is the error text and ``is_tool_result`` is False.
    """
    import re

    tool_id = None

    if getattr(model_output, "tool_calls", None):
        tc = model_output.tool_calls[0]
        tool_name = tc["name"]
        args = tc.get("arguments", {})
        tool_id = tc.get("id")
    else:
        match = re.search(r"<tool_call>(.*?)</tool_call>", model_output.generated_text, re.S)
        if not match:
            return "", False, "", tool_id

        block = match.group(1).strip()
        lines = block.splitlines()
        # An empty <tool_call></tool_call> block has no lines at all.
        tool_name = lines[0].strip() if lines else ""

        key = re.search(r"<arg_key>(.*?)</arg_key>", block)
        val = re.search(r"<arg_value>(.*?)</arg_value>", block)

        args = {key.group(1): val.group(1)} if key and val else {}

    if not tool_name:
        return "", False, "", tool_id

    # Imported only when a request is actually sent, so that outputs without
    # a tool call do not require httpx.
    import httpx

    headers = {"Content-Type": "application/json"}

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(
                tool_url,
                headers=headers,
                json={"tool_name": tool_name, "arguments": args},
            )

        resp.raise_for_status()
        obj = resp.json()

        return obj.get("result", resp.text), "result" in obj, tool_name, tool_id

    except Exception as e:
        # Best effort: a failed tool call is reported to the caller, not raised.
        print(f"[TOOL ERROR] {tool_name}: {repr(e)}")
        return str(e), False, tool_name, tool_id
|
|
|
|
|
|
async def async_request_eb_openai_chat_completions_multi_turn(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
):
    """Replay a multi-turn conversation against the chat-completions endpoint.

    Walks ``request_func_input.history_QA``: every ``user``/``tool`` message
    triggers one LLM request on the history accumulated so far, recorded
    ``assistant`` messages are skipped (the model's own answer is appended
    instead) and any other message is appended unchanged. When tools are
    configured (yaml hyper-parameters or dataset ``json_data``), tool calls
    found in the answer are executed through ``simple_tool_call`` and fed
    back, up to ``json_data["max_loop"]`` iterations per round.

    Args:
        request_func_input: Session description.
        pbar: Optional progress bar, advanced by one when the session ends,
            whether it completed or stopped early.

    Returns:
        ``(outputs, metrics)``: the list of per-request ``RequestFuncOutput``
        objects and the ``SessionMetrics`` of the session.

    Raises:
        ValueError: If tools are enabled but ``json_data["tool_url"]`` is empty.
    """
    # The tool-calling logic is only used when the yaml or the dataset provides tools.
    json_data = request_func_input.json_data or {}
    hyper = request_func_input.hyper_parameters or {}
    enable_tools = bool(json_data.get("tools") or hyper.get("tools"))

    outputs = []

    tool_call_count = 0
    llm_time = 0.0
    input_tokens = 0
    output_tokens = 0

    ori_history = request_func_input.history_QA or []
    user_count = sum(msg.get("role") == "user" for msg in ori_history)
    print("START", request_func_input.no, "user对话轮数:", user_count, flush=True)
    history = []
    prompt_no = 0

    session_start = time.perf_counter()

    def finish_session():
        """Advance the progress bar and build the session result from the current counters."""
        if pbar:
            pbar.update(1)
        metrics = SessionMetrics(
            session_no=request_func_input.no,
            session_e2e_time=time.perf_counter() - session_start,
            pure_llm_time=llm_time,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            tool_calls=tool_call_count,
        )
        return outputs, metrics

    # The session is created once and reused for every round.
    connector = aiohttp.TCPConnector(
        limit=0,
        limit_per_host=0,
        keepalive_timeout=60,
    )

    async with aiohttp.ClientSession(
        connector=connector,
        trust_env=True,
        read_bufsize=10 * 1024 * 1024,
        timeout=AIOHTTP_TIMEOUT,
    ) as session:
        for message in ori_history:
            if message["role"] == "user" or message["role"] == "tool":
                history.append(message)
                round_input = copy.deepcopy(request_func_input)
                round_input.history_QA = history
                round_input.no = f"{round_input.no}_{prompt_no}"

                s0 = time.perf_counter()
                output = await async_request_eb_openai_chat_completions(
                    round_input,
                    pbar=None,
                    session=session,
                )
                llm_time += time.perf_counter() - s0

                outputs.append(output)

                if not output.success:
                    return finish_session()

                input_tokens += output.prompt_tokens
                output_tokens += output.output_tokens

                if enable_tools:
                    # Tool-call loop.
                    max_loop = json_data.get("max_loop", 10)
                    tool_url = json_data.get("tool_url", "")
                    max_prompt_len = json_data.get("max_prompt_len")
                    if not tool_url:
                        raise ValueError("tool_url is empty.")
                    for _ in range(max_loop):
                        tool_result, is_tool_result, tool_name, tool_id = await simple_tool_call(
                            output,
                            tool_url,
                        )

                        # The tool call failed.
                        if tool_name and not is_tool_result:
                            print(f"[SESSION FAIL] tool call failed: {tool_name}")

                            # `output` is already in `outputs`; marking it failed
                            # in place is enough. Appending it again would make
                            # the same request appear twice.
                            output.success = False
                            tool_call_count += 1
                            return finish_session()

                        # No tool call in the answer: the round is complete.
                        if not is_tool_result:
                            history.append(
                                {
                                    "role": "assistant",
                                    "content": output.generated_text,
                                }
                            )
                            break

                        assistant_msg = {
                            "role": "assistant",
                            "content": output.generated_text,
                        }

                        if getattr(output, "tool_calls", None):
                            assistant_msg["tool_calls"] = [
                                {
                                    "id": tc["id"],
                                    "type": "function",
                                    "function": {
                                        "name": tc["name"],
                                        "arguments": json.dumps(tc["arguments"], ensure_ascii=False),
                                    },
                                }
                                for tc in output.tool_calls
                            ]

                        history.append(assistant_msg)

                        history.append(
                            {
                                "role": "tool",
                                "content": json.dumps(tool_result, ensure_ascii=False),
                                "tool_call_id": tool_id or tool_name,
                            }
                        )
                        tool_call_count += 1

                        round_input.history_QA = history

                        s0 = time.perf_counter()
                        output = await async_request_eb_openai_chat_completions(
                            round_input,
                            pbar=None,
                            session=session,
                        )
                        llm_time += time.perf_counter() - s0

                        outputs.append(output)

                        if not output.success:
                            return finish_session()

                        input_tokens += output.prompt_tokens
                        output_tokens += output.output_tokens
                        # Stop the session once its accumulated input length
                        # reaches max_prompt_len.
                        if max_prompt_len and input_tokens >= max_prompt_len:
                            print(f"[SESSION STOP] reach max_prompt_len={max_prompt_len}, stop session")
                            return finish_session()
                    else:
                        print(f"Warning exceed max_loop={max_loop}, force stop tool loop")

                else:
                    # No tools.
                    history.append(
                        {
                            "role": "assistant",
                            "content": output.generated_text,
                        }
                    )
                prompt_no += 1
            elif message["role"] == "assistant":
                continue
            else:
                history.append(message)

    return finish_session()
|
|
|
|
|
|
async def async_request_eb_openai_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Request an LLM using EB OpenAI

    Sends one streaming completions request (``prompt`` based) and records
    the generated text, TTFT, inter-token latencies and token usage.
    Failures are reported through ``success``/``error`` on the returned
    ``RequestFuncOutput`` instead of being raised.
    """
    api_url = request_func_input.api_url
    assert api_url.endswith(
        ("completions", "profile")
    ), "OpenAI Completions API URL must end with 'completions' or 'profile'."

    async with aiohttp.ClientSession(
        trust_env=True, read_bufsize=10 * 1024 * 1024, timeout=AIOHTTP_TIMEOUT
    ) as session:
        payload = {
            "model": request_func_input.model,
            "prompt": request_func_input.prompt,
            "stream": True,
            "stream_options": {
                "include_usage": True,
                "continuous_usage_stats": True,
            },
        }
        # Hyper-parameters come from the yaml config.
        payload.update(request_func_input.hyper_parameters or {})

        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos

        if request_func_input.debug:
            print("payload:", json.dumps(payload, ensure_ascii=False))

        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
            "Content-Type": "application/json",
        }

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len
        output.no = request_func_input.no

        generated_text = ""
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload, headers=headers) as response:
                if response.status == 200:
                    first_chunk_received = False
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                        if chunk == "[DONE]":
                            continue

                        timestamp = time.perf_counter()
                        data = json.loads(chunk)

                        # NOTE: Some completion API might have a last
                        # usage summary response without a token so we
                        # want to check a token was generated
                        if choices := data.get("choices"):
                            # Note that text could be empty here
                            # e.g. for special tokens
                            text = choices[0].get("text")

                            # First token
                            if not first_chunk_received:
                                first_chunk_received = True
                                output.ttft = timestamp - st
                            # Decoding phase
                            else:
                                output.itl.append(timestamp - most_recent_timestamp)

                            generated_text += text or ""

                            most_recent_timestamp = timestamp
                            output.arrival_time.append(choices[0].get("arrival_time", timestamp))
                        elif usage := data.get("usage"):
                            # Keep the counters numeric even if the server
                            # omits a field or sends null.
                            output.prompt_tokens = usage.get("prompt_tokens") or 0
                            output.output_tokens = usage.get("completion_tokens") or 0

                    output.generated_text = generated_text
                    output.latency = most_recent_timestamp - st

                    # Success is decided by the text alone: a stream without
                    # any valid chunk necessarily yields empty text.
                    if output.generated_text == "":
                        output.success = False
                        output.error = "No generated text found!"
                    else:
                        output.success = True
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            # Benchmark boundary: record the failure on the output instead of raising.
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if request_func_input.debug:
            print(f"final_output:{output}")

        if pbar:
            pbar.update(1)
        return output
|
|
|
|
|
|
async def async_request_tgi(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Request an LLM using the TGI API

    Streams from a ``generate_stream`` endpoint and records TTFT,
    inter-token latencies and the final generated text. Failures are
    reported through ``success``/``error`` on the returned
    ``RequestFuncOutput`` instead of being raised.
    """
    api_url = request_func_input.api_url
    assert api_url.endswith("generate_stream")

    async with aiohttp.ClientSession(trust_env=True, timeout=AIOHTTP_TIMEOUT) as session:
        params = {
            "max_new_tokens": request_func_input.output_len,
            "do_sample": True,
            "temperature": 0.01,  # TGI does not accept 0.0 temperature.
            "top_p": 0.99,  # TGI does not accept 1.0 top_p.
            "truncate": request_func_input.prompt_len,
            "ignore_eos_token": request_func_input.ignore_eos,
        }
        payload = {
            "inputs": request_func_input.prompt,
            "parameters": params,
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len
        if request_func_input.ignore_eos:
            output.output_tokens = request_func_input.output_len
        else:
            # Unknown here; left as None.
            output.output_tokens = None

        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        data = None  # last parsed data chunk; stays None if the stream is empty
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue
                        chunk_bytes = chunk_bytes.decode("utf-8")

                        # NOTE: Sometimes TGI returns a ping response without
                        # any data, we should skip it.
                        if chunk_bytes.startswith(":"):
                            continue
                        chunk = chunk_bytes.removeprefix("data:")

                        data = json.loads(chunk)
                        timestamp = time.perf_counter()
                        # First token
                        if ttft == 0.0:
                            ttft = timestamp - st
                            output.ttft = ttft
                        # Decoding phase
                        else:
                            output.itl.append(timestamp - most_recent_timestamp)

                        most_recent_timestamp = timestamp
                        # Fall back to the local receive time when the server
                        # does not report an arrival time, as the other
                        # request functions in this file do.
                        output.arrival_time.append(data.get("arrival_time", timestamp))

                    if data is None:
                        output.success = False
                        output.error = "No data chunk received from the TGI stream."
                    else:
                        output.latency = most_recent_timestamp - st
                        output.success = True
                        output.generated_text = data["generated_text"]
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            # Benchmark boundary: record the failure on the output instead of raising.
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output
|
|
|
|
|
|
async def async_request_trt_llm(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Request an LLM using TRT's llm_server

    Streams from a ``generate_stream`` endpoint, concatenating every
    ``text_output`` fragment and recording TTFT and inter-token latencies.
    Failures are reported through ``success``/``error`` on the result.
    """
    endpoint = request_func_input.api_url
    assert endpoint.endswith("generate_stream")

    async with aiohttp.ClientSession(trust_env=True, timeout=AIOHTTP_TIMEOUT) as session:
        body = dict(
            accumulate_tokens=True,
            text_input=request_func_input.prompt,
            temperature=0.0,
            top_p=1.0,
            max_tokens=request_func_input.output_len,
            stream=True,
        )
        if request_func_input.ignore_eos:
            body["min_length"] = request_func_input.output_len

        result = RequestFuncOutput()
        result.prompt_len = request_func_input.prompt_len

        first_token_latency = 0.0
        started = time.perf_counter()
        last_seen = started
        try:
            async with session.post(url=endpoint, json=body) as resp:
                if response_ok := (resp.status == 200):
                    async for raw in resp.content:
                        raw = raw.strip()
                        if raw:
                            event = json.loads(raw.decode("utf-8").removeprefix("data:"))
                            result.generated_text += event["text_output"]
                            now = time.perf_counter()
                            if first_token_latency == 0.0:
                                # First token
                                first_token_latency = now - started
                                result.ttft = first_token_latency
                            else:
                                # Decoding phase
                                result.itl.append(now - last_seen)
                            last_seen = now

                    result.latency = last_seen - started
                else:
                    result.error = resp.reason or ""
                result.success = response_ok
        except Exception:
            result.success = False
            result.error = "".join(traceback.format_exception(*sys.exc_info()))

        if pbar:
            pbar.update(1)
        return result
|
|
|
|
|
|
async def async_request_deepspeed_mii(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Request an LLM using Deepspeed MII

    Sends one non-streaming request and reads the text from either
    ``choices[0].text`` or ``text[0]`` of the JSON response. A response
    with neither field is reported as a failure. Failures are reported
    through ``success``/``error`` on the result instead of being raised.
    """
    async with aiohttp.ClientSession(trust_env=True, timeout=AIOHTTP_TIMEOUT) as session:

        payload = {
            "prompt": request_func_input.prompt,
            "max_tokens": request_func_input.output_len,
            "temperature": 0.01,  # deepspeed-mii does not accept 0.0 temp.
            "top_p": 1.0,
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        # NOTE: DeepSpeed-MII doesn't support streaming as of Jan 28 2024,
        # will use 0 as placeholder.
        # See https://github.com/microsoft/DeepSpeed-MII/pull/311
        output.ttft = 0

        st = time.perf_counter()
        try:
            async with session.post(url=request_func_input.api_url, json=payload) as response:
                if response.status == 200:
                    parsed_resp = await response.json()
                    output.latency = time.perf_counter() - st
                    if "choices" in parsed_resp:
                        output.generated_text = parsed_resp["choices"][0]["text"]
                        output.success = True
                    elif "text" in parsed_resp:
                        output.generated_text = parsed_resp["text"][0]
                        output.success = True
                    else:
                        # Success is set per branch so this failure is not
                        # overwritten by an unconditional success flag.
                        output.error = "Unexpected response format: neither 'choices' nor 'text' found"
                        output.success = False
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            # Benchmark boundary: record the failure on the output instead of raising.
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output
|
|
|
|
|
|
async def async_request_openai_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Request an LLM using OpenAI

    Sends one streaming completions request and records the generated
    text, TTFT and inter-token latencies. The request counts as successful
    once at least one chunk with ``choices`` has been received. Failures
    are reported through ``success``/``error`` on the result instead of
    being raised.
    """
    api_url = request_func_input.api_url
    assert api_url.endswith(
        ("completions", "profile")
    ), "OpenAI Completions API URL must end with 'completions' or 'profile'."

    async with aiohttp.ClientSession(trust_env=True, timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            "model": (request_func_input.model_name if request_func_input.model_name else request_func_input.model),
            "prompt": request_func_input.prompt,
            # "temperature": 0.0,
            "max_tokens": request_func_input.output_len,
            "logprobs": request_func_input.logprobs,
            "stream": True,
            # "stream_options": {
            #     "include_usage": True,
            # },
        }
        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos

        headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload, headers=headers) as response:
                if response.status == 200:
                    first_chunk_received = False
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                        if chunk == "[DONE]":
                            continue

                        data = json.loads(chunk)

                        # NOTE: Some completion API might have a last
                        # usage summary response without a token so we
                        # want to check a token was generated
                        if choices := data.get("choices"):
                            # Note that text could be empty here
                            # e.g. for special tokens
                            text = choices[0].get("text")
                            timestamp = time.perf_counter()
                            # First token
                            if not first_chunk_received:
                                first_chunk_received = True
                                # Same clock reading as the ITL baseline below.
                                output.ttft = timestamp - st
                            # Decoding phase
                            else:
                                output.itl.append(timestamp - most_recent_timestamp)

                            most_recent_timestamp = timestamp
                            generated_text += text or ""
                        elif usage := data.get("usage"):
                            output.output_tokens = usage.get("completion_tokens")

                    if first_chunk_received:
                        output.success = True
                    else:
                        output.success = False
                        output.error = (
                            "Never received a valid chunk to calculate TTFT. "
                            "This response will be marked as failed!"
                        )
                    output.generated_text = generated_text
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            # Benchmark boundary: record the failure on the output instead of raising.
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

        if pbar:
            pbar.update(1)
        return output
|
|
|
|
|
|
async def async_request_openai_audio(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """Send one streaming request to an OpenAI-style audio endpoint.

    The audio in ``request_func_input.multi_modal_content["audio"]`` is
    encoded to WAV in memory and posted as multipart form data together with
    the flattened request parameters. The streamed response is consumed chunk
    by chunk to collect the generated text, TTFT and inter-token latencies.

    Args:
        request_func_input: Request description. This function reads
            ``api_url``, ``model_name``/``model``, ``output_len``,
            ``extra_body``, ``multi_modal_content`` and ``prompt_len``.
        pbar: Optional progress bar; advanced by one once the request has
            finished, whether it succeeded or not.

    Returns:
        A ``RequestFuncOutput``. On HTTP 200 ``success`` is True and
        ``generated_text``, ``ttft``, ``itl``, ``latency`` (and
        ``output_tokens`` if a usage chunk was sent) are filled in. On a
        non-200 status ``error`` holds ``response.reason``; on an exception
        ``error`` holds the formatted traceback.

    Raises:
        AssertionError: If ``api_url`` does not end with ``transcriptions``
            or ``translations``.
    """
    # Lazy import without PlaceholderModule to avoid vllm dep.
    import soundfile

    api_url = request_func_input.api_url
    # The full message must live inside the assert; a string placed after the
    # statement would be a no-op expression and never be shown.
    assert api_url.endswith(("transcriptions", "translations")), (
        "OpenAI Chat Completions API URL must end with 'transcriptions' " "or `translations`."
    )

    async with aiohttp.ClientSession(trust_env=True, timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            # An explicit model_name overrides the model identifier.
            "model": (request_func_input.model_name if request_func_input.model_name else request_func_input.model),
            "temperature": 0.0,
            "max_completion_tokens": request_func_input.output_len,
            "stream": True,
            "language": "en",
            # Flattened due to multipart/form-data
            "stream_include_usage": True,
            "stream_continuous_usage_stats": True,
        }
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        }

        # Send audio file
        def to_bytes(y, sr):
            """Encode samples ``y`` at sample rate ``sr`` as an in-memory WAV file."""
            buffer = io.BytesIO()
            soundfile.write(buffer, y, sr, format="WAV")
            buffer.seek(0)
            return buffer

        with to_bytes(*request_func_input.multi_modal_content["audio"]) as f:
            form = aiohttp.FormData()
            form.add_field("file", f, content_type="audio/wav")
            # Form fields must be strings, so every payload value is stringified.
            for key, value in payload.items():
                form.add_field(key, str(value))

            output = RequestFuncOutput()
            output.prompt_len = request_func_input.prompt_len

            generated_text = ""
            ttft = 0.0
            st = time.perf_counter()
            most_recent_timestamp = st
            try:
                async with session.post(url=api_url, data=form, headers=headers) as response:
                    if response.status == 200:
                        async for chunk_bytes in response.content:
                            chunk_bytes = chunk_bytes.strip()
                            if not chunk_bytes:
                                continue

                            chunk = chunk_bytes.decode("utf-8").removeprefix("data: ")
                            if chunk != "[DONE]":
                                timestamp = time.perf_counter()
                                data = json.loads(chunk)

                                if choices := data.get("choices"):
                                    content = choices[0]["delta"].get("content")
                                    # First token
                                    if ttft == 0.0:
                                        ttft = timestamp - st
                                        output.ttft = ttft

                                    # Decoding phase
                                    else:
                                        output.itl.append(timestamp - most_recent_timestamp)

                                    generated_text += content or ""
                                elif usage := data.get("usage"):
                                    output.output_tokens = usage.get("completion_tokens")

                                most_recent_timestamp = timestamp

                        output.generated_text = generated_text
                        output.success = True
                        output.latency = most_recent_timestamp - st
                    else:
                        output.error = response.reason or ""
                        output.success = False
            except Exception:
                # Benchmark boundary: record the failure on the output instead
                # of propagating, so one bad request does not abort the run.
                output.success = False
                exc_info = sys.exc_info()
                output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
|
|
|
|
|
|
# Registry mapping each backend name to the coroutine that issues a request
# for it. Insertion order is kept as-is because OPENAI_COMPATIBLE_BACKENDS
# below is derived by iterating this dict.
ASYNC_REQUEST_FUNCS = {
    "tgi": async_request_tgi,
    "vllm": async_request_openai_completions,
    "lmdeploy": async_request_openai_completions,
    "deepspeed-mii": async_request_deepspeed_mii,
    "openai": async_request_eb_openai_completions,
    "openai-chat": async_request_eb_openai_chat_completions,
    "openai-chat-multi-turn": async_request_eb_openai_chat_completions_multi_turn,
    "openai-audio": async_request_openai_audio,
    "tensorrt-llm": async_request_trt_llm,
    "scalellm": async_request_openai_completions,
    "sglang": async_request_openai_completions,
}

# Names of the backends whose request function is one of the two
# implementations listed in the membership test.
# NOTE(review): "openai" and "openai-chat-multi-turn" map to other functions
# and are therefore not part of this list - confirm that is intended.
OPENAI_COMPATIBLE_BACKENDS = [
    backend_name
    for backend_name, request_func in ASYNC_REQUEST_FUNCS.items()
    if request_func in (async_request_openai_completions, async_request_eb_openai_chat_completions)
]
|