|
|
@@ -0,0 +1,886 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+统一时效性审查模块
|
|
|
+
|
|
|
+整合原 standard_timeliness_reviewer、timeliness_basis_reviewer、timeliness_content_reviewer 的功能,
|
|
|
+提供统一的时效性审查入口。
|
|
|
+
|
|
|
+主要组件:
|
|
|
+1. StandardExtractor: 规范引用提取器
|
|
|
+2. StandardTimelinessReviewer: 核心时效性审查引擎
|
|
|
+3. TimelinessReviewService: 统一审查服务(支持basis和content两种来源)
|
|
|
+
|
|
|
+使用示例:
|
|
|
+ # 方法1: 使用便捷函数
|
|
|
+ from foundation.infrastructure.mysql.async_mysql_conn_pool import AsyncMySQLPool
|
|
|
+
|
|
|
+ db_pool = AsyncMySQLPool()
|
|
|
+ await db_pool.initialize()
|
|
|
+
|
|
|
+ results = await review_standards_timeliness(
|
|
|
+ standards_list=[
|
|
|
+ {"standard_name": "铁路桥涵设计规范", "standard_number": "TB 10002-2017"},
|
|
|
+ ],
|
|
|
+ db_pool=db_pool
|
|
|
+ )
|
|
|
+
|
|
|
+ # 方法2: 使用异步上下文管理器
|
|
|
+ async with TimelinessReviewService(max_concurrent=10, db_pool=db_pool) as service:
|
|
|
+ # 从编制依据审查
|
|
|
+ results = await service.review_all(basis_items)
|
|
|
+ # 或从正文内容审查
|
|
|
+ results = await service.review_from_content(content, chapter_code)
|
|
|
+"""
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+import os
|
|
|
+import re
|
|
|
+import threading
|
|
|
+import time
|
|
|
+from dataclasses import dataclass, asdict, field
|
|
|
+from datetime import datetime
|
|
|
+from typing import Any, Dict, List, Optional, Tuple
|
|
|
+from functools import partial
|
|
|
+
|
|
|
+from foundation.observability.logger.loggering import review_logger as logger
|
|
|
+from foundation.ai.agent.generate.model_generate import generate_model_client
|
|
|
+from core.construction_review.component.reviewers.utils.inter_tool import InterTool
|
|
|
+from core.construction_review.component.reviewers.utils.directory_extraction import BasisItems, BasisItem
|
|
|
+from core.construction_review.component.standard_matching import (
|
|
|
+ StandardMatchingService,
|
|
|
+ StandardMatchResult,
|
|
|
+ MatchResultCode,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 数据类定义
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
@dataclass
class StandardReference:
    """One citation of a standard (specification) found in a document.

    Attributes:
        original_text: The citation text, e.g. "《公路工程技术标准》(JTG B01-2011)".
        name: Name of the standard.
        number: Number (code) of the standard.
        context: Text surrounding the citation.
        location_info: Location metadata; every instance gets its own empty
            dict when none is supplied.
    """

    original_text: str
    name: str
    number: str
    context: str
    location_info: Dict[str, Any] = field(default_factory=dict)
|
|
|
+
|
|
|
+
|
|
|
@dataclass
class TimelinessReviewResult:
    """Outcome of the timeliness review for one cited standard.

    Attributes:
        seq_no: Sequence number of the standard within the reviewed list.
        standard_name: Standard name as originally cited.
        standard_number: Standard number as originally cited.
        process_result: Processing result text.
        status_code: Status code of the match.
        has_issue: Whether the citation has a problem.
        issue_type: Problem category, if any.
        suggestion: Suggested correction, if any.
        reason: Explanation of the problem, if any.
        risk_level: Risk level; defaults to "low".
        replacement_name: Name of the replacing standard, if any.
        replacement_number: Number of the replacing standard, if any.
        mismatch_analysis: Detailed difference analysis for MISMATCH results.
        final_result: Final result description.
    """

    seq_no: int
    standard_name: str
    standard_number: str
    process_result: str
    status_code: str
    has_issue: bool
    issue_type: Optional[str] = None
    suggestion: Optional[str] = None
    reason: Optional[str] = None
    risk_level: str = "low"
    replacement_name: Optional[str] = None
    replacement_number: Optional[str] = None
    mismatch_analysis: Optional[str] = None
    final_result: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, in declaration order."""
        as_mapping = asdict(self)
        return as_mapping
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 规范提取器
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
class StandardExtractor:
    """Extract citations of standards (specifications) from free text.

    Two citation shapes are recognised:

    * the full form ``《name》(number)``, with ASCII or full-width parentheses;
    * a bare standard number such as ``GB 50010-2010``; its name is then
      inferred from a preceding ``《name》`` when one is close enough.

    Extraction results are cached per content string. The cache stores
    location-free templates, so a cache hit still honours the
    ``location_info`` passed to that particular call.
    """

    # Regex fragments for standard numbers (always matched case-insensitively).
    STANDARD_NUMBER_PATTERNS = [
        r'GB(?:/T)?\s*\d{1,5}(?:\.\d+)?\s*-\s*\d{4}',  # national standards
        r'[A-Z]{2,3}(?:/T)?\s*[A-Z]?\s*\d{1,5}(?:\.\d+)?\s*-\s*\d{4}',  # industry standards
        r'DB\d{2}(?:/T)?\s*\d{1,5}\s*-\s*\d{4}',  # local standards
        r'T/\w+\s*\d{1,5}\s*-\s*\d{4}',  # association standards
    ]

    # 《name》 followed by a parenthesised number; \uff08 / \uff09 are the
    # full-width parentheses commonly used in Chinese documents.
    STANDARD_FULL_PATTERN = re.compile(
        r'《([^《》]+)》\s*[(\uff08]([^)\uff09]+)[)\uff09]',
        re.MULTILINE
    )

    STANDARD_NUMBER_ONLY_PATTERN = re.compile(
        r'(' + '|'.join(STANDARD_NUMBER_PATTERNS) + r')',
        re.MULTILINE | re.IGNORECASE
    )

    # Upper bound on cached documents; the oldest entry is evicted first.
    _CACHE_MAX_ENTRIES = 128

    def __init__(self):
        # Maps content text -> location-free reference templates.
        self.extracted_cache: Dict[str, List[StandardReference]] = {}

    def extract_from_content(self, content: str, location_info: Optional[Dict] = None) -> List[StandardReference]:
        """Extract the de-duplicated standard citations contained in ``content``.

        Args:
            content: Text to scan. Empty or ``None`` yields an empty list.
            location_info: Optional location metadata; a copy is attached to
                every returned reference.

        Returns:
            A new list of new ``StandardReference`` objects on every call, so
            callers may mutate the result without corrupting the cache.
        """
        if not content:
            return []

        # Key on the text itself: hash(content) can collide, and would then
        # silently return the references of a different document.
        templates = self.extracted_cache.get(content)
        if templates is None:
            templates = self._extract_uncached(content)
            if len(self.extracted_cache) >= self._CACHE_MAX_ENTRIES:
                # dicts preserve insertion order, so this drops the oldest entry.
                self.extracted_cache.pop(next(iter(self.extracted_cache)))
            self.extracted_cache[content] = templates

        return [
            StandardReference(
                original_text=ref.original_text,
                name=ref.name,
                number=ref.number,
                context=ref.context,
                location_info=dict(location_info) if location_info else {},
            )
            for ref in templates
        ]

    def _extract_uncached(self, content: str) -> List[StandardReference]:
        """Scan ``content`` and return location-free, de-duplicated references."""
        references: List[StandardReference] = []

        # 1. Full form: 《name》(number)
        for match in self.STANDARD_FULL_PATTERN.finditer(content):
            name, number = match.group(1), match.group(2)
            if not self._is_valid_standard_number(number):
                continue
            references.append(StandardReference(
                original_text=f"《{name}》({number})",
                name=name.strip(),
                number=number.strip(),
                # Use the match position: the normalised original_text may not
                # occur verbatim (whitespace, full-width parentheses).
                context=self._context_around(content, match.start(), match.end()),
            ))

        # 2. Bare standard numbers not already covered by an earlier reference
        for match in self.STANDARD_NUMBER_ONLY_PATTERN.finditer(content):
            number = match.group(1)
            if any(number in ref.number for ref in references):
                continue
            name = self._infer_name_from_context(content, number)
            references.append(StandardReference(
                original_text=f"《{name}》({number})" if name else number,
                name=name or "",
                number=number.strip(),
                context=self._context_around(content, match.start(), match.end()),
            ))

        # De-duplicate on the normalised citation text, keeping first occurrence.
        seen = set()
        unique_refs = []
        for ref in references:
            if ref.original_text not in seen:
                seen.add(ref.original_text)
                unique_refs.append(ref)
        return unique_refs

    def _is_valid_standard_number(self, number: str) -> bool:
        """Return True when ``number`` starts with a recognised standard number."""
        if not number:
            return False
        candidate = number.strip().upper()
        return any(
            re.match(pattern, candidate, re.IGNORECASE)
            for pattern in self.STANDARD_NUMBER_PATTERNS
        )

    @staticmethod
    def _context_around(content: str, start: int, end: int, window: int = 50) -> str:
        """Return ``content[start:end]`` widened by ``window`` characters each side."""
        lo = max(0, start - window)
        hi = min(len(content), end + window)
        return content[lo:hi].strip()

    def _extract_context(self, content: str, target: str, window: int = 50) -> str:
        """Return the text around the first occurrence of ``target``.

        Returns an empty string when ``target`` does not occur in ``content``.
        """
        idx = content.find(target)
        if idx == -1:
            return ""
        return self._context_around(content, idx, idx + len(target), window)

    def _infer_name_from_context(self, content: str, number: str) -> str:
        """Infer a standard name from a ``《name》`` shortly before ``number``.

        The name must be 3-50 characters long and at most 30 characters may
        separate it from the number. Returns "" when nothing qualifies.
        """
        pattern = re.compile(r'《([^《》]{3,50})》[^《》]{0,30}' + re.escape(number))
        match = pattern.search(content)
        return match.group(1) if match else ""
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 核心时效性审查引擎
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
class StandardTimelinessReviewer:
    """Timeliness reviewer for cited standards, backed by StandardMatchingService.

    The reviewer turns match results from the matching service into
    ``TimelinessReviewResult`` objects, can enrich MISMATCH results with an
    LLM-generated explanation, and converts results to the standardized
    issue format.

    Use it as an async context manager. When no ``standard_service`` is
    supplied, entering the context creates and initialises one from
    ``db_pool`` and leaving the context closes it again.
    """

    # Parentheses stripped from names/numbers: ASCII and full-width.
    _PAREN_CHARS = "()\uff08\uff09"
    # Characters dropped by _normalize before two citations are compared.
    _BRACKET_TABLE = {ord(ch): None for ch in "《》()\uff08\uff09"}

    def __init__(self, db_pool=None, standard_service: Optional[StandardMatchingService] = None,
                 callback_task_id: Optional[str] = None):
        """Create a reviewer.

        Args:
            db_pool: Database pool used to build a matching service on entry.
            standard_service: Existing matching service to reuse instead.
            callback_task_id: When set, results are appended to a per-task
                JSON log file.

        Raises:
            RuntimeError: If neither ``db_pool`` nor ``standard_service`` is given.
        """
        if standard_service is None and not db_pool:
            raise RuntimeError(
                "StandardTimelinessReviewer 初始化失败: 必须提供数据库连接池(db_pool)或已初始化的StandardMatchingService实例"
            )
        self.db_pool = db_pool
        self._service = standard_service
        self._own_service = False
        self.callback_task_id = callback_task_id
        self._log_lock = threading.Lock()
        # Limits concurrent LLM calls made while explaining MISMATCH results.
        self._mismatch_analysis_semaphore = asyncio.Semaphore(3)

    async def __aenter__(self):
        if self._service is None:
            # Attach the service only after initialisation succeeded, so a
            # failed initialize() never leaves a half-built service behind.
            service = StandardMatchingService(self.db_pool, own_db_pool=False)
            await service.initialize()
            self._service = service
            self._own_service = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._own_service and self._service:
            try:
                await self._service.close()
            finally:
                # Drop the closed service so that re-entering builds a fresh
                # one instead of reusing a closed instance.
                self._service = None
                self._own_service = False
        return False

    def _log_determination_results(self, review_results: List[TimelinessReviewResult]) -> None:
        """Append review results to ``temp/construction_review/timeliness_result/<task>.json``.

        Best effort: does nothing without a ``callback_task_id`` and never
        raises; failures are logged as warnings.
        """
        if not self.callback_task_id:
            return
        try:
            # NOTE: the lock is per reviewer instance; two reviewers sharing a
            # task id are not serialised against each other.
            with self._log_lock:
                log_dir = os.path.join("temp", "construction_review", "timeliness_result")
                os.makedirs(log_dir, exist_ok=True)
                log_path = os.path.join(log_dir, f"{self.callback_task_id}.json")

                records = []
                if os.path.exists(log_path):
                    try:
                        with open(log_path, "r", encoding="utf-8") as f:
                            records = json.load(f)
                    except Exception as e:
                        # Unreadable history is dropped, but not silently.
                        logger.warning(f"读取时效性判定结果文件失败,将重新创建: {e}")
                        records = []
                    if not isinstance(records, list):
                        records = []

                for result in review_results:
                    records.append({
                        "timestamp": datetime.now().isoformat(),
                        "callback_task_id": self.callback_task_id,
                        **result.to_dict()
                    })

                # Write to a sibling file and swap it in, so an interrupted
                # write cannot leave a truncated JSON log behind.
                tmp_path = f"{log_path}.tmp"
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(records, f, ensure_ascii=False, indent=2)
                os.replace(tmp_path, log_path)
        except Exception as e:
            logger.warning(f"记录时效性判定结果失败: {e}")

    def review_standards(self, standards: List[Dict[str, str]]) -> List[TimelinessReviewResult]:
        """Review the timeliness of a list of standards.

        Args:
            standards: Dicts carrying ``standard_name`` and ``standard_number``.

        Returns:
            One result per match result returned by the matching service;
            ``None`` match results are skipped.

        Raises:
            RuntimeError: If the matching service has not been initialised.
        """
        if not self._service:
            raise RuntimeError("服务未初始化,请使用异步上下文管理器")

        review_results = []
        for match_result in self._service.check_standards(standards):
            if match_result is None:
                # Previously only the log call was guarded and the conversion
                # below failed with AttributeError on None.
                continue
            logger.info(
                "[时效性审查变量] "
                f"提取standard_name={match_result.raw_name}, "
                f"提取standard_number={match_result.raw_number}, "
                f"数据库standard_name={match_result.matched_name or ''}, "
                f"数据库standard_number={match_result.matched_number or ''}"
            )
            review_results.append(self._convert_match_to_review_result(match_result))

        self._log_determination_results(review_results)
        return review_results

    def review_single(self, standard_name: str, standard_number: str, seq_no: int = 1) -> Optional[TimelinessReviewResult]:
        """Review one standard; returns ``None`` when the service yields no match result.

        Raises:
            RuntimeError: If the matching service has not been initialised.
        """
        if not self._service:
            raise RuntimeError("服务未初始化")

        match_result = self._service.check_single(seq_no, standard_name, standard_number)
        if match_result is None:
            return None

        review_result = self._convert_match_to_review_result(match_result)
        self._log_determination_results([review_result])
        return review_result

    def _convert_match_to_review_result(self, match_result: StandardMatchResult) -> TimelinessReviewResult:
        """Map a match result onto a ``TimelinessReviewResult`` by status code.

        OK and NOT_FOUND are reported without issue (low risk); SUBSTITUTED,
        ABOLISHED and MISMATCH are high-risk issues; any other status code is
        logged and reported as a medium-risk "unknown status" issue.
        """
        status_code = match_result.status_code
        final_result = match_result.final_result
        # Fields that are filled identically for every status.
        common = {
            "seq_no": match_result.seq_no,
            "standard_name": match_result.raw_name,
            "standard_number": match_result.raw_number,
            "status_code": status_code,
            "final_result": final_result,
        }

        def _substitute_text() -> str:
            # Avoid rendering "None" when the service supplies no substitute.
            return f"{match_result.substitute_name or ''}{match_result.substitute_number or ''}"

        if status_code in (MatchResultCode.OK.value, MatchResultCode.NOT_FOUND.value):
            return TimelinessReviewResult(
                process_result=match_result.process_result,
                has_issue=False,
                risk_level="low",
                **common
            )
        if status_code == MatchResultCode.SUBSTITUTED.value:
            return TimelinessReviewResult(
                process_result=match_result.process_result,
                has_issue=True,
                issue_type="标准被替代",
                suggestion=f"请更新为现行标准: {_substitute_text()}",
                reason=final_result,
                risk_level="high",
                replacement_name=match_result.substitute_name,
                replacement_number=match_result.substitute_number,
                **common
            )
        if status_code == MatchResultCode.ABOLISHED.value:
            return TimelinessReviewResult(
                process_result=match_result.process_result,
                has_issue=True,
                issue_type="标准已废止",
                suggestion="该标准已废止且无现行替代,请检查是否仍需引用或寻找其他替代方案",
                reason=final_result,
                risk_level="high",
                **common
            )
        if status_code == MatchResultCode.MISMATCH.value:
            return TimelinessReviewResult(
                process_result=match_result.process_result,
                has_issue=True,
                issue_type="标准信息不匹配",
                suggestion=f"名称与标准号不匹配,实际应为: {_substitute_text()}",
                reason=final_result,
                risk_level="high",
                replacement_name=match_result.substitute_name,
                replacement_number=match_result.substitute_number,
                **common
            )

        logger.warning(f"未知的匹配状态码: {status_code}")
        return TimelinessReviewResult(
            process_result="未知",
            has_issue=True,
            issue_type="未知状态",
            reason=final_result,
            risk_level="medium",
            **common
        )

    async def enrich_mismatch_details(self, review_results: List[TimelinessReviewResult]) -> List[TimelinessReviewResult]:
        """Add an explanation to every MISMATCH result that names a replacement.

        The explanation is stored in ``mismatch_analysis`` and appended to
        ``suggestion``. Results are modified in place and the same list is
        returned. Failures for individual results are logged, not raised.
        """
        mismatch_results = [
            result for result in review_results
            if result.status_code == MatchResultCode.MISMATCH.value
            and result.has_issue
            and result.replacement_name
            and result.replacement_number
        ]
        if not mismatch_results:
            return review_results

        async def _enrich_single(result: TimelinessReviewResult) -> None:
            async with self._mismatch_analysis_semaphore:
                analysis = await self._generate_mismatch_analysis(result)
            if not analysis:
                return
            result.mismatch_analysis = analysis
            current = result.suggestion or ""
            if analysis not in current:
                # Never render a missing suggestion as the text "None".
                result.suggestion = f"{current}\n{analysis}" if current else analysis

        outcomes = await asyncio.gather(
            *(_enrich_single(result) for result in mismatch_results),
            return_exceptions=True
        )
        for result, outcome in zip(mismatch_results, outcomes):
            if isinstance(outcome, BaseException):
                logger.warning(f"MISMATCH 差异分析失败(seq_no={result.seq_no}): {outcome}")

        return review_results

    async def _generate_mismatch_analysis(self, result: TimelinessReviewResult) -> Optional[str]:
        """Ask the LLM for an improvement suggestion for a MISMATCH result.

        Falls back to a rule-based suggestion when the call fails or the
        reply contains no usable ``improvement_suggestion``.
        """
        input_name = self._strip_wrapper(result.standard_name, '《》')
        input_number = self._strip_wrapper(result.standard_number, self._PAREN_CHARS)
        actual_name = self._strip_wrapper(result.replacement_name, '《》')
        actual_number = self._strip_wrapper(result.replacement_number, self._PAREN_CHARS)

        system_prompt = (
            "你是规范引用差异分析助手。"
            "你的任务是比较用户引用的标准信息与标准库中的实际标准信息,"
            "输出必须是可直接展示给用户的改进建议,严格使用指定句式。"
        )
        user_prompt = f"""请根据以下两组标准信息,输出一条可直接展示给用户的"改进建议"。

【用户引用】
- 标准名称:{input_name}
- 标准编号:{input_number}

【标准库实际记录】
- 标准名称:{actual_name}
- 标准编号:{actual_number}

【要求】
1. 输出必须严格为 JSON 对象。
2. JSON 中只保留一个字段:`improvement_suggestion`。
3. `improvement_suggestion` 必须严格以 `改进建议:\\n` 开头。
4. 判断应该是"修改""删除"还是"补充",并指出具体片段。

输出格式:
{{"improvement_suggestion": "改进建议:\\n..."}}
/no_think
""".strip()

        try:
            raw = await generate_model_client.get_model_generate_invoke(
                trace_id=f"timeliness_mismatch_{self.callback_task_id or 'default'}_{result.seq_no}",
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                model_name="shutian_qwen3_5_122b",
                enable_thinking=False
            )
            payload = self._extract_first_json(raw)
            # `or ""` keeps a JSON null from turning into the text "None".
            suggestion_text = str(payload.get("improvement_suggestion") or "").strip()
            if suggestion_text:
                return suggestion_text
        except Exception as e:
            logger.warning(f"MISMATCH LLM 分析失败: {e}")

        return self._build_fallback_mismatch_analysis(result)

    def _extract_first_json(self, text: str) -> Dict[str, Any]:
        """Return the first JSON object that can be parsed out of ``text``.

        Parsing is delegated to the JSON decoder, so braces inside string
        values do not confuse the object boundary.

        Raises:
            ValueError: If ``text`` is empty, contains no ``{``, or no
                position yields a parseable JSON object.
        """
        if not text:
            raise ValueError("模型返回为空")
        start = text.find("{")
        if start == -1:
            raise ValueError("未找到JSON起始符")

        decoder = json.JSONDecoder()
        last_error = None
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except ValueError as e:  # json.JSONDecodeError subclasses ValueError
                last_error = e
            else:
                if isinstance(candidate, dict):
                    return candidate
            start = text.find("{", start + 1)
        raise ValueError(f"未找到可解析的JSON对象: {last_error}") from last_error

    def _build_fallback_mismatch_analysis(self, result: TimelinessReviewResult) -> str:
        """Build a rule-based suggestion for when the LLM is unavailable."""
        input_name = self._strip_wrapper(result.standard_name, '《》')
        input_number = self._strip_wrapper(result.standard_number, self._PAREN_CHARS)
        actual_name = self._strip_wrapper(result.replacement_name, '《》')
        actual_number = self._strip_wrapper(result.replacement_number, self._PAREN_CHARS)

        if input_number == actual_number and input_name != actual_name:
            return f"改进建议:\n标准号({actual_number})对应的规范名称应为《{actual_name}》,请核对修改。"
        if input_name == actual_name and input_number != actual_number:
            return f"改进建议:\n《{actual_name}》对应的标准号应为({actual_number}),请核对修改。"
        return f"改进建议:\n请将当前标准信息核对并修改为《{actual_name}》({actual_number})。"

    def _strip_wrapper(self, text: Optional[str], chars: str) -> str:
        """Remove every occurrence of each character in ``chars`` from ``text``.

        NOTE: characters are removed wherever they occur, not only at the
        outer edges. Returns "" for empty or ``None`` input.
        """
        if not text:
            return ""
        removal = {ord(ch): None for ch in chars}
        return str(text).strip().translate(removal).strip()

    def convert_to_standardized_format(
        self,
        review_results: List[TimelinessReviewResult],
        check_item: str = "timeliness_check",
        chapter_code: str = "basis",
        check_item_code: str = "timeliness_check"
    ) -> List[Dict[str, Any]]:
        """Convert review results with issues into standardized issue dicts.

        NOT_FOUND results and results without an issue are omitted, as are
        results whose replacement is effectively the same citation as the
        original after normalisation.
        """
        standardized_results = []

        for result in review_results:
            if result.status_code == MatchResultCode.NOT_FOUND.value or not result.has_issue:
                continue

            # Safety net: skip when the replacement equals the original.
            if result.replacement_name and result.replacement_number:
                original = self._normalize(f"{result.standard_name}{result.standard_number}")
                replacement = self._normalize(f"{result.replacement_name}{result.replacement_number}")
                if original == replacement:
                    logger.info("[兜底过滤] 替代标准与原始标准实质相同,跳过")
                    continue

            standardized_results.append({
                "check_item": check_item,
                "chapter_code": chapter_code,
                "check_item_code": check_item_code,
                "check_result": {
                    "location": f"《{result.standard_name}》({result.standard_number})",
                    "description": result.reason or result.final_result,
                    "suggestion": result.suggestion,
                    "issue_type": result.issue_type,
                    "standard_name": result.standard_name,
                    "standard_number": result.standard_number,
                    "replacement_name": result.replacement_name,
                    "replacement_number": result.replacement_number,
                    "mismatch_analysis": result.mismatch_analysis,
                },
                "exist_issue": True,
                "risk_info": {"risk_level": result.risk_level}
            })

        return standardized_results

    def _normalize(self, text: str) -> str:
        """Normalise citation text for equality comparison.

        Removes markup tags, all whitespace, book-title marks, ASCII and
        full-width parentheses, and a comma-separated list of symbols read
        from configuration (with a built-in default when unavailable).
        """
        if not text:
            return ""
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'\s+', '', text)
        text = text.translate(self._BRACKET_TABLE)

        default_symbols = '),.],},【,】,〔,〕,-,—,―,‐,‑,‒,–,−'
        try:
            from foundation.infrastructure.config.config import config_handler
            symbols_str = config_handler.get('timeliness_review', 'REMOVE_SYMBOLS', default_symbols)
        except Exception:
            # Configuration is optional here; fall back to the defaults.
            symbols_str = default_symbols

        if symbols_str:
            for symbol in (part.strip() for part in symbols_str.split(',')):
                if symbol:
                    text = text.replace(symbol, '')
        return text
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 统一审查服务
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
class TimelinessReviewService:
    """Unified timeliness review service for "basis" items and body content.

    Use it as an async context manager: entering creates the concurrency
    semaphore and the underlying ``StandardTimelinessReviewer``; leaving
    releases the reviewer. Review methods called on a service that was never
    entered return a standardized error entry instead of crashing.
    """

    # Number of basis items reviewed together in one batch by review_all.
    _BATCH_SIZE = 3

    # 《name》 followed by a parenthesised number; \uff08 / \uff09 are the
    # full-width parentheses, so both paren styles are accepted.
    _CITATION_PATTERN = re.compile(r'《([^《》]+)》\s*[(\uff08]([^)\uff09]+)[)\uff09]')
    # Bare standard number, searched in the upper-cased text.
    _NUMBER_ONLY_PATTERN = re.compile(r'([A-Z]{2,6}(?:/[A-Z])?\s*\d{1,6}(?:\.\d)?(?:-\d{4})?)')
    _NAME_PATTERN = re.compile(r'《([^《》]+)》')

    def __init__(self, max_concurrent: int = 10, db_pool=None):
        """Store configuration; resources are created in ``__aenter__``.

        Args:
            max_concurrent: Maximum number of reviews running concurrently.
            db_pool: Database pool handed to the underlying reviewer.
        """
        self.max_concurrent = max_concurrent
        self._semaphore = None
        self.db_pool = db_pool
        self._timeliness_reviewer = None
        self.extractor = StandardExtractor()

    async def __aenter__(self):
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.max_concurrent)
        if self._timeliness_reviewer is None:
            self._timeliness_reviewer = StandardTimelinessReviewer(db_pool=self.db_pool)
        service = self._timeliness_reviewer._service
        # getattr: `_initialized` is a private attribute of the matching
        # service and is not guaranteed to exist.
        if not service or not getattr(service, "_initialized", False):
            await self._timeliness_reviewer.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        reviewer = self._timeliness_reviewer
        if reviewer:
            try:
                await reviewer.__aexit__(exc_type, exc_val, exc_tb)
            finally:
                # Forget the released reviewer so that re-entering the
                # service builds a fresh one.
                self._timeliness_reviewer = None
        return False

    @staticmethod
    def _error_result(chapter_code: str, check_item_code: str, error: Any, **extra: Any) -> Dict[str, Any]:
        """Build the standardized high-risk entry that reports a failure."""
        check_result = {"error": str(error)}
        check_result.update(extra)
        return {
            "check_item": "timeliness_check",
            "chapter_code": chapter_code,
            "check_item_code": check_item_code,
            "check_result": check_result,
            "exist_issue": True,
            "risk_info": {"risk_level": "high"}
        }

    async def _review_standards_list(
        self,
        standards_list: List[Dict[str, str]],
        chapter_code: str,
        check_item_code: str,
    ) -> List[Dict[str, Any]]:
        """Review, enrich and standardize ``standards_list`` under the semaphore.

        Raises:
            RuntimeError: If the service has not been entered.
        """
        reviewer = self._timeliness_reviewer
        if self._semaphore is None or not reviewer:
            raise RuntimeError("时效性审查器未初始化")

        async with self._semaphore:
            review_results = reviewer.review_standards(standards_list)
            await reviewer.enrich_mismatch_details(review_results)
            return reviewer.convert_to_standardized_format(
                review_results,
                check_item="timeliness_check",
                chapter_code=chapter_code,
                check_item_code=check_item_code
            )

    async def review_batch(
        self,
        basis_items: List[str],
        collection_name: str = "first_bfp_collection_status",
        top_k_each: int = 10,
    ) -> List[Dict[str, Any]]:
        """Review the standards cited in a list of basis texts.

        Args:
            basis_items: Basis texts; non-string and blank entries are ignored.
            collection_name: Unused; kept for interface compatibility.
            top_k_each: Unused; kept for interface compatibility.

        Returns:
            Standardized issue entries; an empty list when nothing can be
            extracted; a single error entry when the review fails.
        """
        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
        if not basis_items:
            return []

        try:
            standards_list = [
                std_info
                for std_info in (self._extract_from_basis(basis) for basis in basis_items)
                if std_info
            ]
            if not standards_list:
                return []

            standardized_results = await self._review_standards_list(
                standards_list, "basis", "basis_timeliness_check"
            )

            issue_count = sum(1 for item in standardized_results if item.get('exist_issue', False))
            logger.info(f"编制依据批次审查完成:总计 {len(standards_list)} 项,发现问题 {issue_count} 项")

            return standardized_results

        except Exception as e:
            logger.error(f"批次处理失败: {e}")
            return [self._error_result("basis", "basis_timeliness_check", e)]

    def _extract_from_basis(self, basis_text: str) -> Optional[Dict[str, str]]:
        """Extract the standard name and number from one basis text.

        Tries ``《name》(number)`` first (ASCII or full-width parentheses),
        then a bare standard number combined with any ``《name》`` in the text.

        Returns:
            ``{"standard_name": ..., "standard_number": ...}``, or ``None``
            when no standard can be recognised.
        """
        if not basis_text:
            return None

        citation = self._CITATION_PATTERN.search(basis_text)
        if citation:
            return {
                "standard_name": citation.group(1).strip(),
                "standard_number": citation.group(2).strip()
            }

        number_match = self._NUMBER_ONLY_PATTERN.search(basis_text.upper())
        if number_match:
            name_match = self._NAME_PATTERN.search(basis_text)
            return {
                "standard_name": name_match.group(1).strip() if name_match else "",
                "standard_number": number_match.group(1).strip()
            }

        return None

    async def review_all(
        self,
        basis_items: BasisItems,
        collection_name: str = "first_bfp_collection_status",
        progress_manager=None,
        callback_task_id: str = None
    ) -> List[List[Dict[str, Any]]]:
        """Review all basis items concurrently, in fixed-size batches.

        Args:
            basis_items: Container whose ``items`` carry the text in ``raw``.
            collection_name: Forwarded to ``review_batch``.
            progress_manager: Unused; kept for interface compatibility.
            callback_task_id: When given, set on the reviewer so results are
                logged under this task id.

        Returns:
            One list of standardized entries per batch; batches without any
            entry are dropped. A failed batch contributes one error entry.
        """
        if not basis_items or not getattr(basis_items, "items", None):
            return []

        items = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
        if not items:
            return []

        start_time = time.time()

        if self._timeliness_reviewer and callback_task_id:
            self._timeliness_reviewer.callback_task_id = callback_task_id

        size = self._BATCH_SIZE
        batches = [items[i:i + size] for i in range(0, len(items), size)]

        async def process_batch(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
            try:
                return await self.review_batch(batch, collection_name)
            except Exception as e:
                logger.error(f"批次 {batch_index} 处理失败: {e}")
                return [self._error_result("basis", "basis_timeliness_check", e)]

        processed_results = await asyncio.gather(
            *(process_batch(i, batch) for i, batch in enumerate(batches)),
            return_exceptions=True
        )

        final_results = []
        for i, result in enumerate(processed_results):
            # BaseException also covers a cancelled batch, which would
            # otherwise be returned to the caller as if it were a result list.
            if isinstance(result, BaseException):
                logger.error(f"批次 {i} 返回异常: {result}")
                final_results.append([
                    self._error_result(
                        "basis", "basis_timeliness_check", result, basis_items=batches[i]
                    )
                ])
            elif result:
                final_results.append(result)

        elapsed_time = time.time() - start_time
        logger.info(f"异步审查完成,耗时: {elapsed_time:.4f} 秒")

        return final_results

    async def review_from_content(
        self,
        content: str,
        chapter_code: str = "content",
        collection_name: str = "first_bfp_collection_status",
    ) -> List[Dict[str, Any]]:
        """Extract the standards cited in body text and review them.

        Args:
            content: Body text to scan; blank text yields an empty list.
            chapter_code: Chapter code recorded on every returned entry.
            collection_name: Unused; kept for interface compatibility.

        Returns:
            Standardized issue entries; an empty list when no citation is
            found; a single error entry when the review fails.
        """
        if not content or not content.strip():
            return []

        check_item_code = f"{chapter_code}_timeliness_check"
        try:
            refs = self.extractor.extract_from_content(content)

            if not refs:
                logger.info(f"从内容中未提取到规范引用,章节: {chapter_code}")
                return []

            logger.info(f"从内容中提取到 {len(refs)} 个规范引用,章节: {chapter_code}")

            standards_list = [
                {"standard_name": ref.name, "standard_number": ref.number}
                for ref in refs
            ]

            standardized_results = await self._review_standards_list(
                standards_list, chapter_code, check_item_code
            )

            issue_count = sum(1 for item in standardized_results if item.get('exist_issue', False))
            logger.info(f"内容时效性审查完成:总计 {len(standards_list)} 项,发现问题 {issue_count} 项")

            return standardized_results

        except Exception as e:
            logger.error(f"内容时效性审查失败: {e}")
            return [self._error_result(chapter_code, check_item_code, e)]
|
|
|
+
|
|
|
+
|
|
|
class StandardizedResponseProcessor:
    """Turns an LLM response into standardized issue entries (kept for compatibility)."""

    def __init__(self):
        self.inter_tool = InterTool()

    def process_llm_response(self, response_text: str, check_name: str, chapter_code: str, check_item_code: str) -> List[Dict[str, Any]]:
        """Convert an LLM response into a list of standardized entries.

        A string response is first parsed with the tool's JSON extractor; any
        other value is used as already-parsed data. A non-empty list yields
        one entry per element, a non-empty dict yields a single entry, and
        anything else yields an empty list. On any failure a single
        medium-risk error entry is returned instead.
        """
        try:
            if isinstance(response_text, str):
                parsed = self.inter_tool._extract_json_data(response_text)
            else:
                parsed = response_text

            if not parsed:
                return []
            if isinstance(parsed, list):
                entries = parsed
            elif isinstance(parsed, dict):
                entries = [parsed]
            else:
                return []

            issues = []
            for entry in entries:
                issue = self.inter_tool._create_issue_item(entry, check_name, chapter_code, check_item_code)
                issues.append(issue)
            return issues
        except Exception as e:
            logger.error(f"处理LLM响应失败: {e}")
            failure = {
                "check_item": check_name,
                "chapter_code": chapter_code,
                "check_item_code": check_item_code,
                "check_result": {"error": str(e)},
                "exist_issue": True,
                "risk_info": {"risk_level": "medium"}
            }
            return [failure]
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 便捷函数
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
async def review_basis_batch_async(
    basis_items: List[str],
    max_concurrent: int = 4,
    db_pool=None
) -> List[Dict[str, Any]]:
    """Review one batch of basis texts with a temporary review service.

    The service is created with ``max_concurrent`` and ``db_pool``, used for
    a single ``review_batch`` call and released again.
    """
    service = TimelinessReviewService(max_concurrent=max_concurrent, db_pool=db_pool)
    async with service as active_service:
        batch_findings = await active_service.review_batch(basis_items)
        return batch_findings
|
|
|
+
|
|
|
+
|
|
|
async def review_all_basis_async(
    basis_items: BasisItems,
    max_concurrent: int = 4,
    db_pool=None
) -> List[List[Dict[str, Any]]]:
    """Review every basis item with a temporary review service.

    The service is created with ``max_concurrent`` and ``db_pool``, used for
    a single ``review_all`` call and released again.
    """
    service = TimelinessReviewService(max_concurrent=max_concurrent, db_pool=db_pool)
    async with service as active_service:
        all_findings = await active_service.review_all(basis_items)
        return all_findings
|
|
|
+
|
|
|
+
|
|
|
async def review_standards_timeliness(
    standards_list: List[Dict[str, str]],
    db_pool=None,
    standard_service: Optional[StandardMatchingService] = None
) -> List[TimelinessReviewResult]:
    """Review the timeliness of ``standards_list`` with a temporary reviewer.

    The reviewer is built from ``db_pool`` or ``standard_service``. MISMATCH
    results are enriched with a difference analysis before the results are
    returned.
    """
    reviewer_ctx = StandardTimelinessReviewer(db_pool=db_pool, standard_service=standard_service)
    async with reviewer_ctx as reviewer:
        outcomes = reviewer.review_standards(standards_list)
        # Enrichment updates the result objects in place.
        await reviewer.enrich_mismatch_details(outcomes)
        return outcomes
|
|
|
+
|
|
|
+
|
|
|
async def review_standard_timeliness_with_standardized_output(
    standards_list: List[Dict[str, str]],
    db_pool=None,
    standard_service: Optional[StandardMatchingService] = None,
    check_item: str = "timeliness_check",
    chapter_code: str = "basis",
    check_item_code: str = "timeliness_check"
) -> List[Dict[str, Any]]:
    """Review ``standards_list`` and return the findings in standardized format.

    A temporary reviewer is built from ``db_pool`` or ``standard_service``.
    MISMATCH results are enriched first, then all results are converted with
    the given ``check_item``, ``chapter_code`` and ``check_item_code``.
    """
    reviewer_ctx = StandardTimelinessReviewer(db_pool=db_pool, standard_service=standard_service)
    async with reviewer_ctx as reviewer:
        outcomes = reviewer.review_standards(standards_list)
        # Enrichment updates the result objects in place.
        await reviewer.enrich_mismatch_details(outcomes)
        standardized = reviewer.convert_to_standardized_format(
            outcomes, check_item, chapter_code, check_item_code
        )
        return standardized
|