package controllers

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"shudao-chat-go/models"
	"shudao-chat-go/utils"

	beego "github.com/beego/beego/v2/server/web"
)

// LiushiController serves streaming (SSE) chat endpoints backed by a
// Qwen-compatible chat-completions API and an external RAG search service.
type LiushiController struct {
	beego.Controller
}

// StreamRequest is the JSON body accepted by StreamChat.
type StreamRequest struct {
	Message string `json:"message"`
	Model   string `json:"model,omitempty"`
}

// StreamChatWithDBRequest is the JSON body accepted by StreamChatWithDB.
// The user id is NOT part of the body; it is read from the auth token.
type StreamChatWithDBRequest struct {
	Message             string `json:"message"`
	AIConversationId    uint64 `json:"ai_conversation_id"`
	BusinessType        int    `json:"business_type"`
	ExamName            string `json:"exam_name"`
	AIMessageId         uint64 `json:"ai_message_id"`
	OnlineSearchContent string `json:"online_search_content"`
}

// StreamResponse mirrors one OpenAI-style streaming chunk returned by the
// chat-completions API.
type StreamResponse struct {
	ID      string `json:"id"`
	Object  string `json:"object"`
	Created int64  `json:"created"`
	Model   string `json:"model"`
	Choices []struct {
		Index int `json:"index"`
		Delta struct {
			Role    string `json:"role,omitempty"`
			Content string `json:"content,omitempty"`
		} `json:"delta"`
		FinishReason *string `json:"finish_reason"`
	} `json:"choices"`
}

// setSSEHeaders configures the response headers for Server-Sent Events.
func (c *LiushiController) setSSEHeaders() {
	h := c.Ctx.ResponseWriter.Header()
	h.Set("Content-Type", "text/event-stream; charset=utf-8")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	h.Set("Access-Control-Allow-Origin", "*")
	h.Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
	h.Set("Access-Control-Allow-Headers", "Content-Type")
}

// StreamChat is the two-stage streaming chat endpoint: intent recognition
// (non-streaming) -> optional RAG retrieval -> streamed final answer.
func (c *LiushiController) StreamChat() {
	c.setSSEHeaders()

	// Parse and validate the request body.
	var request StreamRequest
	if err := json.Unmarshal(c.Ctx.Input.RequestBody, &request); err != nil {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败\"}\n\n")
		return
	}
	if request.Message == "" {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"消息内容不能为空\"}\n\n")
		return
	}
	userMessage := request.Message

	// Stage 1: intent recognition (non-streaming).
	// NOTE(review): the prompt content is elided in this source.
	intentPrompt := ` `
	intentReply, err := c.sendQwen3Message(intentPrompt, false)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图识别失败: %s\"}\n\n", err.Error())
		return
	}
	fmt.Printf("意图识别回复: %s\n", intentReply)

	// Strip an optional ```json fence before decoding the intent JSON.
	var aiResponse map[string]interface{}
	cleanReply := strings.TrimSpace(intentReply)
	cleanReply = strings.TrimPrefix(cleanReply, "```json")
	cleanReply = strings.TrimSuffix(cleanReply, "```")
	cleanReply = strings.TrimSpace(cleanReply)
	if err := json.Unmarshal([]byte(cleanReply), &aiResponse); err != nil {
		// The model answered in plain text (greeting/faq); stream it verbatim.
		fmt.Printf("JSON解析失败,AI返回了文本格式回复: %s\n", intentReply)
		c.streamTextResponse(intentReply)
		return
	}

	intent, ok := aiResponse["intent"].(string)
	if !ok {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图解析失败\"}\n\n")
		return
	}

	// greeting/faq: stream the raw intent reply directly. (The original
	// branched on "direct_answer" but both arms streamed the same value.)
	if intent == "greeting" || intent == "faq" {
		c.streamTextResponse(intentReply)
		return
	}

	// Stage 2: RAG retrieval for knowledge-base questions.
	if intent == "query_knowledge_base" {
		searchQueries, ok := aiResponse["search_queries"].([]interface{})
		if !ok || len(searchQueries) == 0 {
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到有效的查询内容\"}\n\n")
			return
		}
		// Guard the type assertion so a non-string entry cannot panic the
		// handler (the original asserted unchecked).
		query, ok := searchQueries[0].(string)
		if !ok {
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到有效的查询内容\"}\n\n")
			return
		}

		results, ok := c.searchKnowledgeBase(query)
		if !ok {
			return // the error event has already been written to the stream
		}

		// Stage 3: stream the final answer grounded in the search results.
		c.streamRAGResponse(userMessage, results)
	}
}

