| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import json
- import asyncio
- from foundation.observability.monitoring.time_statistics import track_execution_time
- from foundation.ai.rag.retrieval.retrieval import retrieval_manager
- from foundation.observability.logger.loggering import server_logger
- class EntitiesEnhance():
- def __init__(self):
- self.save_path = "temp\entity_bfp_recall\entity_bfp_recall.json"
- self.bfp_result_lists = []
- @track_execution_time
- def entities_enhance_retrieval(self, query_pairs):
- def run_async(coro):
- """在合适的环境中运行异步函数"""
- try:
- loop = asyncio.get_running_loop()
- import concurrent.futures
- with concurrent.futures.ThreadPoolExecutor() as executor:
- future = executor.submit(asyncio.run, coro)
- return future.result()
- except RuntimeError:
- return asyncio.run(coro)
- # 清空之前的结果
- self.bfp_result_lists = []
- for query_pair in query_pairs:
- entity = query_pair['entity']
- search_keywords = query_pair['search_keywords']
- background = query_pair['background']
- server_logger.info(f"正在处理实体:{entity},辅助搜索词:{search_keywords},背景:{background}")
- entity_list = run_async(retrieval_manager.entity_recall(
- entity,
- search_keywords,
- recall_top_k=5, # 主实体返回数量
- max_results=5 # 最终最多返回20个实体文本
- ))
- # BFP背景增强召回
- bfp_result = run_async(retrieval_manager.async_bfp_recall(entity_list, background, top_k=3))
- # 为每个结果添加实体信息
- for result in bfp_result:
- result['source_entity'] = entity
- self.bfp_result_lists.append(bfp_result)
- self.test_file(self.bfp_result_lists, seve=True)
- return self.bfp_result_lists
-
- def test_file(self,bfp_result,seve = False):
- if seve:
- with open(self.save_path, "w", encoding="utf-8") as f:
- json.dump(bfp_result, f, ensure_ascii=False, indent=4)
- entity_enhance = EntitiesEnhance()
|