| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003 |
- package controllers
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "time"
- "shudao-chat-go/models"
- "shudao-chat-go/utils"
- beego "github.com/beego/beego/v2/server/web"
- )
// LiushiController serves the streaming (SSE) chat endpoints. It embeds
// beego.Controller for access to the request context and response writer.
type LiushiController struct {
	beego.Controller
}
// StreamRequest is the request body for the plain streaming chat endpoint
// (StreamChat).
type StreamRequest struct {
	Message string `json:"message"`         // user's chat message; required (validated in StreamChat)
	Model   string `json:"model,omitempty"` // optional model name; not read by the handlers in this file
}
// StreamChatWithDBRequest is the request body for the DB-backed streaming
// chat endpoint (StreamChatWithDB). The user ID is taken from the auth
// token, not from this payload.
type StreamChatWithDBRequest struct {
	Message             string `json:"message"`               // user's chat message
	AIConversationId    uint64 `json:"ai_conversation_id"`    // existing conversation ID; 0 creates a new conversation
	BusinessType        int    `json:"business_type"`         // business category stored on a new conversation
	ExamName            string `json:"exam_name"`             // exam name stored on a new conversation
	AIMessageId         uint64 `json:"ai_message_id"`         // client-supplied message ID; not read by the handlers in this file
	OnlineSearchContent string `json:"online_search_content"` // extra online-search context forwarded to the RAG pipeline
}
// StreamResponse models one SSE chunk of an OpenAI-compatible streaming
// chat-completion response (the format the Qwen API emits).
// NOTE(review): several functions below re-declare identical local copies
// of this type instead of using this one — consider consolidating.
type StreamResponse struct {
	ID      string `json:"id"`
	Object  string `json:"object"`
	Created int64  `json:"created"`
	Model   string `json:"model"`
	Choices []struct {
		Index int `json:"index"`
		Delta struct {
			Role    string `json:"role,omitempty"`
			Content string `json:"content,omitempty"`
		} `json:"delta"`
		FinishReason *string `json:"finish_reason"` // non-nil when the stream for this choice is complete
	} `json:"choices"`
}
- // StreamChat 流式聊天接口(两层流式输出:RAG检索 -> 流式回答)
- func (c *LiushiController) StreamChat() {
- // 设置响应头为SSE流式传输
- c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
- c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache")
- c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive")
- c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
- c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
- c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Headers", "Content-Type")
- // 获取请求参数
- var request StreamRequest
- if err := json.Unmarshal(c.Ctx.Input.RequestBody, &request); err != nil {
- c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败\"}\n\n")
- return
- }
- if request.Message == "" {
- c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"消息内容不能为空\"}\n\n")
- return
- }
- userMessage := request.Message
- // 第一层:意图识别(非流式)
- intentPrompt := `
- `
- // 发送意图识别请求(非流式)
- intentReply, err := c.sendQwen3Message(intentPrompt, false)
- if err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图识别失败: %s\"}\n\n", err.Error())
- return
- }
- fmt.Printf("意图识别回复: %s\n", intentReply)
- var aiResponse map[string]interface{}
- cleanReply := strings.TrimSpace(intentReply)
- cleanReply = strings.TrimPrefix(cleanReply, "```json")
- cleanReply = strings.TrimSuffix(cleanReply, "```")
- cleanReply = strings.TrimSpace(cleanReply)
- if err := json.Unmarshal([]byte(cleanReply), &aiResponse); err != nil {
- // 如果解析失败,可能是AI直接返回了文本格式(greeting、faq)
- fmt.Printf("JSON解析失败,AI返回了文本格式回复: %s\n", intentReply)
- // 直接流式输出AI的原始回复
- c.streamTextResponse(intentReply)
- return
- }
- intent, ok := aiResponse["intent"].(string)
- if !ok {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图解析失败\"}\n\n")
- return
- }
- // 根据intent类型决定处理方式
- if intent == "greeting" || intent == "faq" {
- // 对于greeting、faq,直接流式输出
- if directAnswer, exists := aiResponse["direct_answer"].(string); exists && directAnswer != "" {
- c.streamTextResponse(intentReply)
- } else {
- c.streamTextResponse(intentReply)
- }
- return
- }
- // 第二层:RAG检索(query_knowledge_base)
- if intent == "query_knowledge_base" {
- searchQueries, ok := aiResponse["search_queries"].([]interface{})
- if !ok || len(searchQueries) == 0 {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到有效的查询内容\"}\n\n")
- return
- }
- // 使用第一个查询进行搜索
- if len(searchQueries) > 0 {
- query := searchQueries[0].(string)
- // 构建搜索请求
- searchRequest := map[string]interface{}{
- "query": query,
- "n_results": 25,
- }
- requestBody, err := json.Marshal(searchRequest)
- if err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败\"}\n\n")
- return
- }
- // 从配置文件中读取搜索API地址
- searchAPIURL, err := beego.AppConfig.String("search_api_url")
- if err != nil || searchAPIURL == "" {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url\"}\n\n")
- return
- }
- // 发送HTTP请求到搜索服务
- req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
- if err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败\"}\n\n")
- return
- }
- req.Header.Set("Content-Type", "application/json")
- client := &http.Client{Timeout: 30 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
- return
- }
- defer resp.Body.Close()
- responseBody, err := io.ReadAll(resp.Body)
- if err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败\"}\n\n")
- return
- }
- if resp.StatusCode != http.StatusOK {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
- return
- }
- // 解析搜索响应
- var searchResponse map[string]interface{}
- if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败\"}\n\n")
- return
- }
- // 检查响应状态
- status, ok := searchResponse["status"].(string)
- if !ok || status != "success" {
- message, _ := searchResponse["message"].(string)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
- return
- }
- // 获取搜索结果
- results, ok := searchResponse["results"].([]interface{})
- if !ok || len(results) == 0 {
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
- return
- }
- // 第三层:流式输出最终回答
- c.streamRAGResponse(userMessage, results)
- }
- }
- }
- // streamTextResponse 流式输出文本响应
- func (c *LiushiController) streamTextResponse(text string) {
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println("开始流式输出文本响应...")
- fmt.Printf("文本长度: %d 字符\n", len(text))
- fmt.Println("=" + strings.Repeat("=", 80))
- // 如果文本较短(小于200字符),直接发送完整文本
- if len(text) < 200 {
- fmt.Printf("文本较短,直接发送完整内容\n")
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", text)
- c.Ctx.ResponseWriter.Flush()
- } else {
- // 文本较长,按块发送以模拟流式效果
- fmt.Printf("文本较长,按块发送(每块50字符)\n")
- c.sendTextInChunks(text, 50)
- }
- // 结束标记
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
- c.Ctx.ResponseWriter.Flush()
- // 计算数据块数
- chunkCount := 1
- if len(text) >= 200 {
- chunkCount = (len(text) + 49) / 50 // 向上取整
- }
- // 打印完整的流式输出完成结果
- c.printStreamCompleteResult(len(text), chunkCount, text)
- }
- // sendTextInChunks 按块发送文本以模拟流式效果
- func (c *LiushiController) sendTextInChunks(text string, chunkSize int) int {
- runes := []rune(text)
- chunkCount := 0
- for i := 0; i < len(runes); i += chunkSize {
- end := i + chunkSize
- if end > len(runes) {
- end = len(runes)
- }
- chunk := string(runes[i:end])
- chunkCount++
- fmt.Printf("发送第 %d 块: '%s' (长度: %d)\n", chunkCount, chunk, len(chunk))
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", chunk)
- c.Ctx.ResponseWriter.Flush()
- time.Sleep(50 * time.Millisecond) // 调整延迟以控制速度
- }
- fmt.Printf("总共发送了 %d 个数据块\n", chunkCount)
- return chunkCount
- }
// streamRAGResponse streams the final RAG-based answer for userMessage,
// using the retrieved documents in results as context.
func (c *LiushiController) streamRAGResponse(userMessage string, results []interface{}) {
	// Serialize the retrieved documents to JSON for use as prompt context.
	contextJSON, err := json.Marshal(results)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
		return
	}
	// Final-answer prompt (plain-text answer in the style of the former
	// natural_language_answer field).
	// NOTE(review): the template body appears to have been stripped from
	// this copy of the file — contextJSON and userMessage were presumably
	// interpolated here; as written contextJSON is unused, which would not
	// compile. Confirm against the original source.
	finalPrompt := `
`
	// Stream the model's Markdown output straight through to the client.
	err = c.sendQwen3MessageStream(finalPrompt)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
		return
	}
}
- // sendQwen3MessageStream 发送消息到千问模型并直接流式输出到客户端
- func (c *LiushiController) sendQwen3MessageStream(userMessage string) error {
- apiURL, err := beego.AppConfig.String("qwen3_api_url")
- if err != nil || apiURL == "" {
- return fmt.Errorf("配置文件中未找到qwen3_api_url")
- }
- model, err := beego.AppConfig.String("qwen3_model")
- if err != nil || model == "" {
- return fmt.Errorf("配置文件中未找到qwen3_model")
- }
- qwen3Request := map[string]interface{}{
- "model": model,
- "stream": true,
- "temperature": 0.7,
- "messages": []map[string]string{
- {"role": "user", "content": userMessage},
- },
- }
- requestBody, err := json.Marshal(qwen3Request)
- if err != nil {
- return fmt.Errorf("请求序列化失败: %v", err)
- }
- req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
- if err != nil {
- return fmt.Errorf("创建HTTP请求失败: %v", err)
- }
- req.Header.Set("Content-Type", "application/json")
- client := &http.Client{Timeout: 600 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- return fmt.Errorf("请求发送失败: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- responseBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
- }
- return fmt.Errorf("千问API错误: %s", string(responseBody))
- }
- // 直接流式处理并输出到客户端
- return c.handleStreamResponseToClient(resp)
- }
- // sendQwen3Message 发送消息到千问模型
- func (c *LiushiController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
- apiURL, err := beego.AppConfig.String("qwen3_api_url")
- if err != nil || apiURL == "" {
- return "", fmt.Errorf("配置文件中未找到qwen3_api_url")
- }
- model, err := beego.AppConfig.String("qwen3_model")
- if err != nil || model == "" {
- return "", fmt.Errorf("配置文件中未找到qwen3_model")
- }
- qwen3Request := map[string]interface{}{
- "model": model,
- "stream": useStream,
- "temperature": 0.7,
- "messages": []map[string]string{
- {"role": "user", "content": userMessage},
- },
- }
- requestBody, err := json.Marshal(qwen3Request)
- if err != nil {
- return "", fmt.Errorf("请求序列化失败: %v", err)
- }
- req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
- if err != nil {
- return "", fmt.Errorf("创建HTTP请求失败: %v", err)
- }
- req.Header.Set("Content-Type", "application/json")
- client := &http.Client{Timeout: 600 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- return "", fmt.Errorf("请求发送失败: %v", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- responseBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return "", fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
- }
- return "", fmt.Errorf("千问API错误: %s", string(responseBody))
- }
- if useStream {
- // 流式响应处理
- type StreamResponse struct {
- ID string `json:"id"`
- Object string `json:"object"`
- Created int64 `json:"created"`
- Model string `json:"model"`
- Choices []struct {
- Index int `json:"index"`
- Delta struct {
- Role string `json:"role,omitempty"`
- Content string `json:"content,omitempty"`
- } `json:"delta"`
- FinishReason *string `json:"finish_reason"`
- } `json:"choices"`
- }
- var fullContent strings.Builder
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- line := scanner.Text()
- if line == "" || !strings.HasPrefix(line, "data: ") {
- continue
- }
- data := strings.TrimPrefix(line, "data: ")
- if data == "[DONE]" {
- break
- }
- var streamResp StreamResponse
- if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
- continue
- }
- if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
- fullContent.WriteString(streamResp.Choices[0].Delta.Content)
- }
- }
- return fullContent.String(), nil
- } else {
- // 非流式响应处理
- type NonStreamResponse struct {
- Choices []struct {
- Message struct {
- Content string `json:"content"`
- } `json:"message"`
- } `json:"choices"`
- }
- responseBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return "", fmt.Errorf("读取响应失败: %v", err)
- }
- var response NonStreamResponse
- if err := json.Unmarshal(responseBody, &response); err != nil {
- return "", fmt.Errorf("解析响应失败: %v", err)
- }
- if len(response.Choices) > 0 {
- return response.Choices[0].Message.Content, nil
- }
- return "", fmt.Errorf("未找到有效响应")
- }
- }
// handleStreamResponseToClient reads the Qwen SSE stream from resp and
// relays each content delta to the connected client as its own SSE event.
// It terminates on the upstream "[DONE]" marker or a non-nil finish_reason,
// emitting "data: [DONE]" to the client in either case, and returns any
// scanner error encountered while reading the upstream body.
func (c *LiushiController) handleStreamResponseToClient(resp *http.Response) error {
	// Local mirror of the OpenAI-compatible streaming chunk format.
	type StreamResponse struct {
		ID      string `json:"id"`
		Object  string `json:"object"`
		Created int64  `json:"created"`
		Model   string `json:"model"`
		Choices []struct {
			Index int `json:"index"`
			Delta struct {
				Role    string `json:"role,omitempty"`
				Content string `json:"content,omitempty"`
			} `json:"delta"`
			FinishReason *string `json:"finish_reason"`
		} `json:"choices"`
	}
	fmt.Println("=" + strings.Repeat("=", 80))
	fmt.Println("开始处理千问API流式响应(直接转发原始数据块)...")
	fmt.Println("=" + strings.Repeat("=", 80))
	scanner := bufio.NewScanner(resp.Body)
	charCount := 0
	blockCount := 0
	fullContent := "" // accumulates the complete (unescaped) reply for the summary log
	for scanner.Scan() {
		line := scanner.Text()
		// SSE payload lines start with "data: "; skip keep-alives and blanks.
		if line == "" || !strings.HasPrefix(line, "data: ") {
			continue
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			fmt.Println("\n" + strings.Repeat("-", 40))
			fmt.Printf("千问API流式结束,总共输出了 %d 个字符\n", charCount)
			fmt.Println(strings.Repeat("-", 40))
			// Forward the end-of-stream marker to the client.
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
			c.Ctx.ResponseWriter.Flush()
			break
		}
		var streamResp StreamResponse
		if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
			// Skip malformed chunks rather than aborting the stream.
			continue
		}
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
			// Extract this chunk's content delta.
			content := streamResp.Choices[0].Delta.Content
			charCount += len([]rune(content))
			blockCount++
			// Keep the raw content for the completion summary.
			fullContent += content
			fmt.Printf("收到第 %d 个内容块: '%s' (长度: %d)\n", blockCount, content, len(content))
			fmt.Printf("直接转发完整内容块到客户端\n")
			// Escape literal newlines as "\n" so a multi-line Markdown chunk
			// stays within a single SSE data line (a raw newline would
			// terminate the SSE event prematurely).
			escapedContent := strings.ReplaceAll(content, "\n", "\\n")
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", escapedContent)
			c.Ctx.ResponseWriter.Flush()
			// Chunks are forwarded whole; no per-character pacing delay.
		}
		// A non-nil finish_reason also signals the end of the stream
		// (some servers send it before, or instead of, "[DONE]").
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
			fmt.Println("\n" + strings.Repeat("-", 40))
			fmt.Printf("流式完成,总共接收了 %d 个数据块,%d 个字符\n", blockCount, charCount)
			fmt.Println(strings.Repeat("-", 40))
			fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
			c.Ctx.ResponseWriter.Flush()
			break
		}
	}
	// Log the completion summary with the full accumulated reply.
	c.printStreamCompleteResult(charCount, blockCount, fullContent)
	return scanner.Err()
}
// StreamChatWithDB is the DB-backed SSE streaming chat endpoint. It
// authenticates the user from the request token, persists the user message,
// then streams the AI reply (which is also saved once complete).
func (c *LiushiController) StreamChatWithDB() {
	// Configure the response for Server-Sent Events.
	c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
	c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache")
	c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive")
	c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
	c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
	c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Headers", "Content-Type")
	// Resolve the user from token data stored on the request context.
	userInfo, err := utils.GetUserInfoFromContext(c.Ctx.Input.GetData("userInfo"))
	if err != nil {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusUnauthorized)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"获取用户信息失败: %s\"}\n\n", err.Error())
		return
	}
	user_id := uint64(userInfo.ID)
	// NOTE(review): user ID 0 is silently remapped to 1 — presumably a
	// fallback/test account; confirm this is intended in production.
	if user_id == 0 {
		user_id = 1
	}
	// Parse the request body.
	var requestData StreamChatWithDBRequest
	// Debug logging of the raw request.
	requestBody := c.Ctx.Input.RequestBody
	fmt.Printf("🔍 请求体内容: %s\n", string(requestBody))
	fmt.Printf("🔍 请求体长度: %d\n", len(requestBody))
	fmt.Printf("🔍 Content-Type: %s\n", c.Ctx.Request.Header.Get("Content-Type"))
	if err := json.Unmarshal(requestBody, &requestData); err != nil {
		fmt.Printf("❌ JSON解析失败: %v\n", err)
		c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败: %s\"}\n\n", err.Error())
		return
	}
	fmt.Println("流式聊天数据库集成请求数据:", requestData)
	// Persist the user message (creating the conversation if needed).
	ai_conversation_id, user_message_id, err := c.saveUserMessage(&requestData, user_id)
	if err != nil {
		c.Ctx.ResponseWriter.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"保存用户消息失败: %s\"}\n\n", err.Error())
		return
	}
	// Stream the AI reply (the initial SSE event with DB IDs is sent inside).
	c.streamAIReply(requestData.Message, user_id, ai_conversation_id, user_message_id, requestData.OnlineSearchContent)
}
- // saveUserMessage 保存用户消息到数据库
- func (c *LiushiController) saveUserMessage(requestData *StreamChatWithDBRequest, user_id uint64) (uint64, uint64, error) {
- userMessage := requestData.Message
- if user_id == 0 {
- user_id = 1
- }
- ai_conversation_id := requestData.AIConversationId
- tx := models.DB.Begin()
- defer func() {
- if r := recover(); r != nil {
- tx.Rollback()
- }
- }()
- // 创建或获取对话
- if ai_conversation_id == 0 {
- ai_conversation := models.AIConversation{
- UserId: user_id,
- Content: userMessage,
- BusinessType: requestData.BusinessType,
- ExamName: requestData.ExamName,
- }
- if err := tx.Create(&ai_conversation).Error; err != nil {
- tx.Rollback()
- return 0, 0, err
- }
- ai_conversation_id = uint64(ai_conversation.ID)
- }
- // 保存用户消息
- ai_message := models.AIMessage{
- UserId: user_id,
- Content: userMessage,
- Type: "user",
- AIConversationId: ai_conversation_id,
- }
- if err := tx.Create(&ai_message).Error; err != nil {
- tx.Rollback()
- return 0, 0, err
- }
- tx.Commit()
- return ai_conversation_id, uint64(ai_message.ID), nil
- }
- // sendInitialResponse 发送初始响应(包含数据库ID)
- func (c *LiushiController) sendInitialResponse(ai_conversation_id, ai_message_id uint64) {
- initialResponse := map[string]interface{}{
- "type": "initial",
- "ai_conversation_id": ai_conversation_id,
- "ai_message_id": ai_message_id,
- "status": "success",
- }
- responseJSON, _ := json.Marshal(initialResponse)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", responseJSON)
- c.Ctx.ResponseWriter.Flush()
- }
- // streamAIReply 流式输出AI回复并保存到数据库
- func (c *LiushiController) streamAIReply(userMessage string, user_id, ai_conversation_id, ai_message_id uint64, onlineSearchContent string) {
- fmt.Println("🚀 ========== 开始流式AI回复流程 ==========")
- fmt.Printf("📝 用户消息: %s\n", userMessage)
- fmt.Printf("👤 用户ID: %d\n", user_id)
- fmt.Printf("💬 对话ID: %d\n", ai_conversation_id)
- fmt.Printf("📨 消息ID: %d\n", ai_message_id)
- fmt.Println("🚀 ========================================")
- // 创建AI回复记录
- ai_reply := models.AIMessage{
- UserId: user_id, // 使用请求中的用户ID
- Content: "", // 初始为空,流式完成后更新
- Type: "ai",
- AIConversationId: ai_conversation_id,
- PrevUserId: ai_message_id,
- }
- fmt.Println("📊 创建AI回复记录...")
- tx := models.DB.Begin()
- if err := tx.Create(&ai_reply).Error; err != nil {
- tx.Rollback()
- fmt.Printf("❌ 创建AI回复记录失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建AI回复记录失败: %s\"}\n\n", err.Error())
- return
- }
- tx.Commit()
- fmt.Printf("✅ AI回复记录创建成功,ID: %d\n", ai_reply.ID)
- // 发送初始响应(包含AI消息ID)
- c.sendInitialResponse(ai_conversation_id, uint64(ai_reply.ID))
- // 直接使用RAG流程进行搜索和回答
- fmt.Println("🔄 开始处理消息(RAG流程)...")
- c.processMessageWithRAG(userMessage, &ai_reply, onlineSearchContent)
- fmt.Println("🎉 ========== 流式AI回复流程完成 ==========")
- }
- // processMessageWithRAG 处理消息的完整流程(RAG+数据库更新)
- func (c *LiushiController) processMessageWithRAG(userMessage string, ai_reply *models.AIMessage, onlineSearchContent string) {
- fmt.Println("🔍 ========== 开始RAG检索流程 ==========")
- fmt.Printf("📝 处理消息: %s\n", userMessage)
- // 直接使用用户消息进行搜索
- query := userMessage
- fmt.Printf("🎯 使用查询: %s\n", query)
- // 构建搜索请求
- searchRequest := map[string]interface{}{
- "query": query,
- "n_results": 25,
- }
- fmt.Printf("📦 搜索请求参数: query=%s, n_results=25\n", query)
- requestBody, err := json.Marshal(searchRequest)
- if err != nil {
- fmt.Printf("❌ 搜索请求构建失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败: %s\"}\n\n", err.Error())
- return
- }
- // 从配置文件中读取搜索API地址
- searchAPIURL, err := beego.AppConfig.String("search_api_url")
- if err != nil || searchAPIURL == "" {
- fmt.Printf("❌ 配置文件中未找到search_api_url: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url: %s\"}\n\n", err.Error())
- return
- }
- fmt.Printf("🌐 搜索API地址: %s\n", searchAPIURL)
- // 发送HTTP请求到搜索服务
- fmt.Println("📤 发送搜索请求...")
- req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
- if err != nil {
- fmt.Printf("❌ 创建搜索请求失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败: %s\"}\n\n", err.Error())
- return
- }
- req.Header.Set("Content-Type", "application/json")
- client := &http.Client{Timeout: 30 * time.Second}
- resp, err := client.Do(req)
- if err != nil {
- fmt.Printf("❌ 搜索请求发送失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
- return
- }
- defer resp.Body.Close()
- fmt.Printf("📥 收到搜索响应,状态码: %d\n", resp.StatusCode)
- responseBody, err := io.ReadAll(resp.Body)
- if err != nil {
- fmt.Printf("❌ 读取搜索结果失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败: %s\"}\n\n", err.Error())
- return
- }
- if resp.StatusCode != http.StatusOK {
- fmt.Printf("❌ 搜索API错误: 状态码 %d\n", resp.StatusCode)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
- return
- }
- // 解析搜索响应
- var searchResponse map[string]interface{}
- if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
- fmt.Printf("❌ 解析搜索结果失败: %v\n", err)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败: %s\"}\n\n", err.Error())
- return
- }
- // 检查响应状态
- status, ok := searchResponse["status"].(string)
- if !ok || status != "success" {
- message, _ := searchResponse["message"].(string)
- fmt.Printf("❌ 搜索失败: %s\n", message)
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
- return
- }
- // 获取搜索结果
- results, ok := searchResponse["results"].([]interface{})
- if !ok || len(results) == 0 {
- fmt.Printf("⚠️ 未找到相关文档\n")
- fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
- return
- }
- fmt.Printf("✅ 搜索成功,找到 %d 个相关文档\n", len(results))
- fmt.Println("🔄 开始流式输出RAG响应...")
- // 流式输出最终回答并更新数据库
- c.streamRAGResponseWithDB(userMessage, results, ai_reply, onlineSearchContent)
- }
// streamRAGResponseWithDB builds the final prompt (retrieved documents plus
// up to two rounds of conversation history), streams the model's answer to
// the client, and backfills the AI-reply DB record with the full content.
func (c *LiushiController) streamRAGResponseWithDB(userMessage string, results []interface{}, ai_reply *models.AIMessage, onlineSearchContent string) {
	// Serialize the retrieved documents to JSON for use as prompt context.
	contextJSON, err := json.Marshal(results)
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
		return
	}
	// Build the history context from earlier messages in this conversation.
	var historyContext string
	if ai_reply.AIConversationId > 0 {
		var historyMessages []models.AIMessage
		// Fetch prior, non-deleted messages for this user/conversation,
		// oldest first, excluding the current (just-created) reply record.
		models.DB.Model(&models.AIMessage{}).
			Where("user_id = ? AND ai_conversation_id = ? AND is_deleted = ? AND id < ?",
				ai_reply.UserId, ai_reply.AIConversationId, 0, ai_reply.ID).
			Order("updated_at ASC").
			Find(&historyMessages)
		// Keep only the last two rounds (one round = user message + AI reply).
		if len(historyMessages) > 0 {
			maxRounds := 2
			maxMessages := maxRounds * 2
			if len(historyMessages) > maxMessages {
				historyMessages = historyMessages[len(historyMessages)-maxMessages:]
			}
			// Render the history as a labeled transcript.
			historyContext = "\n\n# 历史对话上下文\n"
			for _, msg := range historyMessages {
				if msg.Type == "user" {
					historyContext += "用户: " + msg.Content + "\n"
				} else if msg.Type == "ai" {
					historyContext += "蜀安AI助手: " + msg.Content + "\n"
				}
			}
			historyContext += "\n"
		}
	}
	// NOTE(review): the prompt template body appears to have been stripped
	// from this copy of the file — contextJSON, historyContext, userMessage
	// and onlineSearchContent were presumably interpolated here; as written
	// contextJSON and historyContext are unused, which would not compile.
	// Confirm against the original source.
	finalPrompt := `
`
	// Stream the model output directly to the client while accumulating it.
	fmt.Println("🌊 ========== 开始RAG流式输出 ==========")
	fmt.Printf("📝 用户问题: %s\n", userMessage)
	fmt.Printf("📚 检索到文档数量: %d\n", len(results))
	var fullContent strings.Builder
	charCount := 0
	blockCount := 0
	fmt.Println("🔄 开始流式调用AI模型...")
	err = c.sendQwen3MessageStreamWithCallback(finalPrompt, func(content string) {
		// NOTE(review): content arrives with newlines already escaped as
		// "\n" (see sendQwen3MessageStreamWithCallback), so the content
		// saved to the DB below also carries the escaped form — confirm
		// this is intended.
		fullContent.WriteString(content)
		charCount += len([]rune(content))
		blockCount++
		// Forward the chunk to the client as an SSE event.
		fmt.Printf("📤 流式块 %d: '%s' (长度: %d字符, 累计: %d字符)\n",
			blockCount, content, len(content), charCount)
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", content)
		c.Ctx.ResponseWriter.Flush()
	})
	if err != nil {
		fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
		return
	}
	// SSE end-of-stream marker.
	fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
	c.Ctx.ResponseWriter.Flush()
	// Backfill the AI-reply record with the complete answer.
	finalContent := fullContent.String()
	if err := models.DB.Model(ai_reply).Update("content", finalContent).Error; err != nil {
		fmt.Printf("更新AI回复内容失败: %v\n", err)
	}
	// Log the completion summary.
	c.printStreamCompleteResult(charCount, blockCount, finalContent)
}
// sendQwen3MessageStreamWithCallback sends userMessage to the Qwen model
// with streaming enabled and invokes callback once per content delta.
// Newlines in each delta are escaped as "\n" before the callback is called
// so chunks stay within a single SSE data line. The stream ends on the
// "[DONE]" marker or a non-nil finish_reason; any scanner error is returned.
func (c *LiushiController) sendQwen3MessageStreamWithCallback(userMessage string, callback func(string)) error {
	apiURL, err := beego.AppConfig.String("qwen3_api_url")
	if err != nil || apiURL == "" {
		return fmt.Errorf("配置文件中未找到qwen3_api_url")
	}
	model, err := beego.AppConfig.String("qwen3_model")
	if err != nil || model == "" {
		return fmt.Errorf("配置文件中未找到qwen3_model")
	}
	// OpenAI-compatible chat-completion payload with streaming on.
	qwen3Request := map[string]interface{}{
		"model":       model,
		"stream":      true,
		"temperature": 0.7,
		"messages": []map[string]string{
			{"role": "user", "content": userMessage},
		},
	}
	requestBody, err := json.Marshal(qwen3Request)
	if err != nil {
		return fmt.Errorf("请求序列化失败: %v", err)
	}
	req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
	if err != nil {
		return fmt.Errorf("创建HTTP请求失败: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Generous timeout: the model may stream for a long time.
	client := &http.Client{Timeout: 600 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("请求发送失败: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		responseBody, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
		}
		return fmt.Errorf("千问API错误: %s", string(responseBody))
	}
	// Consume the SSE stream line by line.
	fmt.Println("📡 开始处理流式响应...")
	scanner := bufio.NewScanner(resp.Body)
	lineCount := 0
	validDataCount := 0
	for scanner.Scan() {
		line := scanner.Text()
		lineCount++
		// SSE payload lines start with "data: "; skip keep-alives and blanks.
		if line == "" || !strings.HasPrefix(line, "data: ") {
			continue
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			fmt.Printf("🏁 收到结束标记 [DONE],流式响应结束\n")
			break
		}
		// Minimal view of the streaming chunk: only delta content and
		// finish_reason are needed here.
		var streamResp struct {
			Choices []struct {
				Delta struct {
					Content string `json:"content,omitempty"`
				} `json:"delta"`
				FinishReason *string `json:"finish_reason"`
			} `json:"choices"`
		}
		if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
			// Skip malformed chunks rather than aborting the stream.
			fmt.Printf("⚠️ 解析流式数据失败 (行 %d): %v\n", lineCount, err)
			continue
		}
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
			content := streamResp.Choices[0].Delta.Content
			validDataCount++
			// Escape literal newlines so multi-line chunks survive SSE framing.
			// NOTE(review): the callback therefore receives the ESCAPED form;
			// the caller also persists this to the DB — confirm that storing
			// escaped newlines is intended.
			escapedContent := strings.ReplaceAll(content, "\n", "\\n")
			fmt.Printf("📦 流式数据块 %d: '%s' (原始长度: %d)\n", validDataCount, content, len(content))
			callback(escapedContent)
		}
		// A non-nil finish_reason also ends the stream.
		if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
			fmt.Printf("🏁 收到完成原因: %s\n", *streamResp.Choices[0].FinishReason)
			break
		}
	}
	fmt.Printf("📊 流式处理统计: 总行数=%d, 有效数据块=%d\n", lineCount, validDataCount)
	return scanner.Err()
}
- // printStreamCompleteResult 打印流式输出完成结果
- func (c *LiushiController) printStreamCompleteResult(charCount, blockCount int, fullContent string) {
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println("🎉 后端流式输出完成!")
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println("📊 后端统计信息:")
- fmt.Printf("📝 总字符数: %d\n", charCount)
- fmt.Printf("📦 数据块数: %d\n", blockCount)
- fmt.Printf("⏱️ 完成时间: %s\n", time.Now().Format("2006/1/2 下午3:04:05"))
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println("🔍 数据一致性检查:")
- fmt.Println("请对比前端控制台输出的统计信息,确保数据一致")
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println("✅ 后端流式输出已完全结束,所有数据已发送到客户端")
- fmt.Println("=" + strings.Repeat("=", 80))
- // 打印完整内容
- if fullContent != "" {
- fmt.Println("📄 完整响应内容:")
- fmt.Println("=" + strings.Repeat("=", 80))
- fmt.Println(fullContent)
- fmt.Println("=" + strings.Repeat("=", 80))
- }
- }
|