// searchKnowledgeBase queries the configured search API and returns the
// result documents. On any failure it writes an SSE error event and returns
// ok=false.
func (c *LiushiController) searchKnowledgeBase(query string) ([]interface{}, bool) {
	searchRequest := map[string]interface{}{
		"query":     query,
		"n_results": 25,
	}
	requestBody, err := json.Marshal(searchRequest)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败\"}\n\n")
		return nil, false
	}

	searchAPIURL, err := beego.AppConfig.String("search_api_url")
	if err != nil || searchAPIURL == "" {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url\"}\n\n")
		return nil, false
	}

	req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败\"}\n\n")
		return nil, false
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
		return nil, false
	}
	defer resp.Body.Close()

	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败\"}\n\n")
		return nil, false
	}
	if resp.StatusCode != http.StatusOK {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
		return nil, false
	}

	var searchResponse map[string]interface{}
	if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败\"}\n\n")
		return nil, false
	}
	status, ok := searchResponse["status"].(string)
	if !ok || status != "success" {
		message, _ := searchResponse["message"].(string)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
		return nil, false
	}
	results, ok := searchResponse["results"].([]interface{})
	if !ok || len(results) == 0 {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
		return nil, false
	}
	return results, true
}

// streamTextResponse streams a plain-text reply to the client. Short texts
// (< 200 bytes) are sent as a single SSE event; longer texts are chunked to
// simulate streaming.
func (c *LiushiController) streamTextResponse(text string) {
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("开始流式输出文本响应...")
	fmt.Printf("文本长度: %d 字符\n", len(text))
	fmt.Println("=" + strings.Repeat("=", 80))

	chunkCount := 1
	if len(text) < 200 {
		fmt.Printf("文本较短,直接发送完整内容\n")
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", text)
		c.Ctx.ResponseWriter.Flush()
	} else {
		fmt.Printf("文本较长,按块发送(每块50字符)\n")
		// Use the actual (rune-based) chunk count returned by the sender.
		// The original ignored this return value and re-derived the count
		// from the byte length, which over-counts for multi-byte UTF-8 text.
		chunkCount = c.sendTextInChunks(text, 50)
	}

	// End-of-stream marker.
	fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
	c.Ctx.ResponseWriter.Flush()

	c.printStreamCompleteResult(len(text), chunkCount, text)
}

// sendTextInChunks sends text to the client in rune-based chunks of
// chunkSize, with a small delay between chunks to simulate streaming.
// Returns the number of chunks sent.
func (c *LiushiController) sendTextInChunks(text string, chunkSize int) int {
	// Chunk on runes, not bytes, so multi-byte characters are never split.
	runes := []rune(text)
	chunkCount := 0
	for i := 0; i < len(runes); i += chunkSize {
		end := i + chunkSize
		if end > len(runes) {
			end = len(runes)
		}
		chunk := string(runes[i:end])
		chunkCount++
		fmt.Printf("发送第 %d 块: '%s' (长度: %d)\n", chunkCount, chunk, len(chunk))
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", chunk)
		c.Ctx.ResponseWriter.Flush()
		time.Sleep(50 * time.Millisecond) // pacing between chunks
	}
	fmt.Printf("总共发送了 %d 个数据块\n", chunkCount)
	return chunkCount
}

