# search_service.py
"""
Web search service.
"""
  4. import httpx
  5. import json
  6. from typing import List, Dict, Any
  7. from utils.config import settings
  8. from utils.logger import logger
  9. class SearchService:
  10. def __init__(self):
  11. self.workflow_url = settings.dify.workflow_url
  12. self.workflow_id = settings.dify.workflow_id
  13. self.auth_token = f"Bearer {settings.dify.auth_token}"
  14. async def workflow_search(self, keywords: str, num: int = 10, max_text_len: int = 150) -> List[Dict[str, Any]]:
  15. """调用 Dify workflow 进行搜索"""
  16. try:
  17. request_body = {
  18. "workflow_id": self.workflow_id,
  19. "inputs": {
  20. "keywords": keywords,
  21. "num": num,
  22. "max_text_len": max_text_len
  23. },
  24. "response_mode": "blocking",
  25. "user": "user_001"
  26. }
  27. async with httpx.AsyncClient(timeout=30.0) as client:
  28. response = await client.post(
  29. self.workflow_url,
  30. json=request_body,
  31. headers={
  32. "Authorization": self.auth_token,
  33. "Content-Type": "application/json"
  34. }
  35. )
  36. if response.status_code != 200:
  37. logger.error(f"Workflow API 请求失败: {response.status_code}, {response.text}")
  38. return []
  39. api_response = response.json()
  40. # 检查工作流状态
  41. data = api_response.get("data", {})
  42. status = data.get("status")
  43. if status != "succeeded":
  44. error_msg = data.get("error", "未知错误")
  45. logger.error(f"工作流执行失败: {status}, {error_msg}")
  46. return []
  47. # 提取结果
  48. outputs = data.get("outputs", {})
  49. # 优先解析 outputs.text
  50. text_result = outputs.get("text", "")
  51. if text_result:
  52. try:
  53. # 直接解析
  54. parsed = json.loads(text_result.strip())
  55. if isinstance(parsed, list):
  56. return parsed
  57. except json.JSONDecodeError:
  58. # 清洗后解析
  59. try:
  60. cleaned = text_result.replace("'", '"').replace("None", "null").replace("\\xa0", " ").replace("\\u0026", "&")
  61. parsed = json.loads(cleaned.strip())
  62. if isinstance(parsed, list):
  63. return parsed
  64. except json.JSONDecodeError:
  65. pass
  66. # 回退:解析 outputs.json[0].results
  67. json_array = outputs.get("json", [])
  68. if json_array and len(json_array) > 0:
  69. first_result = json_array[0]
  70. if isinstance(first_result, dict):
  71. results = first_result.get("results", [])
  72. if isinstance(results, list):
  73. return results
  74. logger.warning("无法从 workflow 响应中提取结果")
  75. return []
  76. except httpx.HTTPError as e:
  77. logger.error(f"Workflow API 调用失败: {e}")
  78. return []
  79. except Exception as e:
  80. logger.error(f"搜索服务异常: {e}")
  81. return []
# Module-level shared instance. It is constructed when this module is
# imported, so SearchService.__init__ reads settings.dify at import time.
search_service = SearchService()