liushi.go 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. package controllers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "strings"
  10. "time"
  11. "shudao-chat-go/models"
  12. "shudao-chat-go/utils"
  13. beego "github.com/beego/beego/v2/server/web"
  14. )
  15. // LiushiController 流式接口控制器
  16. type LiushiController struct {
  17. beego.Controller
  18. }
  19. // StreamRequest 流式请求结构
  20. type StreamRequest struct {
  21. Message string `json:"message"`
  22. Model string `json:"model,omitempty"`
  23. }
  24. // StreamChatWithDBRequest 流式聊天数据库集成请求结构(user_id从token中获取)
  25. type StreamChatWithDBRequest struct {
  26. Message string `json:"message"`
  27. AIConversationId uint64 `json:"ai_conversation_id"`
  28. BusinessType int `json:"business_type"`
  29. ExamName string `json:"exam_name"`
  30. AIMessageId uint64 `json:"ai_message_id"`
  31. OnlineSearchContent string `json:"online_search_content"`
  32. }
  33. // StreamResponse 流式响应结构
  34. type StreamResponse struct {
  35. ID string `json:"id"`
  36. Object string `json:"object"`
  37. Created int64 `json:"created"`
  38. Model string `json:"model"`
  39. Choices []struct {
  40. Index int `json:"index"`
  41. Delta struct {
  42. Role string `json:"role,omitempty"`
  43. Content string `json:"content,omitempty"`
  44. } `json:"delta"`
  45. FinishReason *string `json:"finish_reason"`
  46. } `json:"choices"`
  47. }
  48. // StreamChat 流式聊天接口(两层流式输出:RAG检索 -> 流式回答)
  49. func (c *LiushiController) StreamChat() {
  50. // 设置响应头为SSE流式传输
  51. c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
  52. c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache")
  53. c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive")
  54. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
  55. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
  56. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  57. // 获取请求参数
  58. var request StreamRequest
  59. if err := json.Unmarshal(c.Ctx.Input.RequestBody, &request); err != nil {
  60. c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
  61. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败\"}\n\n")
  62. return
  63. }
  64. if request.Message == "" {
  65. c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
  66. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"消息内容不能为空\"}\n\n")
  67. return
  68. }
  69. userMessage := request.Message
  70. // 第一层:意图识别(非流式)
  71. intentPrompt := `
  72. `
  73. // 发送意图识别请求(非流式)
  74. intentReply, err := c.sendQwen3Message(intentPrompt, false)
  75. if err != nil {
  76. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图识别失败: %s\"}\n\n", err.Error())
  77. return
  78. }
  79. fmt.Printf("意图识别回复: %s\n", intentReply)
  80. var aiResponse map[string]interface{}
  81. cleanReply := strings.TrimSpace(intentReply)
  82. cleanReply = strings.TrimPrefix(cleanReply, "```json")
  83. cleanReply = strings.TrimSuffix(cleanReply, "```")
  84. cleanReply = strings.TrimSpace(cleanReply)
  85. if err := json.Unmarshal([]byte(cleanReply), &aiResponse); err != nil {
  86. // 如果解析失败,可能是AI直接返回了文本格式(greeting、faq)
  87. fmt.Printf("JSON解析失败,AI返回了文本格式回复: %s\n", intentReply)
  88. // 直接流式输出AI的原始回复
  89. c.streamTextResponse(intentReply)
  90. return
  91. }
  92. intent, ok := aiResponse["intent"].(string)
  93. if !ok {
  94. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"意图解析失败\"}\n\n")
  95. return
  96. }
  97. // 根据intent类型决定处理方式
  98. if intent == "greeting" || intent == "faq" {
  99. // 对于greeting、faq,直接流式输出
  100. if directAnswer, exists := aiResponse["direct_answer"].(string); exists && directAnswer != "" {
  101. c.streamTextResponse(intentReply)
  102. } else {
  103. c.streamTextResponse(intentReply)
  104. }
  105. return
  106. }
  107. // 第二层:RAG检索(query_knowledge_base)
  108. if intent == "query_knowledge_base" {
  109. searchQueries, ok := aiResponse["search_queries"].([]interface{})
  110. if !ok || len(searchQueries) == 0 {
  111. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到有效的查询内容\"}\n\n")
  112. return
  113. }
  114. // 使用第一个查询进行搜索
  115. if len(searchQueries) > 0 {
  116. query := searchQueries[0].(string)
  117. // 构建搜索请求
  118. searchRequest := map[string]interface{}{
  119. "query": query,
  120. "n_results": 25,
  121. }
  122. requestBody, err := json.Marshal(searchRequest)
  123. if err != nil {
  124. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败\"}\n\n")
  125. return
  126. }
  127. // 从配置文件中读取搜索API地址
  128. searchAPIURL, err := beego.AppConfig.String("search_api_url")
  129. if err != nil || searchAPIURL == "" {
  130. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url\"}\n\n")
  131. return
  132. }
  133. // 发送HTTP请求到搜索服务
  134. req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
  135. if err != nil {
  136. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败\"}\n\n")
  137. return
  138. }
  139. req.Header.Set("Content-Type", "application/json")
  140. client := &http.Client{Timeout: 30 * time.Second}
  141. resp, err := client.Do(req)
  142. if err != nil {
  143. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
  144. return
  145. }
  146. defer resp.Body.Close()
  147. responseBody, err := io.ReadAll(resp.Body)
  148. if err != nil {
  149. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败\"}\n\n")
  150. return
  151. }
  152. if resp.StatusCode != http.StatusOK {
  153. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
  154. return
  155. }
  156. // 解析搜索响应
  157. var searchResponse map[string]interface{}
  158. if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
  159. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败\"}\n\n")
  160. return
  161. }
  162. // 检查响应状态
  163. status, ok := searchResponse["status"].(string)
  164. if !ok || status != "success" {
  165. message, _ := searchResponse["message"].(string)
  166. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
  167. return
  168. }
  169. // 获取搜索结果
  170. results, ok := searchResponse["results"].([]interface{})
  171. if !ok || len(results) == 0 {
  172. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
  173. return
  174. }
  175. // 第三层:流式输出最终回答
  176. c.streamRAGResponse(userMessage, results)
  177. }
  178. }
  179. }
  180. // streamTextResponse 流式输出文本响应
  181. func (c *LiushiController) streamTextResponse(text string) {
  182. fmt.Println("=" + strings.Repeat("=", 80))
  183. fmt.Println("开始流式输出文本响应...")
  184. fmt.Printf("文本长度: %d 字符\n", len(text))
  185. fmt.Println("=" + strings.Repeat("=", 80))
  186. // 如果文本较短(小于200字符),直接发送完整文本
  187. if len(text) < 200 {
  188. fmt.Printf("文本较短,直接发送完整内容\n")
  189. fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", text)
  190. c.Ctx.ResponseWriter.Flush()
  191. } else {
  192. // 文本较长,按块发送以模拟流式效果
  193. fmt.Printf("文本较长,按块发送(每块50字符)\n")
  194. c.sendTextInChunks(text, 50)
  195. }
  196. // 结束标记
  197. fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
  198. c.Ctx.ResponseWriter.Flush()
  199. // 计算数据块数
  200. chunkCount := 1
  201. if len(text) >= 200 {
  202. chunkCount = (len(text) + 49) / 50 // 向上取整
  203. }
  204. // 打印完整的流式输出完成结果
  205. c.printStreamCompleteResult(len(text), chunkCount, text)
  206. }
  207. // sendTextInChunks 按块发送文本以模拟流式效果
  208. func (c *LiushiController) sendTextInChunks(text string, chunkSize int) int {
  209. runes := []rune(text)
  210. chunkCount := 0
  211. for i := 0; i < len(runes); i += chunkSize {
  212. end := i + chunkSize
  213. if end > len(runes) {
  214. end = len(runes)
  215. }
  216. chunk := string(runes[i:end])
  217. chunkCount++
  218. fmt.Printf("发送第 %d 块: '%s' (长度: %d)\n", chunkCount, chunk, len(chunk))
  219. fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", chunk)
  220. c.Ctx.ResponseWriter.Flush()
  221. time.Sleep(50 * time.Millisecond) // 调整延迟以控制速度
  222. }
  223. fmt.Printf("总共发送了 %d 个数据块\n", chunkCount)
  224. return chunkCount
  225. }
  226. // streamRAGResponse 流式输出RAG响应
  227. func (c *LiushiController) streamRAGResponse(userMessage string, results []interface{}) {
  228. // 将搜索结果转换为JSON字符串作为上下文
  229. contextJSON, err := json.Marshal(results)
  230. if err != nil {
  231. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
  232. return
  233. }
  234. // 构建最终回答的提示词(内容与原先 natural_language_answer 一致,但仅输出该段纯文本)
  235. finalPrompt := `
  236. `
  237. // 直接流式调用并透传 Markdown 正文(真正的流式输出)
  238. err = c.sendQwen3MessageStream(finalPrompt)
  239. if err != nil {
  240. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
  241. return
  242. }
  243. }
  244. // sendQwen3MessageStream 发送消息到千问模型并直接流式输出到客户端
  245. func (c *LiushiController) sendQwen3MessageStream(userMessage string) error {
  246. apiURL, err := beego.AppConfig.String("qwen3_api_url")
  247. if err != nil || apiURL == "" {
  248. return fmt.Errorf("配置文件中未找到qwen3_api_url")
  249. }
  250. model, err := beego.AppConfig.String("qwen3_model")
  251. if err != nil || model == "" {
  252. return fmt.Errorf("配置文件中未找到qwen3_model")
  253. }
  254. qwen3Request := map[string]interface{}{
  255. "model": model,
  256. "stream": true,
  257. "temperature": 0.7,
  258. "messages": []map[string]string{
  259. {"role": "user", "content": userMessage},
  260. },
  261. }
  262. requestBody, err := json.Marshal(qwen3Request)
  263. if err != nil {
  264. return fmt.Errorf("请求序列化失败: %v", err)
  265. }
  266. req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
  267. if err != nil {
  268. return fmt.Errorf("创建HTTP请求失败: %v", err)
  269. }
  270. req.Header.Set("Content-Type", "application/json")
  271. client := &http.Client{Timeout: 600 * time.Second}
  272. resp, err := client.Do(req)
  273. if err != nil {
  274. return fmt.Errorf("请求发送失败: %v", err)
  275. }
  276. defer resp.Body.Close()
  277. if resp.StatusCode != http.StatusOK {
  278. responseBody, err := io.ReadAll(resp.Body)
  279. if err != nil {
  280. return fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
  281. }
  282. return fmt.Errorf("千问API错误: %s", string(responseBody))
  283. }
  284. // 直接流式处理并输出到客户端
  285. return c.handleStreamResponseToClient(resp)
  286. }
  287. // sendQwen3Message 发送消息到千问模型
  288. func (c *LiushiController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
  289. apiURL, err := beego.AppConfig.String("qwen3_api_url")
  290. if err != nil || apiURL == "" {
  291. return "", fmt.Errorf("配置文件中未找到qwen3_api_url")
  292. }
  293. model, err := beego.AppConfig.String("qwen3_model")
  294. if err != nil || model == "" {
  295. return "", fmt.Errorf("配置文件中未找到qwen3_model")
  296. }
  297. qwen3Request := map[string]interface{}{
  298. "model": model,
  299. "stream": useStream,
  300. "temperature": 0.7,
  301. "messages": []map[string]string{
  302. {"role": "user", "content": userMessage},
  303. },
  304. }
  305. requestBody, err := json.Marshal(qwen3Request)
  306. if err != nil {
  307. return "", fmt.Errorf("请求序列化失败: %v", err)
  308. }
  309. req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
  310. if err != nil {
  311. return "", fmt.Errorf("创建HTTP请求失败: %v", err)
  312. }
  313. req.Header.Set("Content-Type", "application/json")
  314. client := &http.Client{Timeout: 600 * time.Second}
  315. resp, err := client.Do(req)
  316. if err != nil {
  317. return "", fmt.Errorf("请求发送失败: %v", err)
  318. }
  319. defer resp.Body.Close()
  320. if resp.StatusCode != http.StatusOK {
  321. responseBody, err := io.ReadAll(resp.Body)
  322. if err != nil {
  323. return "", fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
  324. }
  325. return "", fmt.Errorf("千问API错误: %s", string(responseBody))
  326. }
  327. if useStream {
  328. // 流式响应处理
  329. type StreamResponse struct {
  330. ID string `json:"id"`
  331. Object string `json:"object"`
  332. Created int64 `json:"created"`
  333. Model string `json:"model"`
  334. Choices []struct {
  335. Index int `json:"index"`
  336. Delta struct {
  337. Role string `json:"role,omitempty"`
  338. Content string `json:"content,omitempty"`
  339. } `json:"delta"`
  340. FinishReason *string `json:"finish_reason"`
  341. } `json:"choices"`
  342. }
  343. var fullContent strings.Builder
  344. scanner := bufio.NewScanner(resp.Body)
  345. for scanner.Scan() {
  346. line := scanner.Text()
  347. if line == "" || !strings.HasPrefix(line, "data: ") {
  348. continue
  349. }
  350. data := strings.TrimPrefix(line, "data: ")
  351. if data == "[DONE]" {
  352. break
  353. }
  354. var streamResp StreamResponse
  355. if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
  356. continue
  357. }
  358. if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
  359. fullContent.WriteString(streamResp.Choices[0].Delta.Content)
  360. }
  361. }
  362. return fullContent.String(), nil
  363. } else {
  364. // 非流式响应处理
  365. type NonStreamResponse struct {
  366. Choices []struct {
  367. Message struct {
  368. Content string `json:"content"`
  369. } `json:"message"`
  370. } `json:"choices"`
  371. }
  372. responseBody, err := io.ReadAll(resp.Body)
  373. if err != nil {
  374. return "", fmt.Errorf("读取响应失败: %v", err)
  375. }
  376. var response NonStreamResponse
  377. if err := json.Unmarshal(responseBody, &response); err != nil {
  378. return "", fmt.Errorf("解析响应失败: %v", err)
  379. }
  380. if len(response.Choices) > 0 {
  381. return response.Choices[0].Message.Content, nil
  382. }
  383. return "", fmt.Errorf("未找到有效响应")
  384. }
  385. }
  386. // handleStreamResponseToClient 处理流式响应并直接输出到客户端
  387. func (c *LiushiController) handleStreamResponseToClient(resp *http.Response) error {
  388. type StreamResponse struct {
  389. ID string `json:"id"`
  390. Object string `json:"object"`
  391. Created int64 `json:"created"`
  392. Model string `json:"model"`
  393. Choices []struct {
  394. Index int `json:"index"`
  395. Delta struct {
  396. Role string `json:"role,omitempty"`
  397. Content string `json:"content,omitempty"`
  398. } `json:"delta"`
  399. FinishReason *string `json:"finish_reason"`
  400. } `json:"choices"`
  401. }
  402. fmt.Println("=" + strings.Repeat("=", 80))
  403. fmt.Println("开始处理千问API流式响应(直接转发原始数据块)...")
  404. fmt.Println("=" + strings.Repeat("=", 80))
  405. scanner := bufio.NewScanner(resp.Body)
  406. charCount := 0
  407. blockCount := 0
  408. fullContent := "" // 保存完整的响应内容
  409. for scanner.Scan() {
  410. line := scanner.Text()
  411. if line == "" || !strings.HasPrefix(line, "data: ") {
  412. continue
  413. }
  414. data := strings.TrimPrefix(line, "data: ")
  415. if data == "[DONE]" {
  416. fmt.Println("\n" + strings.Repeat("-", 40))
  417. fmt.Printf("千问API流式结束,总共输出了 %d 个字符\n", charCount)
  418. fmt.Println(strings.Repeat("-", 40))
  419. // 发送结束标记
  420. fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
  421. c.Ctx.ResponseWriter.Flush()
  422. break
  423. }
  424. var streamResp StreamResponse
  425. if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
  426. continue
  427. }
  428. if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
  429. // 获取内容块
  430. content := streamResp.Choices[0].Delta.Content
  431. charCount += len([]rune(content))
  432. blockCount++
  433. // 保存完整内容
  434. fullContent += content
  435. fmt.Printf("收到第 %d 个内容块: '%s' (长度: %d)\n", blockCount, content, len(content))
  436. fmt.Printf("直接转发完整内容块到客户端\n")
  437. // *** 关键修改:处理换行符,确保Markdown格式正确 ***
  438. // 将换行符替换为\n,确保SSE格式正确传输
  439. escapedContent := strings.ReplaceAll(content, "\n", "\\n")
  440. fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", escapedContent)
  441. c.Ctx.ResponseWriter.Flush()
  442. // 移除逐字符输出和 time.Sleep(10 * time.Millisecond)
  443. }
  444. // 检查是否完成
  445. if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
  446. fmt.Println("\n" + strings.Repeat("-", 40))
  447. fmt.Printf("流式完成,总共接收了 %d 个数据块,%d 个字符\n", blockCount, charCount)
  448. fmt.Println(strings.Repeat("-", 40))
  449. fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
  450. c.Ctx.ResponseWriter.Flush()
  451. break
  452. }
  453. }
  454. // 打印完整的流式输出完成结果
  455. c.printStreamCompleteResult(charCount, blockCount, fullContent)
  456. return scanner.Err()
  457. }
  458. // StreamChatWithDB 流式聊天接口(集成数据库操作)
  459. func (c *LiushiController) StreamChatWithDB() {
  460. // 设置响应头为SSE流式传输
  461. c.Ctx.ResponseWriter.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
  462. c.Ctx.ResponseWriter.Header().Set("Cache-Control", "no-cache")
  463. c.Ctx.ResponseWriter.Header().Set("Connection", "keep-alive")
  464. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
  465. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
  466. c.Ctx.ResponseWriter.Header().Set("Access-Control-Allow-Headers", "Content-Type")
  467. // 从token中获取用户信息
  468. userInfo, err := utils.GetUserInfoFromContext(c.Ctx.Input.GetData("userInfo"))
  469. if err != nil {
  470. c.Ctx.ResponseWriter.WriteHeader(http.StatusUnauthorized)
  471. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"获取用户信息失败: %s\"}\n\n", err.Error())
  472. return
  473. }
  474. user_id := uint64(userInfo.ID)
  475. if user_id == 0 {
  476. user_id = 1
  477. }
  478. // 获取请求参数
  479. var requestData StreamChatWithDBRequest
  480. // 添加调试日志
  481. requestBody := c.Ctx.Input.RequestBody
  482. fmt.Printf("🔍 请求体内容: %s\n", string(requestBody))
  483. fmt.Printf("🔍 请求体长度: %d\n", len(requestBody))
  484. fmt.Printf("🔍 Content-Type: %s\n", c.Ctx.Request.Header.Get("Content-Type"))
  485. if err := json.Unmarshal(requestBody, &requestData); err != nil {
  486. fmt.Printf("❌ JSON解析失败: %v\n", err)
  487. c.Ctx.ResponseWriter.WriteHeader(http.StatusBadRequest)
  488. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"请求参数解析失败: %s\"}\n\n", err.Error())
  489. return
  490. }
  491. fmt.Println("流式聊天数据库集成请求数据:", requestData)
  492. // 数据库操作(保存用户消息)
  493. ai_conversation_id, user_message_id, err := c.saveUserMessage(&requestData, user_id)
  494. if err != nil {
  495. c.Ctx.ResponseWriter.WriteHeader(http.StatusInternalServerError)
  496. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"保存用户消息失败: %s\"}\n\n", err.Error())
  497. return
  498. }
  499. // 流式输出AI回复(包含初始响应)
  500. c.streamAIReply(requestData.Message, user_id, ai_conversation_id, user_message_id, requestData.OnlineSearchContent)
  501. }
  502. // saveUserMessage 保存用户消息到数据库
  503. func (c *LiushiController) saveUserMessage(requestData *StreamChatWithDBRequest, user_id uint64) (uint64, uint64, error) {
  504. userMessage := requestData.Message
  505. if user_id == 0 {
  506. user_id = 1
  507. }
  508. ai_conversation_id := requestData.AIConversationId
  509. tx := models.DB.Begin()
  510. defer func() {
  511. if r := recover(); r != nil {
  512. tx.Rollback()
  513. }
  514. }()
  515. // 创建或获取对话
  516. if ai_conversation_id == 0 {
  517. ai_conversation := models.AIConversation{
  518. UserId: user_id,
  519. Content: userMessage,
  520. BusinessType: requestData.BusinessType,
  521. ExamName: requestData.ExamName,
  522. }
  523. if err := tx.Create(&ai_conversation).Error; err != nil {
  524. tx.Rollback()
  525. return 0, 0, err
  526. }
  527. ai_conversation_id = uint64(ai_conversation.ID)
  528. }
  529. // 保存用户消息
  530. ai_message := models.AIMessage{
  531. UserId: user_id,
  532. Content: userMessage,
  533. Type: "user",
  534. AIConversationId: ai_conversation_id,
  535. }
  536. if err := tx.Create(&ai_message).Error; err != nil {
  537. tx.Rollback()
  538. return 0, 0, err
  539. }
  540. tx.Commit()
  541. return ai_conversation_id, uint64(ai_message.ID), nil
  542. }
  543. // sendInitialResponse 发送初始响应(包含数据库ID)
  544. func (c *LiushiController) sendInitialResponse(ai_conversation_id, ai_message_id uint64) {
  545. initialResponse := map[string]interface{}{
  546. "type": "initial",
  547. "ai_conversation_id": ai_conversation_id,
  548. "ai_message_id": ai_message_id,
  549. "status": "success",
  550. }
  551. responseJSON, _ := json.Marshal(initialResponse)
  552. fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", responseJSON)
  553. c.Ctx.ResponseWriter.Flush()
  554. }
  555. // streamAIReply 流式输出AI回复并保存到数据库
  556. func (c *LiushiController) streamAIReply(userMessage string, user_id, ai_conversation_id, ai_message_id uint64, onlineSearchContent string) {
  557. fmt.Println("🚀 ========== 开始流式AI回复流程 ==========")
  558. fmt.Printf("📝 用户消息: %s\n", userMessage)
  559. fmt.Printf("👤 用户ID: %d\n", user_id)
  560. fmt.Printf("💬 对话ID: %d\n", ai_conversation_id)
  561. fmt.Printf("📨 消息ID: %d\n", ai_message_id)
  562. fmt.Println("🚀 ========================================")
  563. // 创建AI回复记录
  564. ai_reply := models.AIMessage{
  565. UserId: user_id, // 使用请求中的用户ID
  566. Content: "", // 初始为空,流式完成后更新
  567. Type: "ai",
  568. AIConversationId: ai_conversation_id,
  569. PrevUserId: ai_message_id,
  570. }
  571. fmt.Println("📊 创建AI回复记录...")
  572. tx := models.DB.Begin()
  573. if err := tx.Create(&ai_reply).Error; err != nil {
  574. tx.Rollback()
  575. fmt.Printf("❌ 创建AI回复记录失败: %v\n", err)
  576. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建AI回复记录失败: %s\"}\n\n", err.Error())
  577. return
  578. }
  579. tx.Commit()
  580. fmt.Printf("✅ AI回复记录创建成功,ID: %d\n", ai_reply.ID)
  581. // 发送初始响应(包含AI消息ID)
  582. c.sendInitialResponse(ai_conversation_id, uint64(ai_reply.ID))
  583. // 直接使用RAG流程进行搜索和回答
  584. fmt.Println("🔄 开始处理消息(RAG流程)...")
  585. c.processMessageWithRAG(userMessage, &ai_reply, onlineSearchContent)
  586. fmt.Println("🎉 ========== 流式AI回复流程完成 ==========")
  587. }
  588. // processMessageWithRAG 处理消息的完整流程(RAG+数据库更新)
  589. func (c *LiushiController) processMessageWithRAG(userMessage string, ai_reply *models.AIMessage, onlineSearchContent string) {
  590. fmt.Println("🔍 ========== 开始RAG检索流程 ==========")
  591. fmt.Printf("📝 处理消息: %s\n", userMessage)
  592. // 直接使用用户消息进行搜索
  593. query := userMessage
  594. fmt.Printf("🎯 使用查询: %s\n", query)
  595. // 构建搜索请求
  596. searchRequest := map[string]interface{}{
  597. "query": query,
  598. "n_results": 25,
  599. }
  600. fmt.Printf("📦 搜索请求参数: query=%s, n_results=25\n", query)
  601. requestBody, err := json.Marshal(searchRequest)
  602. if err != nil {
  603. fmt.Printf("❌ 搜索请求构建失败: %v\n", err)
  604. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求构建失败: %s\"}\n\n", err.Error())
  605. return
  606. }
  607. // 从配置文件中读取搜索API地址
  608. searchAPIURL, err := beego.AppConfig.String("search_api_url")
  609. if err != nil || searchAPIURL == "" {
  610. fmt.Printf("❌ 配置文件中未找到search_api_url: %v\n", err)
  611. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"配置文件中未找到search_api_url: %s\"}\n\n", err.Error())
  612. return
  613. }
  614. fmt.Printf("🌐 搜索API地址: %s\n", searchAPIURL)
  615. // 发送HTTP请求到搜索服务
  616. fmt.Println("📤 发送搜索请求...")
  617. req, err := http.NewRequest("POST", searchAPIURL, bytes.NewBuffer(requestBody))
  618. if err != nil {
  619. fmt.Printf("❌ 创建搜索请求失败: %v\n", err)
  620. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"创建搜索请求失败: %s\"}\n\n", err.Error())
  621. return
  622. }
  623. req.Header.Set("Content-Type", "application/json")
  624. client := &http.Client{Timeout: 30 * time.Second}
  625. resp, err := client.Do(req)
  626. if err != nil {
  627. fmt.Printf("❌ 搜索请求发送失败: %v\n", err)
  628. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索请求发送失败: %s\"}\n\n", err.Error())
  629. return
  630. }
  631. defer resp.Body.Close()
  632. fmt.Printf("📥 收到搜索响应,状态码: %d\n", resp.StatusCode)
  633. responseBody, err := io.ReadAll(resp.Body)
  634. if err != nil {
  635. fmt.Printf("❌ 读取搜索结果失败: %v\n", err)
  636. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"读取搜索结果失败: %s\"}\n\n", err.Error())
  637. return
  638. }
  639. if resp.StatusCode != http.StatusOK {
  640. fmt.Printf("❌ 搜索API错误: 状态码 %d\n", resp.StatusCode)
  641. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索API错误: 状态码 %d\"}\n\n", resp.StatusCode)
  642. return
  643. }
  644. // 解析搜索响应
  645. var searchResponse map[string]interface{}
  646. if err := json.Unmarshal(responseBody, &searchResponse); err != nil {
  647. fmt.Printf("❌ 解析搜索结果失败: %v\n", err)
  648. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"解析搜索结果失败: %s\"}\n\n", err.Error())
  649. return
  650. }
  651. // 检查响应状态
  652. status, ok := searchResponse["status"].(string)
  653. if !ok || status != "success" {
  654. message, _ := searchResponse["message"].(string)
  655. fmt.Printf("❌ 搜索失败: %s\n", message)
  656. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"搜索失败: %s\"}\n\n", message)
  657. return
  658. }
  659. // 获取搜索结果
  660. results, ok := searchResponse["results"].([]interface{})
  661. if !ok || len(results) == 0 {
  662. fmt.Printf("⚠️ 未找到相关文档\n")
  663. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"未找到相关文档\"}\n\n")
  664. return
  665. }
  666. fmt.Printf("✅ 搜索成功,找到 %d 个相关文档\n", len(results))
  667. fmt.Println("🔄 开始流式输出RAG响应...")
  668. // 流式输出最终回答并更新数据库
  669. c.streamRAGResponseWithDB(userMessage, results, ai_reply, onlineSearchContent)
  670. }
  671. // streamRAGResponseWithDB 流式输出RAG响应并更新数据库
  672. func (c *LiushiController) streamRAGResponseWithDB(userMessage string, results []interface{}, ai_reply *models.AIMessage, onlineSearchContent string) {
  673. // 将搜索结果转换为JSON字符串作为上下文
  674. contextJSON, err := json.Marshal(results)
  675. if err != nil {
  676. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"处理搜索结果失败: %s\"}\n\n", err.Error())
  677. return
  678. }
  679. // 获取历史对话上下文
  680. var historyContext string
  681. if ai_reply.AIConversationId > 0 {
  682. var historyMessages []models.AIMessage
  683. // 获取当前对话的历史消息,按时间排序,排除当前消息
  684. models.DB.Model(&models.AIMessage{}).
  685. Where("user_id = ? AND ai_conversation_id = ? AND is_deleted = ? AND id < ?",
  686. ai_reply.UserId, ai_reply.AIConversationId, 0, ai_reply.ID).
  687. Order("updated_at ASC").
  688. Find(&historyMessages)
  689. // 限制为前两轮对话(每轮包含用户消息和AI回复)
  690. if len(historyMessages) > 0 {
  691. // 计算轮数:每2条消息为1轮(用户消息+AI回复)
  692. maxRounds := 2
  693. maxMessages := maxRounds * 2
  694. if len(historyMessages) > maxMessages {
  695. historyMessages = historyMessages[len(historyMessages)-maxMessages:]
  696. }
  697. // 构建历史对话上下文
  698. historyContext = "\n\n# 历史对话上下文\n"
  699. for _, msg := range historyMessages {
  700. if msg.Type == "user" {
  701. historyContext += "用户: " + msg.Content + "\n"
  702. } else if msg.Type == "ai" {
  703. historyContext += "蜀安AI助手: " + msg.Content + "\n"
  704. }
  705. }
  706. historyContext += "\n"
  707. }
  708. }
  709. finalPrompt := `
  710. `
  711. // 直接流式调用并透传 Markdown 正文(真正的流式输出)
  712. fmt.Println("🌊 ========== 开始RAG流式输出 ==========")
  713. fmt.Printf("📝 用户问题: %s\n", userMessage)
  714. fmt.Printf("📚 检索到文档数量: %d\n", len(results))
  715. var fullContent strings.Builder
  716. charCount := 0
  717. blockCount := 0
  718. fmt.Println("🔄 开始流式调用AI模型...")
  719. err = c.sendQwen3MessageStreamWithCallback(finalPrompt, func(content string) {
  720. fullContent.WriteString(content)
  721. charCount += len([]rune(content))
  722. blockCount++
  723. // 发送流式数据到前端
  724. fmt.Printf("📤 流式块 %d: '%s' (长度: %d字符, 累计: %d字符)\n",
  725. blockCount, content, len(content), charCount)
  726. fmt.Fprintf(c.Ctx.ResponseWriter, "data: %s\n\n", content)
  727. c.Ctx.ResponseWriter.Flush()
  728. })
  729. if err != nil {
  730. fmt.Fprintf(c.Ctx.ResponseWriter, "data: {\"error\": \"生成最终回答失败: %s\"}\n\n", err.Error())
  731. return
  732. }
  733. // 发送完成标记
  734. fmt.Fprintf(c.Ctx.ResponseWriter, "data: [DONE]\n\n")
  735. c.Ctx.ResponseWriter.Flush()
  736. // 更新数据库中的AI回复内容
  737. finalContent := fullContent.String()
  738. if err := models.DB.Model(ai_reply).Update("content", finalContent).Error; err != nil {
  739. fmt.Printf("更新AI回复内容失败: %v\n", err)
  740. }
  741. // 打印完成结果
  742. c.printStreamCompleteResult(charCount, blockCount, finalContent)
  743. }
  744. // sendQwen3MessageStreamWithCallback 带回调的流式方法
  745. func (c *LiushiController) sendQwen3MessageStreamWithCallback(userMessage string, callback func(string)) error {
  746. apiURL, err := beego.AppConfig.String("qwen3_api_url")
  747. if err != nil || apiURL == "" {
  748. return fmt.Errorf("配置文件中未找到qwen3_api_url")
  749. }
  750. model, err := beego.AppConfig.String("qwen3_model")
  751. if err != nil || model == "" {
  752. return fmt.Errorf("配置文件中未找到qwen3_model")
  753. }
  754. qwen3Request := map[string]interface{}{
  755. "model": model,
  756. "stream": true,
  757. "temperature": 0.7,
  758. "messages": []map[string]string{
  759. {"role": "user", "content": userMessage},
  760. },
  761. }
  762. requestBody, err := json.Marshal(qwen3Request)
  763. if err != nil {
  764. return fmt.Errorf("请求序列化失败: %v", err)
  765. }
  766. req, err := http.NewRequest("POST", apiURL+"/v1/chat/completions", bytes.NewBuffer(requestBody))
  767. if err != nil {
  768. return fmt.Errorf("创建HTTP请求失败: %v", err)
  769. }
  770. req.Header.Set("Content-Type", "application/json")
  771. client := &http.Client{Timeout: 600 * time.Second}
  772. resp, err := client.Do(req)
  773. if err != nil {
  774. return fmt.Errorf("请求发送失败: %v", err)
  775. }
  776. defer resp.Body.Close()
  777. if resp.StatusCode != http.StatusOK {
  778. responseBody, err := io.ReadAll(resp.Body)
  779. if err != nil {
  780. return fmt.Errorf("千问API错误: 状态码 %d,读取响应失败: %v", resp.StatusCode, err)
  781. }
  782. return fmt.Errorf("千问API错误: %s", string(responseBody))
  783. }
  784. // 处理流式响应
  785. fmt.Println("📡 开始处理流式响应...")
  786. scanner := bufio.NewScanner(resp.Body)
  787. lineCount := 0
  788. validDataCount := 0
  789. for scanner.Scan() {
  790. line := scanner.Text()
  791. lineCount++
  792. if line == "" || !strings.HasPrefix(line, "data: ") {
  793. continue
  794. }
  795. data := strings.TrimPrefix(line, "data: ")
  796. if data == "[DONE]" {
  797. fmt.Printf("🏁 收到结束标记 [DONE],流式响应结束\n")
  798. break
  799. }
  800. var streamResp struct {
  801. Choices []struct {
  802. Delta struct {
  803. Content string `json:"content,omitempty"`
  804. } `json:"delta"`
  805. FinishReason *string `json:"finish_reason"`
  806. } `json:"choices"`
  807. }
  808. if err := json.Unmarshal([]byte(data), &streamResp); err != nil {
  809. fmt.Printf("⚠️ 解析流式数据失败 (行 %d): %v\n", lineCount, err)
  810. continue
  811. }
  812. if len(streamResp.Choices) > 0 && streamResp.Choices[0].Delta.Content != "" {
  813. content := streamResp.Choices[0].Delta.Content
  814. validDataCount++
  815. // 处理换行符
  816. escapedContent := strings.ReplaceAll(content, "\n", "\\n")
  817. fmt.Printf("📦 流式数据块 %d: '%s' (原始长度: %d)\n", validDataCount, content, len(content))
  818. callback(escapedContent)
  819. }
  820. if len(streamResp.Choices) > 0 && streamResp.Choices[0].FinishReason != nil {
  821. fmt.Printf("🏁 收到完成原因: %s\n", *streamResp.Choices[0].FinishReason)
  822. break
  823. }
  824. }
  825. fmt.Printf("📊 流式处理统计: 总行数=%d, 有效数据块=%d\n", lineCount, validDataCount)
  826. return scanner.Err()
  827. }
  828. // printStreamCompleteResult 打印流式输出完成结果
  829. func (c *LiushiController) printStreamCompleteResult(charCount, blockCount int, fullContent string) {
  830. fmt.Println("=" + strings.Repeat("=", 80))
  831. fmt.Println("🎉 后端流式输出完成!")
  832. fmt.Println("=" + strings.Repeat("=", 80))
  833. fmt.Println("📊 后端统计信息:")
  834. fmt.Printf("📝 总字符数: %d\n", charCount)
  835. fmt.Printf("📦 数据块数: %d\n", blockCount)
  836. fmt.Printf("⏱️ 完成时间: %s\n", time.Now().Format("2006/1/2 下午3:04:05"))
  837. fmt.Println("=" + strings.Repeat("=", 80))
  838. fmt.Println("🔍 数据一致性检查:")
  839. fmt.Println("请对比前端控制台输出的统计信息,确保数据一致")
  840. fmt.Println("=" + strings.Repeat("=", 80))
  841. fmt.Println("✅ 后端流式输出已完全结束,所有数据已发送到客户端")
  842. fmt.Println("=" + strings.Repeat("=", 80))
  843. // 打印完整内容
  844. if fullContent != "" {
  845. fmt.Println("📄 完整响应内容:")
  846. fmt.Println("=" + strings.Repeat("=", 80))
  847. fmt.Println(fullContent)
  848. fmt.Println("=" + strings.Repeat("=", 80))
  849. }
  850. }