// streamRAGResponse streams the final RAG-grounded answer for StreamChat.
func (c *LiushiController) streamRAGResponse(userMessage string, results []interface{}) {
	// Serialize the search results for use as prompt context.
	contextJSON, err := json.Marshal(results)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
		return
	}
	// NOTE(review): the prompt content is elided in this source; in the full
	// file contextJSON/userMessage are interpolated into finalPrompt.
	_ = contextJSON
	finalPrompt := ` `

	// Stream the Markdown answer straight through to the client.
	if err := c.sendQwen3MessageStream(finalPrompt); err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
		return
	}
}

// doQwen3Request builds and sends one chat-completions request to the Qwen
// API. On success the caller owns resp.Body and must close it; on failure
// the body has already been closed.
func (c *LiushiController) doQwen3Request(userMessage string, stream bool) (*http.Response, error) {
	apiURL, err := beego.AppConfig.String("qwen3_api_url")
	if err != nil || apiURL == "" {
		return nil, fmt.Errorf("配置文件中未找到qwen3_api_url")
	}
	model, err := beego.AppConfig.String("qwen3_model")
	if err != nil || model == "" {
		return nil, fmt.Errorf("配置文件中未找到qwen3_model")
	}

	payload := map[string]interface{}{
		"model":       model,
		"stream":      stream,
		"temperature": 0.7,
		"messages": []map[string]string{
			{"role": "user", "content": userMessage},
		},
	}
	requestBody, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("请求序列化失败: %v", err)
	}

	req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
	if err != nil {
		return nil, fmt.Errorf("创建HTTP请求失败: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")

	// Generous timeout: streamed generations can run for minutes.
	client := &http.Client{Timeout: 600 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("请求发送失败: %v", err)
	}
	if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		responseBody, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
		}
		return nil, fmt.Errorf("千问API错误: %s", string(responseBody))
	}
	return resp, nil
}

// sendQwen3MessageStream streams a Qwen reply directly to the client.
func (c *LiushiController) sendQwen3MessageStream(userMessage string) error {
	resp, err := c.doQwen3Request(userMessage, true)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return c.handleStreamResponseToClient(resp)
}

// sendQwen3Message sends a message to the Qwen API and returns the assembled
// reply text. useStream selects between SSE and plain-JSON response parsing.
func (c *LiushiController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
	resp, err := c.doQwen3Request(userMessage, useStream)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if useStream {
		// Accumulate delta content from the SSE stream.
		var fullContent strings.Builder
		scanner := bufio.NewScanner(resp.Body)
		// Allow SSE lines larger than bufio.Scanner's 64 KiB default.
		scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
		for scanner.Scan() {
			line := scanner.Text()
			if line == "" || !strings.HasPrefix(line, "data: ") {
				continue
			}
			data := strings.TrimPrefix(line, "data: ")
			if data == "[DONE]" {
				break
			}
			var streamResp StreamResponse
			if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
				continue // skip malformed keep-alive/partial lines
			}
			if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
				fullContent.WriteString(streamResp.Choices[0].Delta.Content)
			}
		}
		return fullContent.String(), nil
	}

	// Non-streaming: decode the whole JSON body at once.
	type nonStreamResponse struct {
		Choices []struct {
			Message struct {
				Content string `json:"content"`
			} `json:"message"`
		} `json:"choices"`
	}
	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("读取响应失败: %v", err)
	}
	var response nonStreamResponse
	if err := json.Unmarshal(responseBody, &response); err != nil {
		return "", fmt.Errorf("解析响应失败: %v", err)
	}
	if len(response.Choices) > 0 {
		return response.Choices[0].Message.Content, nil
	}
	return "", fmt.Errorf("未找到有效响应")
}

