|
|
@@ -46,21 +46,24 @@
|
|
|
└── set_review_location_label() # 设置审查位置标签
|
|
|
"""
|
|
|
|
|
|
-import time
|
|
|
-import json
|
|
|
import asyncio
|
|
|
-from enum import Enum
|
|
|
+import concurrent.futures
|
|
|
+import json
|
|
|
+import time
|
|
|
from dataclasses import dataclass
|
|
|
-from typing import Dict, List, Any
|
|
|
-from core.base.task_models import TaskFileInfo
|
|
|
-from foundation.infrastructure.config.config import config_handler
|
|
|
-from foundation.observability.logger.loggering import server_logger as logger
|
|
|
-from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
|
|
|
-from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
|
|
|
+from enum import Enum
|
|
|
+from typing import Any, Dict, List, Optional, Sequence
|
|
|
+
|
|
|
+from core.base.task_models import TaskFileInfo
|
|
|
+from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
|
|
|
+from core.construction_review.component.infrastructure.relevance import is_relevant_async
|
|
|
from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
|
|
|
from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
|
|
|
-
|
|
|
-
|
|
|
+from core.construction_review.component.reviewers.utils.text_split import split_text
|
|
|
+from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
|
|
|
+from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
|
|
|
+from foundation.infrastructure.config.config import config_handler
|
|
|
+from foundation.observability.logger.loggering import server_logger as logger
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
@@ -129,6 +132,8 @@ class AIReviewEngine(BaseReviewer):
|
|
|
self.milvus_collection = config_handler.get('milvus', 'MILVUS_COLLECTION', 'default')
|
|
|
self.outline_reviewer = OutlineReviewer()
|
|
|
|
|
|
+ self.milvus = MilvusManager(MilvusConfig())
|
|
|
+
|
|
|
def _process_review_result(self, result):
|
|
|
"""
|
|
|
处理审查结果,统一转换为字典格式
|
|
|
@@ -186,6 +191,8 @@ class AIReviewEngine(BaseReviewer):
|
|
|
Dict[str, Any]: 基础合规性检查结果
|
|
|
"""
|
|
|
review_content = unit_content['content']
|
|
|
+ with open('temp/review_content.txt', 'a', encoding='utf-8') as f:
|
|
|
+ f.write(str(unit_content))
|
|
|
#review_references = unit_content.get('review_references')
|
|
|
|
|
|
logger.info(f"basic开始基础合规性检查, 内容长度: {len(review_content)}")
|
|
|
@@ -208,6 +215,10 @@ class AIReviewEngine(BaseReviewer):
|
|
|
basic_tasks.append(
|
|
|
check_with_semaphore(self.check_sensitive, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
|
|
|
)
|
|
|
+ if 'completeness_check' in self.task_info.get_review_config_list():
|
|
|
+ basic_tasks.append(
|
|
|
+ check_with_semaphore(self.check_completeness, trace_id_idx=trace_id_idx, review_content=unit_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
|
|
|
+ )
|
|
|
|
|
|
# 一次性执行所有任务,避免重复协程调用
|
|
|
if not basic_tasks:
|
|
|
@@ -224,7 +235,8 @@ class AIReviewEngine(BaseReviewer):
|
|
|
grammar_result = self._process_review_result(None)
|
|
|
semantic_result = self._process_review_result(None)
|
|
|
sensitive_result = self._process_review_result(None)
|
|
|
-
|
|
|
+ completeness_result = self._process_review_result(None)
|
|
|
+ logger.info(f"completeness_result: {completeness_result}")
|
|
|
result_index = 0
|
|
|
|
|
|
if 'sensitive_word_check' in self.task_info.get_review_config_list():
|
|
|
@@ -241,11 +253,16 @@ class AIReviewEngine(BaseReviewer):
|
|
|
if result_index < len(results):
|
|
|
sensitive_result = self._process_review_result(results[result_index])
|
|
|
result_index += 1
|
|
|
+ if 'completeness_check' in self.task_info.get_review_config_list():
|
|
|
+ if result_index < len(results):
|
|
|
+ completeness_result = self._process_review_result(results[result_index])
|
|
|
+ result_index += 1
|
|
|
|
|
|
return {
|
|
|
'grammar_check': grammar_result,
|
|
|
'semantic_check': semantic_result,
|
|
|
'sensitive_check': sensitive_result,
|
|
|
+ 'completeness_check': completeness_result,
|
|
|
}
|
|
|
async def technical_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
|
|
|
review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
|
|
|
@@ -347,11 +364,6 @@ class AIReviewEngine(BaseReviewer):
|
|
|
logger.info("构建查询对")
|
|
|
query_pairs = query_rewrite_manager.query_extract(query_content)
|
|
|
bfp_result_lists =entity_enhance.entities_enhance_retrieval(query_pairs)
|
|
|
- # 使用bfp_result_list 获取 parent_id ,通过parent_id 获取父文档内容 utils_test\Milvus_Test\test_查询接口.py
|
|
|
- # llm 异步相关度分析 判断父文档是否与query_content 审查条文相关
|
|
|
- # 如果相关,则追加到 bfp_result,如果不相关则,则跳过
|
|
|
- # 如果len(bfp_result) > 0 则进行RAG增强,否则 则返回空
|
|
|
-
|
|
|
logger.info(f"bfp_result_lists{bfp_result_lists}")
|
|
|
# 检查是否有检索结果
|
|
|
if not bfp_result_lists:
|
|
|
@@ -363,6 +375,97 @@ class AIReviewEngine(BaseReviewer):
|
|
|
'text_content': '',
|
|
|
'metadata': {}
|
|
|
}
|
|
|
+ #todo
|
|
|
+ #异步调用查询。查出所有的
|
|
|
+
|
|
|
+ #todo
|
|
|
+ # 使用bfp_result_list 获取 parent_id ,通过parent_id 获取父文档内容 utils_test\Milvus_Test\test_查询接口.py
|
|
|
+ # llm 异步相关度分析 判断父文档是否与query_content 审查条文相关
|
|
|
+ # 如果相关,则追加到 bfp_result,如果不相关则,则跳过
|
|
|
+ PARENT_COLLECTION = "rag_parent_hybrid" # TODO: 改成你的父段 collection
|
|
|
+ PARENT_TEXT_FIELD = "text" # TODO: 改成你的父段字段名
|
|
|
+ PARENT_OUTPUT_FIELDS: Sequence[str] = ["parent_id", PARENT_TEXT_FIELD]
|
|
|
+
|
|
|
+ def run_async(coro):
|
|
|
+ """在同步函数中跑 async(兼容已有 event loop)"""
|
|
|
+ try:
|
|
|
+ asyncio.get_running_loop()
|
|
|
+ with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
+ return executor.submit(asyncio.run, coro).result()
|
|
|
+ except RuntimeError:
|
|
|
+ return asyncio.run(coro)
|
|
|
+
|
|
|
+ async def _async_condition_query_one(pid: str) -> Optional[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ condition_query 是同步:用线程池包成 async
|
|
|
+ 返回父段 row(或 None)
|
|
|
+ """
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+
|
|
|
+
|
|
|
+ def _call():
|
|
|
+ rows = self.milvus.condition_query(
|
|
|
+ collection_name=PARENT_COLLECTION,
|
|
|
+ filter=f"parent_id == '{pid}'",
|
|
|
+ output_fields=PARENT_OUTPUT_FIELDS,
|
|
|
+ limit=1,
|
|
|
+ )
|
|
|
+ if not rows:
|
|
|
+ return None
|
|
|
+ row0 = rows[0] or {}
|
|
|
+ # 白名单投影:避免 pk/id 等多余字段
|
|
|
+ return {k: row0.get(k) for k in PARENT_OUTPUT_FIELDS if k in row0}
|
|
|
+
|
|
|
+ return await loop.run_in_executor(None, _call)
|
|
|
+
|
|
|
+ async def _enhance_all():
|
|
|
+ # 1) 收集 parent_id -> 指向哪些 result 需要被拼接
|
|
|
+ pid_to_results: Dict[str, List[Dict[str, Any]]] = {}
|
|
|
+
|
|
|
+ for result_list in bfp_result_lists:
|
|
|
+ for r in (result_list or []):
|
|
|
+ md = r.get("metadata") or {}
|
|
|
+ pid = md.get("parent_id")
|
|
|
+ if not pid:
|
|
|
+ continue
|
|
|
+ pid = str(pid)
|
|
|
+ pid_to_results.setdefault(pid, []).append(r)
|
|
|
+
|
|
|
+ if not pid_to_results:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 2) 逐个 parent_id 串行:查父段 -> LLM 判断 -> 拼接到对应 results
|
|
|
+ for pid, results in pid_to_results.items():
|
|
|
+ parent_doc = await _async_condition_query_one(pid)
|
|
|
+
|
|
|
+ if not parent_doc:
|
|
|
+ continue
|
|
|
+
|
|
|
+ parent_text = (parent_doc.get(PARENT_TEXT_FIELD) or "").strip()
|
|
|
+ if not parent_text:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # LLM 判断是否相关(你已经封装好了 is_relevant_async:模型直接输出 relevant true/false)
|
|
|
+ relevant = await is_relevant_async(query_content, parent_text)
|
|
|
+ # print("================\n")
|
|
|
+ # print(query_content)
|
|
|
+ # print("\n=====\n")
|
|
|
+ # print(parent_text)
|
|
|
+ # print("\n=====\n")
|
|
|
+ # print(relevant)
|
|
|
+ # print("\n================\n")
|
|
|
+ if not relevant:
|
|
|
+ continue
|
|
|
+
|
|
|
+ extra = (
|
|
|
+ f"{parent_text}\n"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 3) 拼接到所有属于该 parent_id 的条目 text_content
|
|
|
+ for r in results:
|
|
|
+ r["text_content"] = (r.get("text_content") or "") + extra
|
|
|
+
|
|
|
+ run_async(_enhance_all())
|
|
|
logger.info(f"RAG检索返回了 {len(bfp_result_lists)} 个查询对结果")
|
|
|
# 获取第一个查询对的第一个结果
|
|
|
first_result_list = bfp_result_lists[0]
|
|
|
@@ -406,50 +509,224 @@ class AIReviewEngine(BaseReviewer):
|
|
|
reviewer_type = Stage.BASIC.value['reviewer_type']
|
|
|
prompt_name = Stage.BASIC.value['grammar']
|
|
|
trace_id = prompt_name+trace_id_idx
|
|
|
+
|
|
|
+ # 使用文本切割工具将长文本切分为150-250字的片段
|
|
|
+ text_segments = split_text(review_content, min_length=150, target_length=250)
|
|
|
+
|
|
|
+ # TODO: 这里可以对每个片段进行分批审查
|
|
|
+ # 目前先使用原始的完整内容进行审查
|
|
|
+ # 后续可以遍历 text_segments 进行分段审查并汇总结果
|
|
|
+
|
|
|
return await self.review("sensitive_word_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
None, review_location_label, state, stage_name)
|
|
|
|
|
|
+
|
|
|
+
|
|
|
async def check_semantic_logic(self, trace_id_idx: str, review_content: str, review_references: str,
|
|
|
review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
语义逻辑检查
|
|
|
-
|
|
|
+
|
|
|
Args:
|
|
|
trace_id_idx: 追踪ID索引
|
|
|
review_content: 审查内容
|
|
|
review_references: 审查参考信息
|
|
|
review_location_label: 审查位置标签
|
|
|
+ state: 状态字典
|
|
|
+ stage_name: 阶段名称
|
|
|
|
|
|
Returns:
|
|
|
- Dict[str, Any]: 语义逻辑检查结果
|
|
|
+ ReviewResult: 语义逻辑检查结果
|
|
|
"""
|
|
|
+ from core.construction_review.component.reviewers.semantic_logic import semantic_logic_reviewer
|
|
|
+
|
|
|
+ # 构造trace_id
|
|
|
reviewer_type = Stage.BASIC.value['reviewer_type']
|
|
|
prompt_name = Stage.BASIC.value['semantic']
|
|
|
- trace_id = prompt_name+trace_id_idx
|
|
|
- return await self.review("semantic_logic_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
- None, review_location_label, state, stage_name)
|
|
|
-
|
|
|
- async def check_completeness(self, trace_id_idx: str, review_content: str, review_references: str,
|
|
|
+ trace_id = prompt_name + trace_id_idx
|
|
|
+
|
|
|
+ # 调用语义逻辑审查模块
|
|
|
+ result = await semantic_logic_reviewer.check_semantic_logic(
|
|
|
+ trace_id=trace_id,
|
|
|
+ review_content=review_content,
|
|
|
+ review_references=review_references,
|
|
|
+ review_location_label=review_location_label,
|
|
|
+ state=state,
|
|
|
+ stage_name=stage_name
|
|
|
+ )
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+ ### TODO: 使用review模块,并传入指定模型名称lq-Qwen3-30B,修改底层review接口模块,层层传参直至底层。修改底层model_generate模块,支持初始化多种模型,支持根据传入模型名称参数调用指定模型。
|
|
|
+
|
|
|
+ pass
|
|
|
+ # reviewer_type = Stage.BASIC.value['reviewer_type']
|
|
|
+ # prompt_name = Stage.BASIC.value['semantic']
|
|
|
+ # trace_id = prompt_name+trace_id_idx
|
|
|
+ # return await self.review("semantic_logic_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
+ # None, review_location_label, state, stage_name)
|
|
|
+
|
|
|
+ async def check_completeness(self, trace_id_idx: str, review_content: Dict[str, Any], review_references: str,
|
|
|
review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
完整性检查
|
|
|
|
|
|
Args:
|
|
|
trace_id_idx: 追踪ID索引
|
|
|
- review_content: 审查内容
|
|
|
+ review_content: 审查内容,单个文档块(chunk)的字典,格式如文档切分预处理结果.json中的chunks项
|
|
|
review_references: 审查参考信息
|
|
|
stage_name: 阶段名称
|
|
|
state: 状态字典
|
|
|
- current_progress: 当前进度
|
|
|
+ review_location_label: 审查位置标签
|
|
|
|
|
|
Returns:
|
|
|
Dict[str, Any]: 完整性检查结果
|
|
|
"""
|
|
|
- reviewer_type = Stage.BASIC.value['reviewer_type']
|
|
|
- prompt_name = Stage.BASIC.value['completeness']
|
|
|
- trace_id = prompt_name+trace_id_idx
|
|
|
- return await self.review("completeness_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
|
|
|
- None, review_location_label, state, stage_name)
|
|
|
+ from pathlib import Path
|
|
|
+ import sys
|
|
|
+ import json
|
|
|
+
|
|
|
+ # 导入check_completeness组件
|
|
|
+ check_completeness_dir = Path(__file__).parent / "check_completeness"
|
|
|
+ if str(check_completeness_dir) not in sys.path:
|
|
|
+ sys.path.insert(0, str(check_completeness_dir))
|
|
|
+
|
|
|
+ from components.data_loader import CSVDataLoader
|
|
|
+ from components.prompt_builder import PromptBuilder
|
|
|
+ from components.llm_client import LLMClient
|
|
|
+ from components.result_processor import ResultProcessor
|
|
|
+
|
|
|
+ name = "completeness_check"
|
|
|
+ start_time = time.time()
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 验证review_content格式
|
|
|
+ if not isinstance(review_content, dict):
|
|
|
+ raise ValueError(f"review_content必须是字典类型,当前类型: {type(review_content)}")
|
|
|
+
|
|
|
+ # 获取文档块信息
|
|
|
+ doc = review_content
|
|
|
+ chunk_id = doc.get('chunk_id', 'unknown')
|
|
|
+ chapter_classification = doc.get('chapter_classification', '')
|
|
|
+ content = doc.get('content', '')
|
|
|
+
|
|
|
+ logger.info(f"开始执行 {name} 审查,trace_id: {trace_id_idx}, chunk_id: {chunk_id}, chapter_classification: {chapter_classification}")
|
|
|
+
|
|
|
+ # 检查必要字段
|
|
|
+ if not chapter_classification:
|
|
|
+ raise ValueError(f"文档块 {chunk_id} 缺少chapter_classification字段")
|
|
|
+
|
|
|
+ if not content:
|
|
|
+ raise ValueError(f"文档块 {chunk_id} 缺少content字段")
|
|
|
+
|
|
|
+ # 初始化组件路径
|
|
|
+ base_dir = check_completeness_dir
|
|
|
+ csv_path = base_dir / 'config' / 'Construction_Plan_Content_Specification.csv'
|
|
|
+ prompt_config_path = base_dir / 'config' / 'prompt.yaml'
|
|
|
+ api_config_path = base_dir / 'config' / 'llm_api.yaml'
|
|
|
+
|
|
|
+ # 加载规范文件
|
|
|
+ data_loader = CSVDataLoader()
|
|
|
+ specification = data_loader.load_specification(str(csv_path))
|
|
|
+
|
|
|
+ # 获取对应的规范要求
|
|
|
+ requirements = specification.get(chapter_classification, [])
|
|
|
+ if not requirements:
|
|
|
+ raise ValueError(f"未找到标签 {chapter_classification} 对应的规范要求")
|
|
|
+
|
|
|
+ logger.info(f"找到 {len(requirements)} 个规范要求项")
|
|
|
+
|
|
|
+ # 初始化组件
|
|
|
+ prompt_builder = PromptBuilder(str(prompt_config_path))
|
|
|
+ llm_client = LLMClient(str(api_config_path))
|
|
|
+ result_processor = ResultProcessor()
|
|
|
+
|
|
|
+ # 构建提示词
|
|
|
+ prompt = prompt_builder.build_prompt(content, requirements)
|
|
|
+
|
|
|
+ # 调用LLM
|
|
|
+ logger.info(f"调用LLM进行审查,使用模型: {llm_client.model_type}")
|
|
|
+ llm_response = await llm_client.call_llm(prompt)
|
|
|
+
|
|
|
+ # 处理结果
|
|
|
+ review_result = result_processor.parse_result(llm_response, requirements)
|
|
|
+
|
|
|
+ # 构建details字段,包含审查结果
|
|
|
+ details = {
|
|
|
+ 'chunk_id': chunk_id,
|
|
|
+ 'name': 'completeness_check',
|
|
|
+ 'chapter_classification': chapter_classification,
|
|
|
+ 'section_label': doc.get('section_label', ''),
|
|
|
+ 'requirements_count': len(requirements),
|
|
|
+ 'checked_items': len(review_result),
|
|
|
+ 'response': review_result[0] if review_result else {},
|
|
|
+ }
|
|
|
+
|
|
|
+ execution_time = time.time() - start_time
|
|
|
+
|
|
|
+ # 创建ReviewResult对象
|
|
|
+ from core.construction_review.component.reviewers.base_reviewer import ReviewResult
|
|
|
+ result = ReviewResult(
|
|
|
+ success=True,
|
|
|
+ details=details,
|
|
|
+ error_message=None,
|
|
|
+ execution_time=execution_time
|
|
|
+ )
|
|
|
+ with open('temp/completeness_check_result.json','w',encoding='utf-8') as f:
|
|
|
+ json.dump({"details":result.details,"success":result.success,"error_message":result.error_message,"execution_time":result.execution_time},f,ensure_ascii=False,indent=4)
|
|
|
+ # 将审查结果转换为字典格式,添加到issues中
|
|
|
+ review_result_data = {
|
|
|
+ 'name': name,
|
|
|
+ 'success': result.success,
|
|
|
+ 'details': result.details,
|
|
|
+ 'error_message': result.error_message,
|
|
|
+ 'execution_time': result.execution_time,
|
|
|
+ 'timestamp': time.time()
|
|
|
+ }
|
|
|
+
|
|
|
+ # 推送审查完成信息
|
|
|
+ state_dict = None
|
|
|
+ if state:
|
|
|
+ if isinstance(state, dict):
|
|
|
+ state_dict = state
|
|
|
+ elif isinstance(state, str):
|
|
|
+ try:
|
|
|
+ state_dict = json.loads(state)
|
|
|
+ except (json.JSONDecodeError, AttributeError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ if state_dict and state_dict.get("progress_manager"):
|
|
|
+ asyncio.create_task(
|
|
|
+ state_dict["progress_manager"].update_stage_progress(
|
|
|
+ callback_task_id=state_dict.get("callback_task_id"),
|
|
|
+ stage_name=stage_name,
|
|
|
+ current=None,
|
|
|
+ status="processing",
|
|
|
+ message=f"{name} 要点审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s",
|
|
|
+ issues=[review_result_data],
|
|
|
+ event_type="processing"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ logger.info(f"{name} 审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s")
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ execution_time = time.time() - start_time
|
|
|
+ error_msg = f"{name} 审查失败: {str(e)}"
|
|
|
+ logger.error(error_msg, exc_info=True)
|
|
|
+
|
|
|
+ from core.construction_review.component.reviewers.base_reviewer import ReviewResult
|
|
|
+ return ReviewResult(
|
|
|
+ success=False,
|
|
|
+ details={
|
|
|
+ 'chunk_id': review_content.get('chunk_id', 'unknown') if isinstance(review_content, dict) else 'unknown',
|
|
|
+ 'error': str(e)
|
|
|
+ },
|
|
|
+ error_message=error_msg,
|
|
|
+ execution_time=execution_time
|
|
|
+ )
|
|
|
|
|
|
async def check_sensitive(self, trace_id_idx: str, review_content: str, review_references: str,
|
|
|
review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
|
|
|
@@ -1040,6 +1317,4 @@ class AIReviewEngine(BaseReviewer):
|
|
|
"execution_time": execution_time,
|
|
|
"error_message": error_msg
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
+ }
|