|
@@ -63,141 +63,116 @@ def fetch_parent_document(
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
def enhance_with_parent_docs_grouped(
    milvus_manager,
    bfp_result_lists: List,
    score_threshold: float = 0.5,
    max_parents_per_pair: int = 2,
    max_parent_text_length: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Enhance retrieval results with parent documents, one query pair at a time.

    Each sub-list of ``bfp_result_lists`` is handled independently:

    1. Keep only the results whose ``bfp_rerank_score`` is at least
       ``score_threshold``; a sub-list with no such result is skipped.
    2. Take the parent ids found in the ``max_parents_per_pair``
       highest-scoring kept results (de-duplicated, best score first).
    3. Fetch each parent document through ``fetch_parent_document``.
    4. Append the concatenated parent texts to the ``text_content`` of every
       kept result.

    The input lists and dicts are not modified; new result dicts are built.

    Args:
        milvus_manager: Handle passed through unchanged to
            ``fetch_parent_document``.
        bfp_result_lists: Two-level list; each sub-list holds the result
            dicts of one query pair.
        score_threshold: Minimum ``bfp_rerank_score`` a result needs in order
            to be kept (default 0.5). A missing or ``None`` score counts as 0.
        max_parents_per_pair: How many of the top-scoring kept results are
            inspected for a parent id, which bounds the number of parent
            documents per query pair (default 2).
        max_parent_text_length: If set, each parent text longer than this many
            characters is cut to that length and a truncation marker is
            appended. ``None`` (default) means no limit.

    Returns:
        Dict with the keys:
            - enhanced_results: list of lists of enhanced result dicts; only
              query pairs that were actually enhanced are included.
            - enhanced_count: total number of enhanced result dicts.
            - parent_docs: every parent document used, as
              ``{'parent_id', 'text'}`` dicts, in processing order.
            - enhanced_pairs: number of query pairs that were enhanced.
            - total_pairs: ``len(bfp_result_lists)``.
    """

    def _score(item) -> float:
        # A missing key and an explicit None both count as 0, so sorting and
        # threshold comparison never hit a None-vs-float TypeError.
        return item.get('bfp_rerank_score') or 0

    enhanced_results = []
    total_enhanced_count = 0
    all_parent_docs = []
    enhanced_pairs_count = 0

    logger.info(f"[分组增强] 开始处理 {len(bfp_result_lists)} 个查询对,阈值={score_threshold}")

    for pair_idx, result_list in enumerate(bfp_result_lists):
        if not result_list:
            continue

        # 1. Keep the results that reach the threshold (original order is
        #    preserved here; it is the order used for the output list).
        high_score_results = [r for r in result_list if _score(r) >= score_threshold]

        if not high_score_results:
            logger.info(f"[分组增强] 查询对 {pair_idx}: 所有结果分数均低于 {score_threshold},跳过")
            continue

        # 2. Collect parent ids from the best-scoring kept results.
        #    A dict keyed by the string form de-duplicates while keeping
        #    best-score-first order, so the numbering of the concatenated
        #    documents is deterministic (a set would scramble it) and ids
        #    such as 1 and "1" are fetched only once.
        ranked = sorted(high_score_results, key=_score, reverse=True)
        unique_parent_ids = {}
        for r in ranked[:max_parents_per_pair]:
            pid = (r.get('metadata') or {}).get('parent_id')
            if pid:
                unique_parent_ids.setdefault(str(pid), pid)

        if not unique_parent_ids:
            logger.warning(f"[分组增强] 查询对 {pair_idx}: 没有有效的parent_id,跳过")
            continue

        # 3. Fetch the parent documents; ids whose lookup fails or whose
        #    text is empty are silently left out.
        parent_docs = []
        for pid_key, pid in unique_parent_ids.items():
            doc = fetch_parent_document(milvus_manager, pid_key)
            if doc and doc.get('text'):
                text = doc['text']
                if max_parent_text_length and len(text) > max_parent_text_length:
                    text = text[:max_parent_text_length] + "\n...(已截断)"
                parent_docs.append({'parent_id': pid, 'text': text})

        if not parent_docs:
            logger.warning(f"[分组增强] 查询对 {pair_idx}: 父文档查询失败,跳过")
            continue

        # 4. Concatenate the parent texts under numbered headers.
        combined_text = "\n".join(
            f"【参考文档 {i+1}】\n{d['text']}" for i, d in enumerate(parent_docs)
        )

        # 5. Build a new dict for every kept result with the parent text
        #    appended; low-scoring results of this pair are dropped.
        enhanced_list = [
            {
                'text_content': (result.get('text_content') or '') + f"\n{combined_text}\n",
                'metadata': result.get('metadata', {}),
                'hybrid_similarity': result.get('hybrid_similarity'),
                'rerank_score': result.get('rerank_score'),
                'bfp_rerank_score': result.get('bfp_rerank_score'),
                'bfp_rerank_parent_id': result.get('bfp_rerank_parent_id', ''),
                'source_entity': result.get('source_entity', ''),
                'enhanced': True,
                'parent_docs_count': len(parent_docs),
            }
            for result in high_score_results
        ]

        enhanced_results.append(enhanced_list)
        all_parent_docs.extend(parent_docs)
        enhanced_pairs_count += 1
        total_enhanced_count += len(enhanced_list)
        logger.info(f"[分组增强] 查询对 {pair_idx}: 保留 {len(enhanced_list)} 个高分结果")

    logger.info(f"[分组增强] 完成: {enhanced_pairs_count}/{len(bfp_result_lists)} 个查询对,{total_enhanced_count} 个结果")

    return {
        'enhanced_results': enhanced_results,
        'enhanced_count': total_enhanced_count,
        'parent_docs': all_parent_docs,
        'enhanced_pairs': enhanced_pairs_count,
        'total_pairs': len(bfp_result_lists),
    }
|
|
|
|
|
|
|
|
|
|
|
|
@@ -317,3 +292,4 @@ def extract_query_pairs_results(bfp_result_lists: List, query_pairs: List[Dict]
|
|
|
logger.info(f"[父文档工具] 提取完成: 总计 {total_count} 个查询对,{filtered_count} 个结果通过阈值过滤")
|
|
logger.info(f"[父文档工具] 提取完成: 总计 {total_count} 个查询对,{filtered_count} 个结果通过阈值过滤")
|
|
|
|
|
|
|
|
return entity_results
|
|
return entity_results
|
|
|
|
|
+
|