// handleStreamResponseToClient forwards a Qwen SSE response to the client
// chunk by chunk, escaping newlines so each chunk stays a single SSE event.
func (c *LiushiController) handleStreamResponseToClient(resp *http.Response) error {
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("开始处理千问API流式响应(直接转发原始数据块)...")
	fmt.Println("=" + strings.Repeat("=", 80))

	scanner := bufio.NewScanner(resp.Body)
	// Allow SSE lines larger than bufio.Scanner's 64 KiB default.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	charCount := 0
	blockCount := 0
	var fullContent strings.Builder // complete reply, kept for the final log

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" || !strings.HasPrefix(line, "data: ") {
			continue
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			fmt.Println("\n" + strings.Repeat("-", 40))
			fmt.Printf("千问API流式结束,总共输出了 %d 个字符\n", charCount)
			fmt.Println(strings.Repeat("-", 40))
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
			c.Ctx.ResponseWriter.Flush()
			break
		}

		var streamResp StreamResponse
		if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
			continue // skip malformed lines
		}
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
			content := streamResp.Choices[0].Delta.Content
			charCount += len([]rune(content))
			blockCount++
			fullContent.WriteString(content)
			fmt.Printf("收到第 %d 个内容块: '%s' (长度: %d)\n", blockCount, content, len(content))
			fmt.Printf("直接转发完整内容块到客户端\n")
			// Escape newlines so multi-line Markdown stays one SSE event;
			// the frontend un-escapes when rendering.
			escapedContent := strings.ReplaceAll(content, "\n", "\\n")
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", escapedContent)
			c.Ctx.ResponseWriter.Flush()
		}
		// A finish_reason marks the end of generation even without [DONE].
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
			fmt.Println("\n" + strings.Repeat("-", 40))
			fmt.Printf("流式完成,总共接收了 %d 个数据块,%d 个字符\n", blockCount, charCount)
			fmt.Println(strings.Repeat("-", 40))
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
			c.Ctx.ResponseWriter.Flush()
			break
		}
	}

	c.printStreamCompleteResult(charCount, blockCount, fullContent.String())
	return scanner.Err()
}

// StreamChatWithDB is the streaming chat endpoint with persistence: it saves
// the user message, streams the AI reply, and stores the reply when done.
func (c *LiushiController) StreamChatWithDB() {
	c.setSSEHeaders()

	// Resolve the user from the auth token.
	userInfo, err := utils.GetUserInfoFromContext(c.Ctx.Input.GetData("userInfo"))
	if err != nil {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusUnauthorized)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"获取用户信息失败: %s\"}\n\n", err.Error())
		return
	}
	userID := uint64(userInfo.ID)
	if userID == 0 {
		// NOTE(review): falls back to user id 1 — presumably a default/test
		// account; confirm this is intended in production.
		userID = 1
	}

	var requestData StreamChatWithDBRequest
	requestBody := c.Ctx.Input.RequestBody
	fmt.Printf("🔍 请求体内容: %s\n", string(requestBody))
	fmt.Printf("🔍 请求体长度: %d\n", len(requestBody))
	fmt.Printf("🔍 Content-Type: %s\n", c.Ctx.Request.Header.Get("Content-Type"))
	if err := json.Unmarshal(requestBody, &requestData); err != nil {
		fmt.Printf("❌ JSON解析失败: %v\n", err)
		c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败: %s\"}\n\n", err.Error())
		return
	}
	fmt.Println("流式聊天数据库集成请求数据:", requestData)

	// Persist the user message first.
	aiConversationID, userMessageID, err := c.saveUserMessage(&requestData, userID)
	if err != nil {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"保存用户消息失败: %s\"}\n\n", err.Error())
		return
	}

	// Stream the AI reply (this also sends the initial-response event).
	c.streamAIReply(requestData.Message, userID, aiConversationID, userMessageID, requestData.OnlineSearchContent)
}

