"""
Vertical-domain search result processor.

Processes the structured data returned by vertical-domain searches.

Requirements: 3.4, 3.5
"""

import logging
from typing import Dict, List, Optional, Any, Union

from app.schemas.llm_schema import SearchResult

logger = logging.getLogger(__name__)


class VerticalDomainProcessor:
    """Detects vertical-domain search results and extracts their structured data.

    All methods are static; the class only serves as a namespace.
    """

    # Supported vertical-domain types mapped to their display names.
    SUPPORTED_DOMAINS = {
        "weather": "天气",
        "stock": "股票",
        "exchange_rate": "汇率",
        "oil_price": "油价",
        "calendar": "万年历",
        "gold_price": "金价",
        "silver_price": "银价",
        "lottery": "彩票",
        "movie": "影视",
        "traffic_restriction": "车牌限行",
        "football": "足球赛事",
    }

    # Keyword heuristics used when no explicit domain marker is present.
    # Order matters: the first domain with a matching keyword wins.
    _DOMAIN_KEYWORDS = (
        ("weather", ("天气", "气温", "降雨", "weather")),
        ("stock", ("股票", "股价", "涨跌", "stock", "沪指", "深指")),
        ("exchange_rate", ("汇率", "美元", "人民币", "exchange")),
        ("oil_price", ("油价", "汽油", "柴油", "oil")),
        ("calendar", ("农历", "黄历", "节气", "calendar")),
        ("gold_price", ("金价", "黄金", "gold")),
        ("silver_price", ("银价", "白银", "silver")),
        ("lottery", ("双色球", "大乐透", "彩票", "lottery")),
        ("movie", ("电影", "电视剧", "影视", "movie")),
        ("traffic_restriction", ("限行", "车牌", "traffic")),
        ("football", ("足球", "联赛", "积分榜", "football")),
    )

    # Domain type -> name of the parser method. The method is resolved by
    # name at call time so that individual parsers can still be replaced.
    _PARSER_METHOD_NAMES = {
        "weather": "_parse_weather_data",
        "stock": "_parse_stock_data",
        "exchange_rate": "_parse_exchange_rate_data",
        "oil_price": "_parse_oil_price_data",
        "calendar": "_parse_calendar_data",
        "gold_price": "_parse_gold_price_data",
        "silver_price": "_parse_silver_price_data",
        "lottery": "_parse_lottery_data",
        "movie": "_parse_movie_data",
        "traffic_restriction": "_parse_traffic_data",
        "football": "_parse_football_data",
    }

    @staticmethod
    def detect_vertical_domain(search_info: Dict) -> Optional[str]:
        """
        Detect whether the search info contains vertical-domain data.

        An explicit ``vertical_domain_type`` marker takes precedence. Without
        one, the domain is inferred from keywords in the title and snippet of
        the first search result.

        Args:
            search_info: Search information dictionary.

        Returns:
            The detected vertical-domain type, or None if nothing was detected.
        """
        if not search_info:
            return None

        # Explicit vertical-domain marker. The isinstance check keeps an
        # unhashable value from raising on the dict membership test.
        domain_type = search_info.get("vertical_domain_type")
        if (
            isinstance(domain_type, str)
            and domain_type in VerticalDomainProcessor.SUPPORTED_DOMAINS
        ):
            logger.info(f"检测到垂直领域类型: {domain_type}")
            return domain_type

        # Otherwise infer the domain from the content of the first result.
        search_results = search_info.get("search_results", [])
        if not search_results:
            return None

        first_result = search_results[0]
        # A key may be present with a None value; coerce to str before lower().
        title = str(first_result.get("title") or "").lower()
        snippet = str(first_result.get("snippet") or "").lower()
        content = f"{title} {snippet}"

        for candidate, keywords in VerticalDomainProcessor._DOMAIN_KEYWORDS:
            if any(keyword in content for keyword in keywords):
                return candidate

        return None

    @staticmethod
    def extract_structured_data(search_info: Dict, domain_type: str) -> Optional[Dict]:
        """
        Extract structured data from the search info.

        Sources are tried in order: the ``structured_data`` field, the
        ``vertical_extension_data`` field, then parsing ``search_results``.

        Args:
            search_info: Search information dictionary.
            domain_type: Vertical-domain type.

        Returns:
            The extracted structured data, or None if none is available or the
            domain type is not supported.
        """
        if not search_info or domain_type not in VerticalDomainProcessor.SUPPORTED_DOMAINS:
            return None

        # Dedicated structured-data field.
        structured_data = search_info.get("structured_data")
        if structured_data:
            logger.info(f"提取到 {domain_type} 领域的结构化数据")
            return structured_data

        # Vertical-domain extension data.
        vertical_data = search_info.get("vertical_extension_data")
        if vertical_data:
            logger.info(f"提取到 {domain_type} 领域的垂直扩展数据")
            return vertical_data

        # Last resort: parse structured information out of the search results.
        search_results = search_info.get("search_results", [])
        if search_results:
            parsed_data = VerticalDomainProcessor._parse_domain_specific_data(
                search_results, domain_type
            )
            if parsed_data:
                logger.info(f"从搜索结果中解析出 {domain_type} 领域的结构化数据")
                return parsed_data

        return None

    @staticmethod
    def _parse_domain_specific_data(
        search_results: List[Dict], domain_type: str
    ) -> Optional[Dict]:
        """
        Parse structured data for a specific domain.

        Args:
            search_results: List of search results.
            domain_type: Vertical-domain type.

        Returns:
            The parsed structured data, or None when the list is empty, the
            domain is unsupported, or the parser raised (the error is logged).
        """
        if not search_results:
            return None

        try:
            method_name = VerticalDomainProcessor._PARSER_METHOD_NAMES.get(domain_type)
            if method_name is None:
                logger.warning(f"不支持的垂直领域类型: {domain_type}")
                return None
            parser = getattr(VerticalDomainProcessor, method_name)
            return parser(search_results)
        except Exception as e:
            # Best-effort boundary: a parsing failure must not break the
            # surrounding search flow, so log it and report "no data".
            logger.error(f"解析 {domain_type} 领域数据时出错: {e}")
            return None

    @staticmethod
    def _build_raw_payload(search_results: List[Dict], domain: str) -> Optional[Dict]:
        """
        Build the baseline payload shared by all domain parsers.

        This is only a basic framework: the snippet of the first result is
        passed through unparsed as ``raw_content``, because the real API
        response format may vary. Domain-specific parsing can be added in the
        individual ``_parse_*`` methods.

        Args:
            search_results: List of search results.
            domain: Vertical-domain type recorded in the payload.

        Returns:
            The payload dictionary, or None if ``search_results`` is empty.
        """
        if not search_results:
            return None

        snippet = search_results[0].get("snippet", "")
        return {
            "domain": domain,
            "data_type": f"{domain}_info",
            "raw_content": snippet,
            "parsed": True,
        }

    @staticmethod
    def _parse_weather_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse weather data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "weather")

    @staticmethod
    def _parse_stock_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse stock data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "stock")

    @staticmethod
    def _parse_exchange_rate_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse exchange-rate data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "exchange_rate")

    @staticmethod
    def _parse_oil_price_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse oil-price data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "oil_price")

    @staticmethod
    def _parse_calendar_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse perpetual-calendar data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "calendar")

    @staticmethod
    def _parse_gold_price_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse gold-price data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "gold_price")

    @staticmethod
    def _parse_silver_price_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse silver-price data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "silver_price")

    @staticmethod
    def _parse_lottery_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse lottery data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "lottery")

    @staticmethod
    def _parse_movie_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse movie and TV data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "movie")

    @staticmethod
    def _parse_traffic_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse licence-plate traffic-restriction data."""
        return VerticalDomainProcessor._build_raw_payload(
            search_results, "traffic_restriction"
        )

    @staticmethod
    def _parse_football_data(search_results: List[Dict]) -> Optional[Dict]:
        """Parse football-match data."""
        return VerticalDomainProcessor._build_raw_payload(search_results, "football")

    @staticmethod
    def format_vertical_domain_response(
        content: str,
        search_info: Dict,
        structured_data: Optional[Dict] = None
    ) -> str:
        """
        Format a vertical-domain search response.

        When ``structured_data`` is flagged as parsed, a footnote naming the
        domain is appended to ``content``.

        Args:
            content: Original reply content.
            search_info: Search information; used as a fallback source for the
                domain when ``structured_data`` has no ``domain`` key.
            structured_data: Structured data.

        Returns:
            The formatted content.
        """
        if not structured_data:
            return content

        if not structured_data.get("parsed"):
            return content

        domain_type = structured_data.get("domain")
        if not domain_type and search_info:
            # Structured data supplied by the search backend may not carry a
            # "domain" key; fall back to what detection recorded.
            domain_type = (
                search_info.get("vertical_domain_detected")
                or search_info.get("vertical_domain_type")
            )

        # Unknown domains are shown by their raw type. A missing domain yields
        # an empty name rather than the literal text "None".
        domain_name = (
            VerticalDomainProcessor.SUPPORTED_DOMAINS.get(domain_type, domain_type) or ""
        )

        # Append the vertical-domain footnote to the end of the content.
        return content + f"\n\n*以上信息来自{domain_name}垂直领域搜索,数据已结构化处理。*"

    @staticmethod
    def process_vertical_domain_search(search_info: Dict) -> Dict:
        """
        Run the complete vertical-domain processing flow.

        Args:
            search_info: Search information dictionary.

        Returns:
            ``search_info`` itself when it is empty or no domain is detected.
            Otherwise a shallow copy with ``vertical_domain_detected``,
            ``vertical_domain_name`` and, when data could be extracted,
            ``vertical_structured_data`` added.
        """
        if not search_info:
            return search_info

        # Detect the vertical-domain type.
        domain_type = VerticalDomainProcessor.detect_vertical_domain(search_info)
        if not domain_type:
            return search_info

        # Extract the structured data.
        structured_data = VerticalDomainProcessor.extract_structured_data(
            search_info, domain_type
        )

        # Attach the vertical-domain information to a copy of the search info.
        enhanced_search_info = search_info.copy()
        enhanced_search_info["vertical_domain_detected"] = domain_type
        enhanced_search_info["vertical_domain_name"] = (
            VerticalDomainProcessor.SUPPORTED_DOMAINS[domain_type]
        )

        if structured_data:
            enhanced_search_info["vertical_structured_data"] = structured_data

        logger.info(f"垂直领域搜索处理完成: {domain_type}")
        return enhanced_search_info

    @staticmethod
    def is_vertical_domain_result(search_info: Dict) -> bool:
        """
        Tell whether the search info is a vertical-domain result.

        Args:
            search_info: Search information dictionary.

        Returns:
            True if any vertical-domain marker key is present or a domain can
            be detected from the content; False otherwise.
        """
        if not search_info:
            return False

        # Check for explicit markers first, then fall back to detection.
        return (
            "vertical_domain_type" in search_info
            or "vertical_extension_data" in search_info
            or "structured_data" in search_info
            or VerticalDomainProcessor.detect_vertical_domain(search_info) is not None
        )