|
|
@@ -56,6 +56,10 @@ from typing import Any, Dict, List, Optional, Sequence
|
|
|
|
|
|
from core.base.task_models import TaskFileInfo
|
|
|
from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
|
|
|
+from core.construction_review.component.infrastructure.parent_tool import (
|
|
|
+ enhance_with_parent_docs,
|
|
|
+ extract_first_result
|
|
|
+)
|
|
|
from core.construction_review.component.infrastructure.relevance import is_relevant_async
|
|
|
from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
|
|
|
from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
|
|
|
@@ -201,8 +205,10 @@ class AIReviewEngine(BaseReviewer):
|
|
|
async with self.semaphore:
|
|
|
return await check_func(**kwargs)
|
|
|
|
|
|
- # 外层超时配置(单个任务的整体超时时间,略大于内部单次超时15秒)
|
|
|
- TASK_TIMEOUT = 20
|
|
|
+ # 外层超时配置(单个任务的整体超时时间)
|
|
|
+ # 计算公式: 模型单次超时(15秒) × (1次初始 + 最大重试次数2次) + 缓冲时间(10秒)
|
|
|
+ # = 15 × 3 + 10 = 55秒
|
|
|
+ TASK_TIMEOUT = 55
|
|
|
|
|
|
basic_tasks = []
|
|
|
|
|
|
@@ -269,7 +275,9 @@ class AIReviewEngine(BaseReviewer):
|
|
|
result = task.result()
|
|
|
results.append(result)
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[基础审查] 任务执行失败: {str(e)}")
|
|
|
+ logger.error(f"[基础审查] 任务执行失败: {str(e)}", exc_info=True)
|
|
|
+ logger.error(f"[基础审查] 异常类型: {type(e).__name__}")
|
|
|
+ logger.error(f"[基础审查] 异常详情: {repr(e)}")
|
|
|
results.append(e)
|
|
|
|
|
|
# 根据配置项分配结果
|
|
|
@@ -337,12 +345,11 @@ class AIReviewEngine(BaseReviewer):
|
|
|
async with self.semaphore:
|
|
|
return await check_func(**kwargs)
|
|
|
|
|
|
- # 外层超时配置(单个任务的整体超时时间,略大于内部单次超时15秒)
|
|
|
- TASK_TIMEOUT = 20
|
|
|
-
|
|
|
# 根据配置动态创建技术性检查任务
|
|
|
technical_tasks = []
|
|
|
- task_mapping = [] # 任务名称映射
|
|
|
+ task_mapping = []
|
|
|
+
|
|
|
+ TASK_TIMEOUT = 150
|
|
|
|
|
|
if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
|
|
|
task_mapping.append('non_parameter_compliance')
|
|
|
@@ -397,7 +404,9 @@ class AIReviewEngine(BaseReviewer):
|
|
|
result = task.result()
|
|
|
results.append(result)
|
|
|
except Exception as e:
|
|
|
- logger.error(f"[技术审查] 任务执行失败: {str(e)}")
|
|
|
+ logger.error(f"[技术审查] 任务执行失败: {str(e)}", exc_info=True)
|
|
|
+ logger.error(f"[技术审查] 异常类型: {type(e).__name__}")
|
|
|
+ logger.error(f"[技术审查] 异常详情: {repr(e)}")
|
|
|
results.append(e)
|
|
|
|
|
|
# 根据配置项分配结果
|
|
|
@@ -425,21 +434,30 @@ class AIReviewEngine(BaseReviewer):
|
|
|
"""
|
|
|
RAG增强审查
|
|
|
|
|
|
+ 流程:
|
|
|
+ 1. 提取查询对
|
|
|
+ 2. 实体增强检索
|
|
|
+ 3. 自动拼接父文档内容
|
|
|
+
|
|
|
Args:
|
|
|
unit_content: 待审查单元内容
|
|
|
|
|
|
Returns:
|
|
|
Dict[str, Any]: RAG增强审查结果
|
|
|
"""
|
|
|
- # 向量检索
|
|
|
+ # Step 1: 向量检索
|
|
|
query_content = unit_content['content']
|
|
|
- logger.info("构建查询对")
|
|
|
+ logger.info(f"[RAG增强] 开始处理, 内容长度: {len(query_content)}")
|
|
|
+
|
|
|
+ # Step 2: 查询提取 + 实体增强检索
|
|
|
query_pairs = query_rewrite_manager.query_extract(query_content)
|
|
|
- bfp_result_lists =entity_enhance.entities_enhance_retrieval(query_pairs)
|
|
|
- logger.info(f"bfp_result_lists{bfp_result_lists}")
|
|
|
- # 检查是否有检索结果
|
|
|
+ logger.info(f"[RAG增强] 提取到 {len(query_pairs)} 个查询对")
|
|
|
+
|
|
|
+ bfp_result_lists = entity_enhance.entities_enhance_retrieval(query_pairs)
|
|
|
+
|
|
|
+ # Step 3: 检查检索结果
|
|
|
if not bfp_result_lists:
|
|
|
- logger.warning("RAG检索未返回任何结果")
|
|
|
+ logger.warning("[RAG增强] 实体检索未返回结果")
|
|
|
return {
|
|
|
'vector_search': [],
|
|
|
'retrieval_status': 'no_results',
|
|
|
@@ -447,121 +465,34 @@ class AIReviewEngine(BaseReviewer):
|
|
|
'text_content': '',
|
|
|
'metadata': {}
|
|
|
}
|
|
|
- #todo
|
|
|
- #异步调用查询。查出所有的
|
|
|
-
|
|
|
- #todo
|
|
|
- # 使用bfp_result_list 获取 parent_id ,通过parent_id 获取父文档内容 utils_test\Milvus_Test\test_查询接口.py
|
|
|
- # llm 异步相关度分析 判断父文档是否与query_content 审查条文相关
|
|
|
- # 如果相关,则追加到 bfp_result,如果不相关则,则跳过
|
|
|
- PARENT_COLLECTION = "rag_parent_hybrid" # TODO: 改成你的父段 collection
|
|
|
- PARENT_TEXT_FIELD = "text" # TODO: 改成你的父段字段名
|
|
|
- PARENT_OUTPUT_FIELDS: Sequence[str] = ["parent_id", PARENT_TEXT_FIELD]
|
|
|
-
|
|
|
- def run_async(coro):
|
|
|
- """在同步函数中跑 async(兼容已有 event loop)"""
|
|
|
- try:
|
|
|
- asyncio.get_running_loop()
|
|
|
- with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
- return executor.submit(asyncio.run, coro).result()
|
|
|
- except RuntimeError:
|
|
|
- return asyncio.run(coro)
|
|
|
-
|
|
|
- async def _async_condition_query_one(pid: str) -> Optional[Dict[str, Any]]:
|
|
|
- """
|
|
|
- condition_query 是同步:用线程池包成 async
|
|
|
- 返回父段 row(或 None)
|
|
|
- """
|
|
|
- loop = asyncio.get_running_loop()
|
|
|
-
|
|
|
-
|
|
|
- def _call():
|
|
|
- rows = self.milvus.condition_query(
|
|
|
- collection_name=PARENT_COLLECTION,
|
|
|
- filter=f"parent_id == '{pid}'",
|
|
|
- output_fields=PARENT_OUTPUT_FIELDS,
|
|
|
- limit=1,
|
|
|
- )
|
|
|
- if not rows:
|
|
|
- return None
|
|
|
- row0 = rows[0] or {}
|
|
|
- # 白名单投影:避免 pk/id 等多余字段
|
|
|
- return {k: row0.get(k) for k in PARENT_OUTPUT_FIELDS if k in row0}
|
|
|
-
|
|
|
- return await loop.run_in_executor(None, _call)
|
|
|
-
|
|
|
- async def _enhance_all():
|
|
|
- # 1) 收集 parent_id -> 指向哪些 result 需要被拼接
|
|
|
- pid_to_results: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
-
|
|
|
- for result_list in bfp_result_lists:
|
|
|
- for r in (result_list or []):
|
|
|
- md = r.get("metadata") or {}
|
|
|
- pid = md.get("parent_id")
|
|
|
- if not pid:
|
|
|
- continue
|
|
|
- pid = str(pid)
|
|
|
- pid_to_results.setdefault(pid, []).append(r)
|
|
|
-
|
|
|
- if not pid_to_results:
|
|
|
- return
|
|
|
-
|
|
|
- # 2) 逐个 parent_id 串行:查父段 -> LLM 判断 -> 拼接到对应 results
|
|
|
- for pid, results in pid_to_results.items():
|
|
|
- parent_doc = await _async_condition_query_one(pid)
|
|
|
-
|
|
|
- if not parent_doc:
|
|
|
- continue
|
|
|
-
|
|
|
- parent_text = (parent_doc.get(PARENT_TEXT_FIELD) or "").strip()
|
|
|
- if not parent_text:
|
|
|
- continue
|
|
|
-
|
|
|
- # LLM 判断是否相关(你已经封装好了 is_relevant_async:模型直接输出 relevant true/false)
|
|
|
- relevant = await is_relevant_async(query_content, parent_text)
|
|
|
- # print("================\n")
|
|
|
- # print(query_content)
|
|
|
- # print("\n=====\n")
|
|
|
- # print(parent_text)
|
|
|
- # print("\n=====\n")
|
|
|
- # print(relevant)
|
|
|
- # print("\n================\n")
|
|
|
- if not relevant:
|
|
|
- continue
|
|
|
-
|
|
|
- extra = (
|
|
|
- f"{parent_text}\n"
|
|
|
- )
|
|
|
|
|
|
- # 3) 拼接到所有属于该 parent_id 的条目 text_content
|
|
|
- for r in results:
|
|
|
- r["text_content"] = (r.get("text_content") or "") + extra
|
|
|
+ # Step 4: 父文档增强 (使用独立工具函数 - 显式返回)
|
|
|
+ try:
|
|
|
+ enhancement_result = enhance_with_parent_docs(self.milvus, bfp_result_lists)
|
|
|
+ enhanced_results = enhancement_result['enhanced_results']
|
|
|
+ enhanced_count = enhancement_result['enhanced_count']
|
|
|
|
|
|
- run_async(_enhance_all())
|
|
|
- logger.info(f"RAG检索返回了 {len(bfp_result_lists)} 个查询对结果")
|
|
|
- # 获取第一个查询对的第一个结果
|
|
|
- first_result_list = bfp_result_lists[0]
|
|
|
+ # 保存增强后的结果
|
|
|
+ with open(rf"temp\entity_bfp_recall\enhance_with_parent_docs.json", "w", encoding='utf-8') as f:
|
|
|
+ json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
|
|
|
|
|
|
- if not first_result_list:
|
|
|
- logger.warning("第一个查询对无检索结果")
|
|
|
- return {
|
|
|
- 'vector_search': [],
|
|
|
- 'retrieval_status': 'no_results',
|
|
|
- 'file_name': '',
|
|
|
- 'text_content': '',
|
|
|
- 'metadata': {}
|
|
|
- }
|
|
|
+ logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果")
|
|
|
+ logger.info(f"[RAG增强] 使用了 {len(enhancement_result['parent_docs'])} 个父文档")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[RAG增强] 父文档增强失败: {e}", exc_info=True)
|
|
|
+ # 失败时使用原始结果
|
|
|
+ enhanced_results = bfp_result_lists
|
|
|
+
|
|
|
+ # Step 5: 提取第一个结果返回 (使用增强后的结果)
|
|
|
+ final_result = extract_first_result(enhanced_results)
|
|
|
+
|
|
|
+ # 保存最终结果用于调试
|
|
|
+ with open(rf"temp\entity_bfp_recall\extract_first_result.json", "w", encoding='utf-8') as f:
|
|
|
+ json.dump(final_result, f, ensure_ascii=False, indent=4)
|
|
|
+
|
|
|
+ return final_result
|
|
|
|
|
|
- first_result = first_result_list[0]
|
|
|
- file_name = first_result['metadata'].get('file_name', 'unknown')
|
|
|
- text_content = first_result['text_content']
|
|
|
|
|
|
- return {
|
|
|
- 'file_name': file_name,
|
|
|
- 'text_content': text_content,
|
|
|
- 'metadata': first_result['metadata']
|
|
|
- }
|
|
|
-
|
|
|
async def check_grammar(self, trace_id_idx: str, review_content: str, review_references: str,
|
|
|
review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
@@ -664,10 +595,10 @@ class AIReviewEngine(BaseReviewer):
|
|
|
if str(check_completeness_dir) not in sys.path:
|
|
|
sys.path.insert(0, str(check_completeness_dir))
|
|
|
|
|
|
- from components.data_loader import CSVDataLoader
|
|
|
- from components.prompt_builder import PromptBuilder
|
|
|
- from components.llm_client import LLMClient
|
|
|
- from components.result_processor import ResultProcessor
|
|
|
+ from check_completeness.components.data_loader import CSVDataLoader
|
|
|
+ from check_completeness.components.prompt_builder import PromptBuilder
|
|
|
+ from check_completeness.components.llm_client import LLMClient
|
|
|
+ from check_completeness.components.result_processor import ResultProcessor
|
|
|
|
|
|
name = "completeness_check"
|
|
|
start_time = time.time()
|
|
|
@@ -906,7 +837,7 @@ class AIReviewEngine(BaseReviewer):
|
|
|
prompt_name = Stage.TECHNICAL.value['non_parameter']
|
|
|
trace_id = prompt_name+trace_id_idx
|
|
|
return await self.review("non_parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
- reference_source, review_location_label, state, stage_name)
|
|
|
+ reference_source, review_location_label, state, stage_name, timeout=45)
|
|
|
|
|
|
async def check_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,
|
|
|
reference_source: str, review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
|
|
|
@@ -929,7 +860,7 @@ class AIReviewEngine(BaseReviewer):
|
|
|
prompt_name = Stage.TECHNICAL.value['parameter']
|
|
|
trace_id = prompt_name+trace_id_idx
|
|
|
return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
- reference_source, review_location_label, state, stage_name)
|
|
|
+ reference_source, review_location_label, state, stage_name, timeout=45)
|
|
|
|
|
|
async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
|
|
|
state:dict,stage_name:str) -> Dict[str, Any]:
|