This commit is contained in:
BlueSkyXN
2025-02-20 10:33:50 +08:00
parent 4cde4c314e
commit 03935d33f6
+17 -22
View File
@@ -46,11 +46,11 @@ type ThinkingService struct {
Retry int `mapstructure:"retry"`
Weight int `mapstructure:"weight"`
Proxy string `mapstructure:"proxy"`
Mode string `mapstructure:"mode"` // "standard" or "full"
Mode string `mapstructure:"mode"` // "standard" "full"
ReasoningEffort string `mapstructure:"reasoning_effort"`
ReasoningFormat string `mapstructure:"reasoning_format"`
Temperature *float64 `mapstructure:"temperature"`
ForceStopDeepThinking bool `mapstructure:"force_stop_deep_thinking"` // 新增字段
ForceStopDeepThinking bool `mapstructure:"force_stop_deep_thinking"` // 配置项:标准模式下遇到 content 时是否立即停止
}
func (s *ThinkingService) GetFullURL() string {
@@ -619,7 +619,7 @@ func (s *Server) forwardModelsRequest(w http.ResponseWriter, ctx context.Context
// ---------------------- 流式处理 ----------------------
// collectedReasoningBuffer 用于收集思考服务返回的 reasoning_content对 standard 只收集 reasoning_contentfull 收集全部)
// collectedReasoningBuffer 用于收集思考服务返回的 reasoning_content标准模式只收集 reasoning_contentfull 模式收集全部)
type collectedReasoningBuffer struct {
builder strings.Builder
mode string
@@ -655,7 +655,8 @@ func NewStreamHandler(w http.ResponseWriter, thinkingService ThinkingService, ta
}, nil
}
// HandleRequest 分两阶段:先连接思考服务,原样转发 SSE 并收集 reasoning;再请求目标 Channel 并转发 SSE。
// HandleRequest 分两阶段:先流式接收思考服务 SSE 并转发给客户端(只转发包含 reasoning_content 的 chunk,标准模式遇到纯 content 则中断);
// 然后使用收集到的 reasoning_content 构造最终请求,发起目标 Channel 的 SSE 并转发给客户端。
func (h *StreamHandler) HandleRequest(ctx context.Context, req *ChatCompletionRequest) error {
h.writer.Header().Set("Content-Type", "text/event-stream")
h.writer.Header().Set("Cache-Control", "no-cache")
@@ -669,7 +670,7 @@ func (h *StreamHandler) HandleRequest(ctx context.Context, req *ChatCompletionRe
return err
}
// 阶段二:构造最终请求,并请求目标 Channel SSE转发给客户端
// 阶段二:构造最终请求,并流式转发目标 Channel SSE
finalReq := h.prepareFinalRequest(req, reasonBuf.get())
if err := h.streamFinalChannel(ctx, finalReq, logger); err != nil {
return err
@@ -677,7 +678,8 @@ func (h *StreamHandler) HandleRequest(ctx context.Context, req *ChatCompletionRe
return nil
}
// streamThinkingService 连接思考服务 SSE,原样转发每个 chunk,同时在标准模式下遇到只包含 content 的 chunk时停止转发
// streamThinkingService 连接思考服务 SSE,原样转发包含 reasoning_content 的 SSE chunk给客户端,同时收集 reasoning_content
// 在标准模式下,一旦遇到只返回 non-empty 的 content(且 reasoning_content 为空),则立即中断,不转发该 chunk。
func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatCompletionRequest, reasonBuf *collectedReasoningBuffer, logger *RequestLogger) error {
thinkingReq := *req
thinkingReq.Model = h.thinkingService.Model
@@ -741,6 +743,7 @@ func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatComp
reader := bufio.NewReader(resp.Body)
var lastLine string
// 标识是否需中断思考 SSE(标准模式下)
forceStop := false
for {
@@ -767,10 +770,7 @@ func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatComp
}
dataPart := strings.TrimPrefix(line, "data: ")
if dataPart == "[DONE]" {
// 思考服务结束,将标识改为 [THINKING_DONE]
donePayload := "data: [THINKING_DONE]\n\n"
h.writer.Write([]byte(donePayload))
h.flusher.Flush()
// 思考服务结束:直接中断(不发送特殊标记)
break
}
@@ -784,7 +784,7 @@ func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatComp
} `json:"choices"`
}
if err := json.Unmarshal([]byte(dataPart), &chunk); err != nil {
// 转发原始数据
// 如果解析失败,直接原样转发
h.writer.Write([]byte("data: " + dataPart + "\n\n"))
h.flusher.Flush()
continue
@@ -797,14 +797,14 @@ func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatComp
}
if h.thinkingService.Mode == "full" {
// full 模式:收集全部
// full 模式:收集所有内容
if c.Delta.ReasoningContent != "" {
reasonBuf.append(c.Delta.ReasoningContent)
}
if c.Delta.Content != "" {
reasonBuf.append(c.Delta.Content)
}
// 转发原始 chunk
// 原样转发整 chunk
forwardLine := "data: " + dataPart + "\n\n"
h.writer.Write([]byte(forwardLine))
h.flusher.Flush()
@@ -812,33 +812,28 @@ func (h *StreamHandler) streamThinkingService(ctx context.Context, req *ChatComp
// standard 模式:只收集 reasoning_content
if c.Delta.ReasoningContent != "" {
reasonBuf.append(c.Delta.ReasoningContent)
// 转发该 chunk原样
// 转发该 chunk
forwardLine := "data: " + dataPart + "\n\n"
h.writer.Write([]byte(forwardLine))
h.flusher.Flush()
}
// 如果遇到只包含 content(或 content 非空且 reasoning_content 为空),认为思考链结束
// 如果遇到 chunk 中只有 content且 reasoning_content 为空),认为思考链结束,不转发该 chunk
if c.Delta.Content != "" && strings.TrimSpace(c.Delta.ReasoningContent) == "" {
forceStop = true
}
}
// 如果 finishReason 不为空,也为结束
// 如果 finishReason 空,也为结束
if c.FinishReason != nil && *c.FinishReason != "" {
forceStop = true
}
}
if forceStop {
donePayload := "data: [THINKING_DONE]\n\n"
h.writer.Write([]byte(donePayload))
h.flusher.Flush()
_ = resp.Body.Close() // 断开连接,避免后续浪费
break
}
}
// 清空剩余数据
// 读空剩余数据
io.Copy(io.Discard, reader)
return nil
}