// saveUserMessage stores the user's message (creating the conversation if
// needed) inside one transaction. Returns the conversation id and the new
// message id.
func (c *LiushiController) saveUserMessage(requestData *StreamChatWithDBRequest, userID uint64) (uint64, uint64, error) {
	userMessage := requestData.Message
	if userID == 0 {
		userID = 1
	}
	conversationID := requestData.AIConversationId

	tx := models.DB.Begin()
	defer func() {
		// Roll back if anything below panics.
		if r := recover(); r != nil {
			tx.Rollback()
		}
	}()

	// Create the conversation on the first message of a thread.
	if conversationID == 0 {
		conversation := models.AIConversation{
			UserId:       userID,
			Content:      userMessage,
			BusinessType: requestData.BusinessType,
			ExamName:     requestData.ExamName,
		}
		if err := tx.Create(&conversation).Error; err != nil {
			tx.Rollback()
			return 0, 0, err
		}
		conversationID = uint64(conversation.ID)
	}

	message := models.AIMessage{
		UserId:           userID,
		Content:          userMessage,
		Type:             "user",
		AIConversationId: conversationID,
	}
	if err := tx.Create(&message).Error; err != nil {
		tx.Rollback()
		return 0, 0, err
	}
	// Check the commit result instead of ignoring it (the original dropped it).
	if err := tx.Commit().Error; err != nil {
		return 0, 0, err
	}
	return conversationID, uint64(message.ID), nil
}

// sendInitialResponse emits the first SSE event carrying the database ids so
// the frontend can reference the conversation/message being streamed.
func (c *LiushiController) sendInitialResponse(aiConversationID, aiMessageID uint64) {
	initialResponse := map[string]interface{}{
		"type":               "initial",
		"ai_conversation_id": aiConversationID,
		"ai_message_id":      aiMessageID,
		"status":             "success",
	}
	responseJSON, _ := json.Marshal(initialResponse)
	fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", responseJSON)
	c.Ctx.ResponseWriter.Flush()
}

// streamAIReply creates the (initially empty) AI reply row, sends the initial
// SSE event, and runs the RAG pipeline that streams and then persists the
// reply content.
func (c *LiushiController) streamAIReply(userMessage string, userID, aiConversationID, aiMessageID uint64, onlineSearchContent string) {
	fmt.Println("🚀 ========== 开始流式AI回复流程 ==========")
	fmt.Printf("📝 用户消息: %s\n", userMessage)
	fmt.Printf("👤 用户ID: %d\n", userID)
	fmt.Printf("💬 对话ID: %d\n", aiConversationID)
	fmt.Printf("📨 消息ID: %d\n", aiMessageID)
	fmt.Println("🚀 ========================================")

	// Placeholder row; content is filled in after streaming completes.
	aiReply := models.AIMessage{
		UserId:           userID,
		Content:          "",
		Type:             "ai",
		AIConversationId: aiConversationID,
		PrevUserId:       aiMessageID,
	}
	fmt.Println("📊 创建AI回复记录...")
	tx := models.DB.Begin()
	if err := tx.Create(&aiReply).Error; err != nil {
		tx.Rollback()
		fmt.Printf("❌ 创建AI回复记录失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建AI回复记录失败: %s\"}\n\n", err.Error())
		return
	}
	tx.Commit()
	fmt.Printf("✅ AI回复记录创建成功,ID: %d\n", aiReply.ID)

	// Tell the frontend which message id this stream belongs to.
	c.sendInitialResponse(aiConversationID, uint64(aiReply.ID))

	fmt.Println("🔄 开始处理消息(RAG流程)...")
	c.processMessageWithRAG(userMessage, &aiReply, onlineSearchContent)
	fmt.Println("🎉 ========== 流式AI回复流程完成 ==========")
}

