|
|
@@ -39,10 +39,10 @@ from typing import Dict, Union, List, Any, Optional, Tuple
|
|
|
from dataclasses import dataclass
|
|
|
from langchain_core.messages import AIMessage
|
|
|
|
|
|
+from core.construction_review.component.reviewers.utils import directory_extraction
|
|
|
from foundation.observability.logger.loggering import server_logger as logger
|
|
|
from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
|
|
|
from core.base.task_models import TaskFileInfo
|
|
|
-# from core.construction_review.component.reviewers.base_reviewer import ReviewResult
|
|
|
from ...component.reviewers.utils.inter_tool import InterTool
|
|
|
from ..types import AIReviewState
|
|
|
|
|
|
@@ -95,7 +95,7 @@ class AIReviewCoreFun:
|
|
|
from core.base.workflow_manager import WorkflowManager
|
|
|
self.workflow_manager = WorkflowManager()
|
|
|
|
|
|
- async def _process_basis_chapter(
|
|
|
+ async def _process_chapter_item(
|
|
|
self,
|
|
|
chapter_code: str,
|
|
|
chapter_content: List[Dict[str, Any]],
|
|
|
@@ -104,151 +104,9 @@ class AIReviewCoreFun:
|
|
|
all_issues: List[Dict],
|
|
|
completed_chunks: int,
|
|
|
total_chunks: int
|
|
|
- ) -> List[Dict]:
|
|
|
- """
|
|
|
- 处理编制依据章节(basis)
|
|
|
-
|
|
|
- 特点:
|
|
|
- - 收集所有chunk的content
|
|
|
- - 拼接后一次性审查
|
|
|
- - 整个章节推送一次进度
|
|
|
-
|
|
|
- Args:
|
|
|
- chapter_code: 章节代码(应该是"basis")
|
|
|
- chapter_content: 章节的所有chunk
|
|
|
- func_names: 需要执行的审查方法
|
|
|
- state: AI审查状态
|
|
|
- all_issues: 累积的issues列表
|
|
|
- completed_chunks: 已完成的块数
|
|
|
- total_chunks: 总块数
|
|
|
-
|
|
|
- Returns:
|
|
|
- List[Dict]: 更新后的issues列表(包含本次章节的结果)
|
|
|
- """
|
|
|
- logger.info(f"🔍 处理编制依据章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
|
|
|
-
|
|
|
- # 1. 收集所有chunk的content
|
|
|
- basis_contents = []
|
|
|
- for chunk in chapter_content:
|
|
|
- content = chunk.get("content", "")
|
|
|
- if content:
|
|
|
- basis_contents.append(content)
|
|
|
-
|
|
|
- # 2. 拼接content
|
|
|
- if not basis_contents:
|
|
|
- logger.warning(f"编制依据章节 {chapter_code} 没有内容,跳过")
|
|
|
- return all_issues # ✅ 返回原始的 all_issues,避免数据丢失
|
|
|
-
|
|
|
- combined_content = "\n\n".join(basis_contents)
|
|
|
- logger.info(f"编制依据内容拼接完成,总长度: {len(combined_content)} 字符")
|
|
|
-
|
|
|
- # 3. 提取编制依据条目(时效性和规范性审查都需要)
|
|
|
- basis_items = None
|
|
|
- needs_basis_extraction = any(
|
|
|
- fn in ["timeliness_basis_reviewer", "reference_basis_reviewer"]
|
|
|
- for fn in func_names
|
|
|
- )
|
|
|
-
|
|
|
- if needs_basis_extraction:
|
|
|
- try:
|
|
|
- from core.construction_review.component.reviewers.utils.directory_extraction import (
|
|
|
- extract_basis_with_langchain_qwen,
|
|
|
- )
|
|
|
- basis_items = await extract_basis_with_langchain_qwen(
|
|
|
- progress_manager=state.get("progress_manager"),
|
|
|
- callback_task_id=state.get("callback_task_id"),
|
|
|
- text=combined_content,
|
|
|
- )
|
|
|
- logger.info(f"编制依据AI提取完成,条数: {len(getattr(basis_items, 'items', []))}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"编制依据AI提取失败: {e}", exc_info=True)
|
|
|
-
|
|
|
- # 4. 执行需要的审查方法
|
|
|
- chapter_issues = []
|
|
|
-
|
|
|
- for func_name in func_names:
|
|
|
- if func_name == "reference_basis_reviewer":
|
|
|
- # 编制依据审查
|
|
|
- logger.info(f"执行编制依据审查: {chapter_code}")
|
|
|
- try:
|
|
|
- review_data = {
|
|
|
- "content": combined_content,
|
|
|
- "basis_items": basis_items,
|
|
|
- "max_concurrent": self.max_concurrent
|
|
|
- }
|
|
|
- result = await self.ai_review_engine.reference_basis_reviewer(
|
|
|
- review_data=review_data,
|
|
|
- trace_id=f"{state['callback_task_id']}_{chapter_code}",
|
|
|
- state=state,
|
|
|
- stage_name=f"{chapter_code}_编制依据审查"
|
|
|
- )
|
|
|
-
|
|
|
- # 提取issues
|
|
|
- issues = self._extract_issues_from_result(result)
|
|
|
- chapter_issues.extend(issues)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"编制依据审查失败: {str(e)}", exc_info=True)
|
|
|
-
|
|
|
- elif func_name == "timeliness_basis_reviewer":
|
|
|
- # 时效性审查
|
|
|
- logger.info(f"执行时效性审查: {chapter_code}")
|
|
|
- try:
|
|
|
- review_data = {
|
|
|
- "content": combined_content,
|
|
|
- "basis_items": basis_items,
|
|
|
- "max_concurrent": self.max_concurrent
|
|
|
- }
|
|
|
- result = await self.ai_review_engine.timeliness_basis_reviewer(
|
|
|
- review_data=review_data,
|
|
|
- trace_id=f"{state['callback_task_id']}_{chapter_code}",
|
|
|
- state=state,
|
|
|
- stage_name=f"{chapter_code}_时效性审查"
|
|
|
- )
|
|
|
-
|
|
|
- # 提取issues
|
|
|
- issues = self._extract_issues_from_result(result)
|
|
|
- chapter_issues.extend(issues)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"时效性审查失败: {str(e)}", exc_info=True)
|
|
|
-
|
|
|
- else:
|
|
|
- # 其他方法:不支持对basis章节的操作
|
|
|
- logger.warning(f"方法 {func_name} 不支持编制依据章节模式,跳过")
|
|
|
-
|
|
|
- # 4. 累积到总issues
|
|
|
- all_issues.extend(chapter_issues)
|
|
|
-
|
|
|
- # 5. 推送章节级别的进度
|
|
|
- if state.get("progress_manager") and chapter_content:
|
|
|
- # 计算当前进度
|
|
|
- current = int(((completed_chunks + len(chapter_content)) / total_chunks) * 100)
|
|
|
-
|
|
|
- # 推送进度
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name=f"AI审查:{chapter_code}",
|
|
|
- current=current,
|
|
|
- status="completed",
|
|
|
- message=f"编制依据章节审查完成,共 {len(chapter_content)} 个块",
|
|
|
- event_type="processing"
|
|
|
- )
|
|
|
- return all_issues
|
|
|
- async def _process_normal_chapter(
|
|
|
- self,
|
|
|
- chapter_code: str,
|
|
|
- chapter_content: List[Dict[str, Any]],
|
|
|
- func_names: List[str],
|
|
|
- state: AIReviewState,
|
|
|
- all_issues: List[Dict]
|
|
|
) -> Tuple[int, List[Dict]]:
|
|
|
"""
|
|
|
- 处理普通章节(非basis)
|
|
|
-
|
|
|
- 特点:
|
|
|
- - 逐块审查
|
|
|
- - 每块完成后立即推送进度
|
|
|
+ 统一处理章节审查(所有章节都走逐块审查流程)
|
|
|
|
|
|
Args:
|
|
|
chapter_code: 章节代码
|
|
|
@@ -256,17 +114,19 @@ class AIReviewCoreFun:
|
|
|
func_names: 需要执行的审查方法
|
|
|
state: AI审查状态
|
|
|
all_issues: 累积的issues列表
|
|
|
+ completed_chunks: 已完成的块数
|
|
|
+ total_chunks: 总块数
|
|
|
|
|
|
Returns:
|
|
|
Tuple[int, List[Dict]]: (处理的块数量, 更新后的issues列表)
|
|
|
"""
|
|
|
- logger.info(f"📝 处理普通章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
|
|
|
- total_chunks = len(chapter_content)
|
|
|
+ logger.info(f"📝 处理章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
|
|
|
+ chapter_total_chunks = len(chapter_content)
|
|
|
|
|
|
- # 按块串行遍历
|
|
|
+ # 按块串行遍历(所有章节统一流程)
|
|
|
for chunk_index, chunk in enumerate(chapter_content):
|
|
|
chunk_label = chunk.get("section_label", f"chunk_{chunk_index}")
|
|
|
- logger.info(f" 📄 处理块 {chunk_index+1}/{total_chunks}: {chunk_label}")
|
|
|
+ logger.info(f" 📄 处理块 {chunk_index+1}/{chapter_total_chunks}: {chunk_label}")
|
|
|
|
|
|
# 终止信号检查(块级别)
|
|
|
if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
|
|
|
@@ -277,7 +137,7 @@ class AIReviewCoreFun:
|
|
|
chunk_results = await self._execute_chunk_methods(
|
|
|
chapter_code, chunk, chunk_index, func_names, state
|
|
|
)
|
|
|
- logger.info(f"🔍 chunk_results: basic_compliance={chunk_results.get('basic_compliance', {})}, technical_compliance={chunk_results.get('technical_compliance', {})}")
|
|
|
+
|
|
|
# 格式化当前块的结果为issues
|
|
|
chunk_page = chunk.get('page', '')
|
|
|
review_location_label = f"第{chunk_page}页:{chunk_label}"
|
|
|
@@ -293,9 +153,9 @@ class AIReviewCoreFun:
|
|
|
)
|
|
|
|
|
|
# 推送当前块的进度
|
|
|
- current = int(((chunk_index + 1) / total_chunks) * 100)
|
|
|
+ current = int(((completed_chunks + chunk_index + 1) / total_chunks) * 100)
|
|
|
await self._send_unit_review_progress(
|
|
|
- state, chunk_index, total_chunks, chunk_label, issues, current
|
|
|
+ state, chunk_index, chapter_total_chunks, chunk_label, issues, current
|
|
|
)
|
|
|
|
|
|
# 累积issues
|
|
|
@@ -305,8 +165,8 @@ class AIReviewCoreFun:
|
|
|
else:
|
|
|
logger.warning(f"⚠️ 块{chunk_index}: issues为空,未添加到all_issues")
|
|
|
|
|
|
- logger.info(f"🔍 章节{chapter_code}完成: 总共处理{total_chunks}个块, all_issues最终数量={len(all_issues)}")
|
|
|
- return total_chunks,all_issues
|
|
|
+ logger.info(f"🔍 章节{chapter_code}完成: 总共处理{chapter_total_chunks}个块, all_issues最终数量={len(all_issues)}")
|
|
|
+ return chapter_total_chunks, all_issues
|
|
|
|
|
|
def _extract_issues_from_result(self, result: Any) -> List[Dict]:
|
|
|
"""
|
|
|
@@ -417,15 +277,25 @@ class AIReviewCoreFun:
|
|
|
"""
|
|
|
semaphore = asyncio.Semaphore(5) # 单个块内限制并发数为5
|
|
|
rag_enhanced_content = None # 初始化变量,避免作用域错误
|
|
|
- if 'check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names:
|
|
|
- logger.debug("开始执行RAG检索增强")
|
|
|
- rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
|
|
|
+ basis_content = None # 初始化变量,避免作用域错误
|
|
|
|
|
|
+ if 'check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names:
|
|
|
+ logger.debug("开始执行RAG检索增强")
|
|
|
+ rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
|
|
|
+
|
|
|
+ if 'reference_basis_reviewer' in func_names or 'timeliness_basis_reviewer' in func_names:
|
|
|
+ logger.debug("开始执行编制依据/时效性预处理")
|
|
|
+ # 预处理编制依据/时效性审查所需内容
|
|
|
+ basis_content = await directory_extraction.extract_basis_with_langchain_qwen(
|
|
|
+ state['progress_manager'],
|
|
|
+ state["callback_task_id"],
|
|
|
+ chunk.get('content', '')
|
|
|
+ )
|
|
|
async def execute_with_semaphore(func_name):
|
|
|
async with semaphore:
|
|
|
try:
|
|
|
# 创建并执行单个审查任务
|
|
|
- result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content)
|
|
|
+ result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content, basis_content)
|
|
|
return func_name, result
|
|
|
except Exception as e:
|
|
|
logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
|
|
|
@@ -470,7 +340,7 @@ class AIReviewCoreFun:
|
|
|
'rag_enhanced': merged_rag
|
|
|
}
|
|
|
|
|
|
- async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None) -> UnitReviewResult:
|
|
|
+ async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None) -> UnitReviewResult:
|
|
|
"""
|
|
|
执行单个块的审查任务
|
|
|
|
|
|
@@ -546,7 +416,8 @@ class AIReviewCoreFun:
|
|
|
)
|
|
|
|
|
|
elif func_name == "check_completeness":
|
|
|
- raw_result = await method(trace_id, chunk, state, stage_name)
|
|
|
+ # check_completeness 需要列表类型,将单个 chunk 包装成列表
|
|
|
+ raw_result = await method(trace_id, [chunk], state, stage_name)
|
|
|
# 基础审查方法,放入 basic_compliance
|
|
|
return UnitReviewResult(
|
|
|
unit_index=chunk_index,
|
|
|
@@ -587,16 +458,62 @@ class AIReviewCoreFun:
|
|
|
overall_risk=self._calculate_single_result_risk(raw_result)
|
|
|
)
|
|
|
|
|
|
- # ⚠️ 以下三个特殊方法不在块级别处理,由主流程统一管理
|
|
|
- elif func_name in ["outline_check", "timeliness_basis_reviewer", "reference_basis_reviewer"]:
|
|
|
- logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
|
|
|
+ # # outline_check 仍在章节级别处理
|
|
|
+ # elif func_name == "outline_check":
|
|
|
+ # logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
|
|
|
+ # return UnitReviewResult(
|
|
|
+ # unit_index=chunk_index,
|
|
|
+ # unit_content=chunk,
|
|
|
+ # basic_compliance={},
|
|
|
+ # technical_compliance={},
|
|
|
+ # rag_enhanced={},
|
|
|
+ # overall_risk="low"
|
|
|
+ # )
|
|
|
+
|
|
|
+ # reference_basis_reviewer:编制依据审查(逐块处理)
|
|
|
+ elif func_name == "reference_basis_reviewer":
|
|
|
+ review_data = {
|
|
|
+ "content": review_content, # 原始文本内容
|
|
|
+ "basis_items": basis_content, # 提取的 BasisItems 对象
|
|
|
+ "max_concurrent": self.max_concurrent
|
|
|
+ }
|
|
|
+ raw_result = await method(
|
|
|
+ review_data=review_data,
|
|
|
+ trace_id=trace_id,
|
|
|
+ state=state,
|
|
|
+ stage_name=stage_name
|
|
|
+ )
|
|
|
+ # 基础审查方法,放入 basic_compliance
|
|
|
return UnitReviewResult(
|
|
|
unit_index=chunk_index,
|
|
|
unit_content=chunk,
|
|
|
- basic_compliance={},
|
|
|
+ basic_compliance={func_name: raw_result},
|
|
|
+ technical_compliance={},
|
|
|
+ rag_enhanced={},
|
|
|
+ overall_risk=self._calculate_single_result_risk(raw_result)
|
|
|
+ )
|
|
|
+
|
|
|
+ # timeliness_basis_reviewer:时效性审查(逐块处理)
|
|
|
+ elif func_name == "timeliness_basis_reviewer":
|
|
|
+ review_data = {
|
|
|
+ "content": review_content, # 原始文本内容
|
|
|
+ "basis_items": basis_content, # 提取的 BasisItems 对象
|
|
|
+ "max_concurrent": self.max_concurrent
|
|
|
+ }
|
|
|
+ raw_result = await method(
|
|
|
+ review_data=review_data,
|
|
|
+ trace_id=trace_id,
|
|
|
+ state=state,
|
|
|
+ stage_name=stage_name
|
|
|
+ )
|
|
|
+ # 基础审查方法,放入 basic_compliance
|
|
|
+ return UnitReviewResult(
|
|
|
+ unit_index=chunk_index,
|
|
|
+ unit_content=chunk,
|
|
|
+ basic_compliance={func_name: raw_result},
|
|
|
technical_compliance={},
|
|
|
rag_enhanced={},
|
|
|
- overall_risk="low"
|
|
|
+ overall_risk=self._calculate_single_result_risk(raw_result)
|
|
|
)
|
|
|
|
|
|
else:
|