import asyncio
import concurrent.futures
import json
import os

from foundation.observability.monitoring.time_statistics import track_execution_time
from foundation.ai.rag.retrieval.retrieval import retrieval_manager
from foundation.observability.logger.loggering import server_logger


def _run_coroutine_sync(coro):
    """Run ``coro`` to completion from synchronous code and return its result.

    If no event loop is running in the current thread, the coroutine is run
    directly with ``asyncio.run``. If a loop *is* running (so ``asyncio.run``
    would refuse to start here), the coroutine is run with ``asyncio.run`` on
    a fresh loop in a worker thread, and this call blocks until it finishes.
    Note that this blocking wait also blocks the caller's running loop.

    Exceptions raised by the coroutine propagate to the caller unchanged.
    """
    try:
        # Only this probe may raise RuntimeError ("no running event loop").
        # Keeping the try this narrow ensures a RuntimeError raised by the
        # coroutine itself is not mistaken for "no loop" and re-run.
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    # A loop is already running in this thread; asyncio.run() cannot be used
    # here, so run the coroutine on its own loop in a separate thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()


class EntitiesEnhance:
    """Entity recall followed by BFP background-enhanced recall.

    The results of the most recent ``entities_enhance_retrieval`` call are
    kept in ``bfp_result_lists`` and dumped as JSON to ``save_path``.
    """

    def __init__(self):
        # Built with os.path.join: a backslash literal contains the invalid
        # escape "\e" and, on POSIX, names a single file with backslashes in
        # it rather than a nested path.
        self.save_path = os.path.join(
            "temp", "entity_bfp_recall", "entity_bfp_recall.json"
        )
        # One entry per processed query pair: the list returned by
        # retrieval_manager.async_bfp_recall for that pair.
        self.bfp_result_lists = []

    @track_execution_time
    def entities_enhance_retrieval(self, query_pairs):
        """Run entity recall plus BFP recall for every query pair.

        Args:
            query_pairs: iterable of dicts, each with the keys ``'entity'``,
                ``'search_keywords'`` and ``'background'``.

        Returns:
            ``self.bfp_result_lists``: one BFP result list per query pair, in
            input order. Every result dict in it is tagged with a
            ``'source_entity'`` key naming the entity it was recalled for.
            The same data is also written to ``self.save_path``.
        """
        # Discard results from any previous call.
        self.bfp_result_lists = []

        for query_pair in query_pairs:
            entity = query_pair['entity']
            search_keywords = query_pair['search_keywords']
            background = query_pair['background']
            server_logger.info(f"正在处理实体:{entity},辅助搜索词:{search_keywords},背景:{background}")

            entity_list = _run_coroutine_sync(retrieval_manager.entity_recall(
                entity,
                search_keywords,
                recall_top_k=5,  # number of main-entity hits to recall
                max_results=5,   # cap on the number of entity texts returned
            ))

            # BFP recall over the recalled entities, enhanced with the background.
            bfp_result = _run_coroutine_sync(
                retrieval_manager.async_bfp_recall(entity_list, background, top_k=3)
            )

            # Tag each result with the entity that produced it.
            for result in bfp_result:
                result['source_entity'] = entity
            self.bfp_result_lists.append(bfp_result)

        self.test_file(self.bfp_result_lists, seve=True)
        return self.bfp_result_lists

    def test_file(self, bfp_result, seve=False):
        """Write ``bfp_result`` as pretty-printed JSON to ``self.save_path``.

        Args:
            bfp_result: JSON-serialisable data to dump.
            seve: when false (the default), nothing is written.
        """
        if not seve:
            return
        # Create the output directory on first use instead of failing with
        # FileNotFoundError when it does not exist yet.
        save_dir = os.path.dirname(self.save_path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        with open(self.save_path, "w", encoding="utf-8") as f:
            json.dump(bfp_result, f, ensure_ascii=False, indent=4)


entity_enhance = EntitiesEnhance()