// processMessageWithRAG runs the RAG search for the user message and then
// streams the grounded answer, updating the database row when finished.
func (c *LiushiController) processMessageWithRAG(userMessage string, aiReply *models.AIMessage, onlineSearchContent string) {
	fmt.Println("🔍 ========== 开始RAG检索流程 ==========")
	fmt.Printf("📝 处理消息: %s\n", userMessage)

	// The raw user message doubles as the search query.
	query := userMessage
	fmt.Printf("🎯 使用查询: %s\n", query)

	searchRequest := map[string]interface{}{
		"query":     query,
		"n_results": 25,
	}
	fmt.Printf("📦 搜索请求参数: query=%s, n_results=25\n", query)
	requestBody, err := json.Marshal(searchRequest)
	if err != nil {
		fmt.Printf("❌ 搜索请求构建失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败: %s\"}\n\n", err.Error())
		return
	}

	searchAPIURL, err := beego.AppConfig.String("search_api_url")
	if err != nil || searchAPIURL == "" {
		fmt.Printf("❌ 配置文件中未找到search_api_url: %v\n", err)
		// Guard: err is nil when the key exists but is empty — the original
		// called err.Error() unconditionally here, which would panic.
		errMsg := ""
		if err != nil {
			errMsg = err.Error()
		}
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url: %s\"}\n\n", errMsg)
		return
	}
	fmt.Printf("🌐 搜索API地址: %s\n", searchAPIURL)

	fmt.Println("📤 发送搜索请求...")
	req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
	if err != nil {
		fmt.Printf("❌ 创建搜索请求失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败: %s\"}\n\n", err.Error())
		return
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("❌ 搜索请求发送失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
		return
	}
	defer resp.Body.Close()
	fmt.Printf("📥 收到搜索响应,状态码: %d\n", resp.StatusCode)

	responseBody, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("❌ 读取搜索结果失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败: %s\"}\n\n", err.Error())
		return
	}
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("❌ 搜索API错误: 状态码 %d\n", resp.StatusCode)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
		return
	}

	var searchResponse map[string]interface{}
	if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
		fmt.Printf("❌ 解析搜索结果失败: %v\n", err)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败: %s\"}\n\n", err.Error())
		return
	}
	status, ok := searchResponse["status"].(string)
	if !ok || status != "success" {
		message, _ := searchResponse["message"].(string)
		fmt.Printf("❌ 搜索失败: %s\n", message)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
		return
	}
	results, ok := searchResponse["results"].([]interface{})
	if !ok || len(results) == 0 {
		fmt.Printf("⚠️ 未找到相关文档\n")
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
		return
	}
	fmt.Printf("✅ 搜索成功,找到 %d 个相关文档\n", len(results))

	fmt.Println("🔄 开始流式输出RAG响应...")
	c.streamRAGResponseWithDB(userMessage, results, aiReply, onlineSearchContent)
}

// streamRAGResponseWithDB streams the RAG answer chunk by chunk to the
// client, then writes the full reply back to the AI message row.
func (c *LiushiController) streamRAGResponseWithDB(userMessage string, results []interface{}, aiReply *models.AIMessage, onlineSearchContent string) {
	// Serialize the search results for use as prompt context.
	contextJSON, err := json.Marshal(results)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
		return
	}

	// Build up to two prior rounds of conversation as extra prompt context.
	var historyContext string
	if aiReply.AIConversationId > 0 {
		var historyMessages []models.AIMessage
		// Earlier messages of this conversation, oldest first, excluding the
		// placeholder row being streamed into (id < aiReply.ID).
		models.DB.Model(&models.AIMessage{}).
			Where("user_id = ? AND ai_conversation_id = ? AND is_deleted = ? AND id < ?",
				aiReply.UserId, aiReply.AIConversationId, 0, aiReply.ID).
			Order("updated_at ASC").
			Find(&historyMessages)

		if len(historyMessages) > 0 {
			// One round = user message + AI reply (2 rows).
			maxRounds := 2
			maxMessages := maxRounds * 2
			if len(historyMessages) > maxMessages {
				historyMessages = historyMessages[len(historyMessages)-maxMessages:]
			}
			historyContext = "\n\n# 历史对话上下文\n"
			for _, msg := range historyMessages {
				if msg.Type == "user" {
					historyContext += "用户: " + msg.Content + "\n"
				} else if msg.Type == "ai" {
					historyContext += "蜀安AI助手: " + msg.Content + "\n"
				}
			}
			historyContext += "\n"
		}
	}

	// NOTE(review): the prompt content is elided in this source; in the full
	// file contextJSON/historyContext/onlineSearchContent are interpolated.
	_ = contextJSON
	finalPrompt := ` `

	fmt.Println("🌊 ========== 开始RAG流式输出 ==========")
	fmt.Printf("📝 用户问题: %s\n", userMessage)
	fmt.Printf("📚 检索到文档数量: %d\n", len(results))

	var fullContent strings.Builder
	charCount := 0
	blockCount := 0

	fmt.Println("🔄 开始流式调用AI模型...")
	err = c.sendQwen3MessageStreamWithCallback(finalPrompt, func(content string) {
		fullContent.WriteString(content)
		charCount += len([]rune(content))
		blockCount++
		fmt.Printf("📤 流式块 %d: '%s' (长度: %d字符, 累计: %d字符)\n", blockCount, content, len(content), charCount)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", content)
		c.Ctx.ResponseWriter.Flush()
	})
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
		return
	}

	// End-of-stream marker.
	fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
	c.Ctx.ResponseWriter.Flush()

	// Persist the streamed reply; a failure here is only logged because the
	// client has already received the full content.
	finalContent := fullContent.String()
	if err := models.DB.Model(aiReply).Update("content", finalContent).Error; err != nil {
		fmt.Printf("更新AI回复内容失败: %v\n", err)
	}

	c.printStreamCompleteResult(charCount, blockCount, finalContent)
}

