| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- """
- 联网搜索服务
- """
- import httpx
- import json
- from typing import List, Dict, Any
- from utils.config import settings
- from utils.logger import logger
class SearchService:
    """Client for the Dify workflow that performs web searches.

    The workflow URL, workflow id and bearer token are read from
    ``settings.dify`` when the instance is constructed.
    """

    def __init__(self):
        self.workflow_url = settings.dify.workflow_url
        self.workflow_id = settings.dify.workflow_id
        # Pre-format the Authorization header value once.
        self.auth_token = f"Bearer {settings.dify.auth_token}"

    async def workflow_search(self, keywords: str, num: int = 10, max_text_len: int = 150) -> List[Dict[str, Any]]:
        """Call the Dify workflow to run a search and return its result list.

        Args:
            keywords: Search keywords, forwarded as the ``keywords`` input.
            num: Forwarded as the ``num`` workflow input.
            max_text_len: Forwarded as the ``max_text_len`` workflow input.

        Returns:
            The list extracted from the workflow outputs. An empty list is
            returned on any failure (non-200 response, workflow status other
            than ``"succeeded"``, unparseable outputs, transport error); the
            failure is logged and never raised to the caller.
        """
        request_body = {
            "workflow_id": self.workflow_id,
            "inputs": {
                "keywords": keywords,
                "num": num,
                "max_text_len": max_text_len,
            },
            "response_mode": "blocking",
            "user": "user_001",
        }
        headers = {
            "Authorization": self.auth_token,
            "Content-Type": "application/json",
        }

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    self.workflow_url,
                    json=request_body,
                    headers=headers,
                )

            if response.status_code != 200:
                logger.error(f"Workflow API 请求失败: {response.status_code}, {response.text}")
                return []

            api_response = response.json()

            # Guard against a null / non-object "data" so that a malformed
            # payload is reported as a failed workflow rather than surfacing
            # as an AttributeError in the catch-all handler below.
            data = api_response.get("data") if isinstance(api_response, dict) else None
            if not isinstance(data, dict):
                data = {}

            status = data.get("status")
            if status != "succeeded":
                error_msg = data.get("error", "未知错误")
                logger.error(f"工作流执行失败: {status}, {error_msg}")
                return []

            outputs = data.get("outputs")
            if not isinstance(outputs, dict):
                outputs = {}

            results = self._extract_results(outputs)
            if results is None:
                logger.warning("无法从 workflow 响应中提取结果")
                return []
            return results

        except httpx.HTTPError as e:
            logger.error(f"Workflow API 调用失败: {e}")
            return []
        except Exception as e:
            # Deliberate best-effort boundary: callers always get a list.
            logger.error(f"搜索服务异常: {e}")
            return []

    @staticmethod
    def _extract_results(outputs: Dict[str, Any]):
        """Return the result list found in workflow ``outputs``, or ``None``.

        ``outputs["text"]`` is tried first; if it is missing, empty, not a
        string, or does not parse to a list, ``outputs["json"][0]["results"]``
        is used as a fallback. ``None`` means neither location held a list
        (an empty list is a valid result and is returned as such).
        """
        text_result = outputs.get("text")
        # A non-string "text" is skipped instead of raising, so the "json"
        # fallback below still gets a chance.
        if isinstance(text_result, str) and text_result:
            parsed = SearchService._parse_result_text(text_result)
            if parsed is not None:
                return parsed

        json_array = outputs.get("json")
        if isinstance(json_array, list) and json_array:
            first_result = json_array[0]
            if isinstance(first_result, dict):
                results = first_result.get("results", [])
                if isinstance(results, list):
                    return results

        return None

    @staticmethod
    def _parse_result_text(text: str):
        """Parse ``text`` into a list, or return ``None`` if that is not possible.

        Parsers are tried in this order:

        1. strict JSON; if the text is valid JSON but not a list, ``None`` is
           returned without trying the other parsers;
        2. a Python literal (single-quoted strings, ``None``/``True``/``False``)
           via ``ast.literal_eval``, which only evaluates literals;
        3. the legacy textual clean-up followed by JSON, kept so that mixed
           input such as single-quoted text containing ``null`` still parses.
        """
        import ast

        stripped = text.strip()

        try:
            parsed = json.loads(stripped)
        except (ValueError, RecursionError):
            pass
        else:
            return parsed if isinstance(parsed, list) else None

        try:
            parsed = ast.literal_eval(stripped)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            pass
        else:
            if isinstance(parsed, list):
                return parsed

        # Last resort: blanket replacements. These are lossy (they also touch
        # quotes and the word "None" inside values), which is why the
        # literal parser above is preferred.
        cleaned = (
            text.replace("'", '"')
            .replace("None", "null")
            .replace("\\xa0", " ")
            .replace("\\u0026", "&")
        )
        try:
            parsed = json.loads(cleaned.strip())
        except (ValueError, RecursionError):
            return None
        return parsed if isinstance(parsed, list) else None
# Global instance, created when this module is imported.
search_service = SearchService()
|