# vertical_domain_processor.py
"""
Vertical-domain search result processor.

Processes the structured data returned by vertical-domain searches.
Requirements: 3.4, 3.5
"""
  6. import logging
  7. from typing import Dict, List, Optional, Any, Union
  8. from app.schemas.llm_schema import SearchResult
  9. logger = logging.getLogger(__name__)
  10. class VerticalDomainProcessor:
  11. """垂直领域搜索结果处理器"""
  12. # 支持的垂直领域类型
  13. SUPPORTED_DOMAINS = {
  14. "weather": "天气",
  15. "stock": "股票",
  16. "exchange_rate": "汇率",
  17. "oil_price": "油价",
  18. "calendar": "万年历",
  19. "gold_price": "金价",
  20. "silver_price": "银价",
  21. "lottery": "彩票",
  22. "movie": "影视",
  23. "traffic_restriction": "车牌限行",
  24. "football": "足球赛事"
  25. }
  26. @staticmethod
  27. def detect_vertical_domain(search_info: Dict) -> Optional[str]:
  28. """
  29. 检测搜索结果是否包含垂直领域数据
  30. Args:
  31. search_info: 搜索信息字典
  32. Returns:
  33. 检测到的垂直领域类型,如果没有则返回None
  34. """
  35. if not search_info:
  36. return None
  37. # 检查是否有垂直领域标识
  38. domain_type = search_info.get("vertical_domain_type")
  39. if domain_type and domain_type in VerticalDomainProcessor.SUPPORTED_DOMAINS:
  40. logger.info(f"检测到垂直领域类型: {domain_type}")
  41. return domain_type
  42. # 通过搜索结果内容推断领域类型
  43. search_results = search_info.get("search_results", [])
  44. if search_results:
  45. # 检查第一个结果的内容特征
  46. first_result = search_results[0]
  47. title = first_result.get("title", "").lower()
  48. snippet = first_result.get("snippet", "").lower()
  49. # 基于关键词推断领域类型
  50. content = f"{title} {snippet}"
  51. if any(keyword in content for keyword in ["天气", "气温", "降雨", "weather"]):
  52. return "weather"
  53. if any(keyword in content for keyword in ["股票", "股价", "涨跌", "stock", "沪指", "深指"]):
  54. return "stock"
  55. elif any(keyword in content for keyword in ["汇率", "美元", "人民币", "exchange"]):
  56. return "exchange_rate"
  57. elif any(keyword in content for keyword in ["油价", "汽油", "柴油", "oil"]):
  58. return "oil_price"
  59. elif any(keyword in content for keyword in ["农历", "黄历", "节气", "calendar"]):
  60. return "calendar"
  61. elif any(keyword in content for keyword in ["金价", "黄金", "gold"]):
  62. return "gold_price"
  63. elif any(keyword in content for keyword in ["银价", "白银", "silver"]):
  64. return "silver_price"
  65. elif any(keyword in content for keyword in ["双色球", "大乐透", "彩票", "lottery"]):
  66. return "lottery"
  67. elif any(keyword in content for keyword in ["电影", "电视剧", "影视", "movie"]):
  68. return "movie"
  69. elif any(keyword in content for keyword in ["限行", "车牌", "traffic"]):
  70. return "traffic_restriction"
  71. elif any(keyword in content for keyword in ["足球", "联赛", "积分榜", "football"]):
  72. return "football"
  73. return None
  74. @staticmethod
  75. def extract_structured_data(search_info: Dict, domain_type: str) -> Optional[Dict]:
  76. """
  77. 从搜索信息中提取结构化数据
  78. Args:
  79. search_info: 搜索信息字典
  80. domain_type: 垂直领域类型
  81. Returns:
  82. 提取的结构化数据,如果没有则返回None
  83. """
  84. if not search_info or domain_type not in VerticalDomainProcessor.SUPPORTED_DOMAINS:
  85. return None
  86. # 检查是否有专门的结构化数据字段
  87. structured_data = search_info.get("structured_data")
  88. if structured_data:
  89. logger.info(f"提取到 {domain_type} 领域的结构化数据")
  90. return structured_data
  91. # 检查是否有垂直领域扩展数据
  92. vertical_data = search_info.get("vertical_extension_data")
  93. if vertical_data:
  94. logger.info(f"提取到 {domain_type} 领域的垂直扩展数据")
  95. return vertical_data
  96. # 尝试从搜索结果中解析结构化信息
  97. search_results = search_info.get("search_results", [])
  98. if search_results:
  99. parsed_data = VerticalDomainProcessor._parse_domain_specific_data(
  100. search_results, domain_type
  101. )
  102. if parsed_data:
  103. logger.info(f"从搜索结果中解析出 {domain_type} 领域的结构化数据")
  104. return parsed_data
  105. return None
  106. @staticmethod
  107. def _parse_domain_specific_data(search_results: List[Dict], domain_type: str) -> Optional[Dict]:
  108. """
  109. 解析特定领域的结构化数据
  110. Args:
  111. search_results: 搜索结果列表
  112. domain_type: 垂直领域类型
  113. Returns:
  114. 解析的结构化数据
  115. """
  116. if not search_results:
  117. return None
  118. try:
  119. if domain_type == "weather":
  120. return VerticalDomainProcessor._parse_weather_data(search_results)
  121. elif domain_type == "stock":
  122. return VerticalDomainProcessor._parse_stock_data(search_results)
  123. elif domain_type == "exchange_rate":
  124. return VerticalDomainProcessor._parse_exchange_rate_data(search_results)
  125. elif domain_type == "oil_price":
  126. return VerticalDomainProcessor._parse_oil_price_data(search_results)
  127. elif domain_type == "calendar":
  128. return VerticalDomainProcessor._parse_calendar_data(search_results)
  129. elif domain_type == "gold_price":
  130. return VerticalDomainProcessor._parse_gold_price_data(search_results)
  131. elif domain_type == "silver_price":
  132. return VerticalDomainProcessor._parse_silver_price_data(search_results)
  133. elif domain_type == "lottery":
  134. return VerticalDomainProcessor._parse_lottery_data(search_results)
  135. elif domain_type == "movie":
  136. return VerticalDomainProcessor._parse_movie_data(search_results)
  137. elif domain_type == "traffic_restriction":
  138. return VerticalDomainProcessor._parse_traffic_data(search_results)
  139. elif domain_type == "football":
  140. return VerticalDomainProcessor._parse_football_data(search_results)
  141. else:
  142. logger.warning(f"不支持的垂直领域类型: {domain_type}")
  143. return None
  144. except Exception as e:
  145. logger.error(f"解析 {domain_type} 领域数据时出错: {e}")
  146. return None
  147. @staticmethod
  148. def _parse_weather_data(search_results: List[Dict]) -> Optional[Dict]:
  149. """解析天气数据"""
  150. # 这里可以实现具体的天气数据解析逻辑
  151. # 由于实际的API响应格式可能变化,这里提供一个基础框架
  152. first_result = search_results[0]
  153. snippet = first_result.get("snippet", "")
  154. return {
  155. "domain": "weather",
  156. "data_type": "weather_info",
  157. "raw_content": snippet,
  158. "parsed": True
  159. }
  160. @staticmethod
  161. def _parse_stock_data(search_results: List[Dict]) -> Optional[Dict]:
  162. """解析股票数据"""
  163. first_result = search_results[0]
  164. snippet = first_result.get("snippet", "")
  165. return {
  166. "domain": "stock",
  167. "data_type": "stock_info",
  168. "raw_content": snippet,
  169. "parsed": True
  170. }
  171. @staticmethod
  172. def _parse_exchange_rate_data(search_results: List[Dict]) -> Optional[Dict]:
  173. """解析汇率数据"""
  174. first_result = search_results[0]
  175. snippet = first_result.get("snippet", "")
  176. return {
  177. "domain": "exchange_rate",
  178. "data_type": "exchange_rate_info",
  179. "raw_content": snippet,
  180. "parsed": True
  181. }
  182. @staticmethod
  183. def _parse_oil_price_data(search_results: List[Dict]) -> Optional[Dict]:
  184. """解析油价数据"""
  185. first_result = search_results[0]
  186. snippet = first_result.get("snippet", "")
  187. return {
  188. "domain": "oil_price",
  189. "data_type": "oil_price_info",
  190. "raw_content": snippet,
  191. "parsed": True
  192. }
  193. @staticmethod
  194. def _parse_calendar_data(search_results: List[Dict]) -> Optional[Dict]:
  195. """解析万年历数据"""
  196. first_result = search_results[0]
  197. snippet = first_result.get("snippet", "")
  198. return {
  199. "domain": "calendar",
  200. "data_type": "calendar_info",
  201. "raw_content": snippet,
  202. "parsed": True
  203. }
  204. @staticmethod
  205. def _parse_gold_price_data(search_results: List[Dict]) -> Optional[Dict]:
  206. """解析金价数据"""
  207. first_result = search_results[0]
  208. snippet = first_result.get("snippet", "")
  209. return {
  210. "domain": "gold_price",
  211. "data_type": "gold_price_info",
  212. "raw_content": snippet,
  213. "parsed": True
  214. }
  215. @staticmethod
  216. def _parse_silver_price_data(search_results: List[Dict]) -> Optional[Dict]:
  217. """解析银价数据"""
  218. first_result = search_results[0]
  219. snippet = first_result.get("snippet", "")
  220. return {
  221. "domain": "silver_price",
  222. "data_type": "silver_price_info",
  223. "raw_content": snippet,
  224. "parsed": True
  225. }
  226. @staticmethod
  227. def _parse_lottery_data(search_results: List[Dict]) -> Optional[Dict]:
  228. """解析彩票数据"""
  229. first_result = search_results[0]
  230. snippet = first_result.get("snippet", "")
  231. return {
  232. "domain": "lottery",
  233. "data_type": "lottery_info",
  234. "raw_content": snippet,
  235. "parsed": True
  236. }
  237. @staticmethod
  238. def _parse_movie_data(search_results: List[Dict]) -> Optional[Dict]:
  239. """解析影视数据"""
  240. first_result = search_results[0]
  241. snippet = first_result.get("snippet", "")
  242. return {
  243. "domain": "movie",
  244. "data_type": "movie_info",
  245. "raw_content": snippet,
  246. "parsed": True
  247. }
  248. @staticmethod
  249. def _parse_traffic_data(search_results: List[Dict]) -> Optional[Dict]:
  250. """解析车牌限行数据"""
  251. first_result = search_results[0]
  252. snippet = first_result.get("snippet", "")
  253. return {
  254. "domain": "traffic_restriction",
  255. "data_type": "traffic_restriction_info",
  256. "raw_content": snippet,
  257. "parsed": True
  258. }
  259. @staticmethod
  260. def _parse_football_data(search_results: List[Dict]) -> Optional[Dict]:
  261. """解析足球赛事数据"""
  262. first_result = search_results[0]
  263. snippet = first_result.get("snippet", "")
  264. return {
  265. "domain": "football",
  266. "data_type": "football_info",
  267. "raw_content": snippet,
  268. "parsed": True
  269. }
  270. @staticmethod
  271. def format_vertical_domain_response(
  272. content: str,
  273. search_info: Dict,
  274. structured_data: Optional[Dict] = None
  275. ) -> str:
  276. """
  277. 格式化垂直领域搜索响应
  278. Args:
  279. content: 原始回复内容
  280. search_info: 搜索信息
  281. structured_data: 结构化数据
  282. Returns:
  283. 格式化后的内容
  284. """
  285. if not structured_data:
  286. return content
  287. domain_type = structured_data.get("domain")
  288. domain_name = VerticalDomainProcessor.SUPPORTED_DOMAINS.get(domain_type, domain_type)
  289. # 在内容末尾添加垂直领域数据说明
  290. formatted_content = content
  291. if structured_data.get("parsed"):
  292. formatted_content += f"\n\n*以上信息来自{domain_name}垂直领域搜索,数据已结构化处理。*"
  293. return formatted_content
  294. @staticmethod
  295. def process_vertical_domain_search(search_info: Dict) -> Dict:
  296. """
  297. 处理垂直领域搜索的完整流程
  298. Args:
  299. search_info: 搜索信息字典
  300. Returns:
  301. 处理后的搜索信息,包含垂直领域数据
  302. """
  303. if not search_info:
  304. return search_info
  305. # 检测垂直领域类型
  306. domain_type = VerticalDomainProcessor.detect_vertical_domain(search_info)
  307. if not domain_type:
  308. return search_info
  309. # 提取结构化数据
  310. structured_data = VerticalDomainProcessor.extract_structured_data(search_info, domain_type)
  311. # 将垂直领域信息添加到搜索信息中
  312. enhanced_search_info = search_info.copy()
  313. enhanced_search_info["vertical_domain_detected"] = domain_type
  314. enhanced_search_info["vertical_domain_name"] = VerticalDomainProcessor.SUPPORTED_DOMAINS[domain_type]
  315. if structured_data:
  316. enhanced_search_info["vertical_structured_data"] = structured_data
  317. logger.info(f"垂直领域搜索处理完成: {domain_type}")
  318. return enhanced_search_info
  319. @staticmethod
  320. def is_vertical_domain_result(search_info: Dict) -> bool:
  321. """
  322. 判断搜索结果是否为垂直领域结果
  323. Args:
  324. search_info: 搜索信息字典
  325. Returns:
  326. 是否为垂直领域结果
  327. """
  328. if not search_info:
  329. return False
  330. # 检查是否有垂直领域标识
  331. return (
  332. "vertical_domain_type" in search_info or
  333. "vertical_extension_data" in search_info or
  334. "structured_data" in search_info or
  335. VerticalDomainProcessor.detect_vertical_domain(search_info) is not None
  336. )