// sendQwen3MessageStreamWithCallback streams a Qwen reply and hands each
// newline-escaped content chunk to callback instead of writing it directly.
func (c *LiushiController) sendQwen3MessageStreamWithCallback(userMessage string, callback func(string)) error {
	resp, err := c.doQwen3Request(userMessage, true)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	fmt.Println("📡 开始处理流式响应...")
	scanner := bufio.NewScanner(resp.Body)
	// Allow SSE lines larger than bufio.Scanner's 64 KiB default.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	lineCount := 0
	validDataCount := 0

	for scanner.Scan() {
		line := scanner.Text()
		lineCount++
		if line == "" || !strings.HasPrefix(line, "data: ") {
			continue
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			fmt.Printf("🏁 收到结束标记 [DONE],流式响应结束\n")
			break
		}

		var streamResp struct {
			Choices []struct {
				Delta struct {
					Content string `json:"content,omitempty"`
				} `json:"delta"`
				FinishReason *string `json:"finish_reason"`
			} `json:"choices"`
		}
		if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
			fmt.Printf("⚠️ 解析流式数据失败 (行 %d): %v\n", lineCount, err)
			continue
		}
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
			content := streamResp.Choices[0].Delta.Content
			validDataCount++
			// Escape newlines so a chunk stays one SSE event downstream.
			escapedContent := strings.ReplaceAll(content, "\n", "\\n")
			fmt.Printf("📦 流式数据块 %d: '%s' (原始长度: %d)\n", validDataCount, content, len(content))
			callback(escapedContent)
		}
		// A finish_reason marks the end of generation even without [DONE].
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
			fmt.Printf("🏁 收到完成原因: %s\n", *streamResp.Choices[0].FinishReason)
			break
		}
	}
	fmt.Printf("📊 流式处理统计: 总行数=%d, 有效数据块=%d\n", lineCount, validDataCount)
	return scanner.Err()
}

// printStreamCompleteResult logs a summary (and the full content) of a
// finished stream to stdout for cross-checking against frontend statistics.
func (c *LiushiController) printStreamCompleteResult(charCount, blockCount int, fullContent string) {
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("🎉 后端流式输出完成!")
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("📊 后端统计信息:")
	fmt.Printf("📝 总字符数: %d\n", charCount)
	fmt.Printf("📦 数据块数: %d\n", blockCount)
	fmt.Printf("⏱️ 完成时间: %s\n", time.Now().Format("2006/1/2 下午3:04:05"))
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("🔍 数据一致性检查:")
	fmt.Println("请对比前端控制台输出的统计信息,确保数据一致")
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("✅ 后端流式输出已完全结束,所有数据已发送到客户端")
	fmt.Println("=" + strings.Repeat("=", 80))
	if fullContent != "" {
		fmt.Println("📄 完整响应内容:")
		fmt.Println("=" + strings.Repeat("=", 80))
		fmt.Println(fullContent)
		fmt.Println("=" + strings.Repeat("=", 80))
	}
}