Просмотр исходного кода

Merge branch 'dev_sgsc_xth' of CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

fix(sgsc-文档审计-xth): 修复目录缺失检查模糊匹配逻辑
LingMin 1 неделя назад
Родитель
Commit
3ed71eb2fb

+ 23 - 16
core/construction_review/component/ai_review_engine.py

@@ -1047,34 +1047,41 @@ class AIReviewEngine(BaseReviewer):
                 elif isinstance(outline_raw, list):
                 elif isinstance(outline_raw, list):
                     outline_chapters = outline_raw
                     outline_chapters = outline_raw
 
 
-            # 提取一级和二级信息
-            outline_first = set()
-            outline_secondary = {}
+            # 🆕 提取一级和二级标题(用于独立模糊匹配)
+            # 结构:{first_code: {'title': '章节标题', 'subsections': ['二级标题1', '二级标题2']}}
+            outline_by_first: Dict[str, Dict] = {}
             
             
             for chapter in outline_chapters:
             for chapter in outline_chapters:
                 if not isinstance(chapter, dict):
                 if not isinstance(chapter, dict):
                     continue
                     continue
                 
                 
                 first_code = chapter.get('chapter_classification', '')
                 first_code = chapter.get('chapter_classification', '')
-                if first_code:
-                    outline_first.add(first_code)
+                first_title = chapter.get('title', '')
                 
                 
-                # 提取 subsections 中的二级章节
+                if not first_code:
+                    continue
+                
+                if first_code not in outline_by_first:
+                    outline_by_first[first_code] = {
+                        'title': first_title,
+                        'subsections': []
+                    }
+                
+                # 提取二级标题列表
                 for sub in chapter.get('subsections', []):
                 for sub in chapter.get('subsections', []):
                     if not isinstance(sub, dict):
                     if not isinstance(sub, dict):
                         continue
                         continue
-                    second_code = sub.get('secondary_category_code', '')
-                    if first_code and second_code:
-                        outline_secondary[(first_code, second_code)] = sub.get('title', '')
+                    sub_title = sub.get('title', '')
+                    if sub_title:
+                        outline_by_first[first_code]['subsections'].append(sub_title)
             
             
-            logger.info(f"[{name}] 获取到 {len(outline_first)} 个一级, {len(outline_secondary)} 个二级")
-
-            # 使用模糊匹配
+            logger.info(f"[{name}] 获取到 {len(outline_by_first)} 个一级章节")
+            
+            # 使用模糊匹配(基于标题)
             matcher = OutlineCatalogueMatcher(csv_path, raw_content_csv)
             matcher = OutlineCatalogueMatcher(csv_path, raw_content_csv)
-            match_result = matcher.match_catalogue(
-                outline_first=outline_first,
-                outline_secondary=outline_secondary,
-                threshold=0.6
+            match_result = matcher.match_catalogue_by_title(
+                outline_by_first=outline_by_first,
+                threshold=0.6  # 阈值0.6
             )
             )
             
             
             catalogue_result = {
             catalogue_result = {

+ 86 - 0
core/construction_review/component/document_processor.py

@@ -66,6 +66,91 @@ class DocumentComponents:
     text_splitter: TextSplitter
     text_splitter: TextSplitter
 
 
 
 
# Keyword map for secondary-category titles (used to classify outline subsections).
# Based on StandardCategoryTable.csv; keywords strictly match the standard
# catalogue names to avoid false positives from overly broad terms.
SECONDARY_CATEGORY_KEYWORDS = {
    # Compilation basis (basis)
    "basis": {
        "LawsAndRegulations": ["法律法规"],  # strict match
        "StandardsAndSpecifications": ["标准规范"],  # strict match
        "DocumentSystems": ["文件制度"],  # strict match
        "CompilationPrinciples": ["编制原则"],  # strict match
        "CompilationScope": ["编制范围"],  # strict match
    },
    # Project overview (overview)
    "overview": {
        "DesignSummary": ["设计概况"],  # strict match
        "GeologyWeather": ["工程地质与水文气象"],  # strict match of standard catalogue name
        "Surroundings": ["周边环境"],  # strict match
        "LayoutPlan": ["施工平面及立面布置"],  # strict match of standard catalogue name
        "RequirementsTech": ["施工要求和技术保证条件"],  # strict match of standard catalogue name
        "RiskLevel": ["风险辨识与分级"],  # strict match of standard catalogue name
        "Stakeholders": ["参建各方责任主体单位"],  # strict match of standard catalogue name
    },
    # Construction plan (plan)
    "plan": {
        "Schedule": ["施工进度计划"],  # strict match of standard catalogue name
        "Materials": ["施工材料计划"],  # strict match of standard catalogue name
        "Equipment": ["施工设备计划"],  # strict match of standard catalogue name
        "Workforce": ["劳动力计划"],  # strict match
        "SafetyCost": ["安全生产费用使用计划"],  # strict match of standard catalogue name
    },
    # Construction technology (technology)
    "technology": {
        # Strictly follow the standard catalogue; put the most complete
        # names first to avoid ambiguity between similar section titles.
        "MethodsOverview": ["主要施工方法概述", "施工方法概述"],  # excludes bare "施工方法" to avoid clashing with Operations
        "TechParams": ["技术参数"],  # excludes bare "参数" as too broad
        "Process": ["工艺流程"],  # excludes bare "流程" as too broad
        "PrepWork": ["施工准备"],  # excludes bare "准备" as too broad
        "Operations": ["施工方法及操作要求", "施工方案及操作要求", "操作要求", "施工方案"],  # most specific first
        "Inspection": ["检查要求"],  # excludes "检查"/"验收" to avoid clashing with other chapters
    },
    # Safety assurance measures (safety)
    "safety": {
        "SafetySystem": ["安全保证体系"],  # strict match of standard catalogue name
        "Organization": ["组织保证措施"],  # strict match
        "TechMeasures": ["技术保障措施", "技术保证措施"],  # strict match (includes a common variant)
        "Protection": ["安全防护措施"],  # newly added, previously missing category
        "Monitoring": ["监测监控措施"],  # strict match
        "Emergency": ["应急处置措施"],  # strict match
    },
    # Quality assurance measures (quality)
    "quality": {
        "QualitySystem": ["质量保证体系"],  # strict match
        "QualityGoals": ["质量目标"],  # strict match
        "Excellence": ["工程创优规划"],  # strict match
        "QualityControl": ["质量控制程序与具体措施"],  # strict match of standard catalogue name
    },
    # Environmental assurance measures (environment)
    "environment": {
        "EnvSystem": ["环境保证体系"],  # strict match
        "EnvOrg": ["环境保护组织机构"],  # strict match
        "EnvProtection": ["环境保护及文明施工措施"],  # strict match of standard catalogue name
    },
    # Construction management and staffing (management)
    "management": {
        "Managers": ["施工管理人员"],  # strict match
        "SafetyStaff": ["专职安全生产管理人员"],  # strict match of standard catalogue name
        "SpecialWorkers": ["特种作业人员"],  # strict match
        "OtherWorkers": ["其他作业人员"],  # strict match
    },
    # Acceptance requirements (acceptance)
    "acceptance": {
        "Standards": ["验收标准"],  # strict match
        "Procedure": ["验收程序"],  # strict match
        "Content": ["验收内容"],  # strict match
        "Timing": ["验收时间"],  # strict match
        "Personnel": ["验收人员"],  # strict match
    },
    # Other materials (other)
    "other": {
        "Calculations": ["计算书"],  # strict match
        "Drawings": ["相关施工图纸"],  # strict match of standard catalogue name
        "Tables": ["附图附表"],  # strict match
        "Team": ["编制及审核人员情况"],  # strict match of standard catalogue name
    },
}
+
 class DocumentProcessor:
 class DocumentProcessor:
     """
     """
     文档处理器
     文档处理器
@@ -734,3 +819,4 @@ class DocumentProcessor:
         except Exception as e:
         except Exception as e:
             logger.error(f"基础PDF处理失败: {str(e)}", exc_info=True)
             logger.error(f"基础PDF处理失败: {str(e)}", exc_info=True)
             raise
             raise
+

+ 171 - 131
core/construction_review/component/outline_catalogue_matcher.py

@@ -8,6 +8,7 @@
 """
 """
 
 
 import difflib
 import difflib
+import logging
 import re
 import re
 from typing import Dict, List, Optional, Set, Tuple, Any
 from typing import Dict, List, Optional, Set, Tuple, Any
 from collections import defaultdict
 from collections import defaultdict
@@ -15,6 +16,8 @@ from pathlib import Path
 
 
 import pandas as pd
 import pandas as pd
 
 
+logger = logging.getLogger(__name__)
+
 
 
 class OutlineCatalogueMatcher:
 class OutlineCatalogueMatcher:
     """
     """
@@ -227,161 +230,198 @@ class OutlineCatalogueMatcher:
         
         
         return min(sum(scores), 1.0)
         return min(sum(scores), 1.0)
     
     
-    def match_catalogue(
+    def _match_by_title_fuzzy(
+        self,
+        standard_name: str,
+        candidate_titles: List[str],
+        threshold: float
+    ) -> Tuple[bool, float, Optional[str]]:
+        """
+        在候选标题中找到与标准名称最相似的一个
+        
+        Returns:
+            (是否匹配, 最佳分数, 匹配的标题)
+        """
+        best_score = 0.0
+        best_title = None
+        
+        for title in candidate_titles:
+            score = self._calculate_enhanced_similarity(standard_name, title)
+            if score > best_score:
+                best_score = score
+                best_title = title
+        
+        is_match = best_score >= threshold
+        return is_match, best_score, best_title
+    
+    def match_catalogue_by_title(
         self,
         self,
-        outline_first: Set[str],
-        outline_secondary: Dict[Tuple[str, str], str],
+        outline_by_first: Dict[str, Dict[str, any]],
         threshold: float = 0.6
         threshold: float = 0.6
     ) -> Dict[str, Any]:
     ) -> Dict[str, Any]:
         """
         """
-        执行目录匹配
+        🆕 基于标题的独立模糊匹配(一二级都独立)
         
         
         Args:
         Args:
-            outline_first: 从outline中提取的一级code集合
-            outline_secondary: 从outline中提取的二级 {(first_code, second_code): title}
-            threshold: 模糊匹配阈值(默认0.6)
+            outline_by_first: {
+                first_code: {
+                    'title': '一级标题',
+                    'subsections': ['二级标题1', '二级标题2', ...]
+                }
+            }
+            threshold: 匹配阈值,默认0.6
             
             
         Returns:
         Returns:
-            匹配结果,包含:
-            - matched_first: 匹配的一级code集合
-            - matched_second: 匹配的二级key集合
-            - missing_first: 缺失的一级列表
-            - missing_second: 缺失的二级列表
-            - match_details: 匹配详情
+            匹配结果
         """
         """
-        required_first = set(self.first_names.keys())
-        required_second = set(self.second_names.keys())
+        logger.info(f"[独立模糊匹配] 开始,阈值={threshold}")
+        
+        # ========== 一级目录匹配(独立模糊)==========
+        actual_first_titles = {
+            code: info['title'] 
+            for code, info in outline_by_first.items()
+        }
         
         
-        # 一级匹配
-        matched_first = outline_first & required_first
-        missing_first = required_first - matched_first
+        matched_first = set()
+        missing_first = []
+        
+        for req_code, req_name in self.first_names.items():
+            # 优先:直接用code精确匹配,因为一级分类通常较准
+            if req_code in actual_first_titles:
+                matched_first.add(req_code)
+                logger.debug(f"[一级匹配] {req_name}: 存在")
+            else:
+                # 尝试用标题模糊匹配
+                is_match, score, matched_title = self._match_by_title_fuzzy(
+                    req_name,
+                    list(actual_first_titles.values()),
+                    threshold
+                )
+                if is_match:
+                    # 找到匹配的标题,反向查找code
+                    for code, title in actual_first_titles.items():
+                        if title == matched_title:
+                            matched_first.add(req_code)
+                            logger.debug(f"[一级模糊匹配] {req_name} -> {matched_title} ({score:.3f})")
+                            break
+                else:
+                    missing_first.append({
+                        'first_code': req_code,
+                        'first_name': req_name,
+                        'first_seq': self.first_seq.get(req_code, 0)
+                    })
+                    logger.debug(f"[一级缺失] {req_name}")
+        
+        # ========== 二级目录匹配(结合一级 + 全局兜底)==========
+        # 🆕 先收集所有二级标题用于全局兜底
+        all_actual_second_titles = []
+        for fc, info in outline_by_first.items():
+            for sub_title in info.get('subsections', []):
+                all_actual_second_titles.append({
+                    'first_code': fc,
+                    'title': sub_title
+                })
         
         
-        # 二级匹配
         matched_second = set()
         matched_second = set()
-        missing_second = set()
+        missing_second = []
         match_details = []
         match_details = []
+        matched_actual_titles = set()  # 防重复
         
         
-        # 精确匹配
-        outline_second_keys = set(outline_secondary.keys())
-        exact_matches = outline_second_keys & required_second
-        matched_second.update(exact_matches)
-        
-        for key in exact_matches:
-            first_code, second_code = key
+        for req_key, req_name in self.second_names.items():
+            first_code, second_code = req_key
+            
+            # 🆕 步骤1:优先在同一一级下匹配
+            same_group_titles = outline_by_first.get(first_code, {}).get('subsections', [])
+            best_score_same = 0.0
+            best_match_same = None
+            
+            for title in same_group_titles:
+                if title in matched_actual_titles:
+                    continue
+                score = self._calculate_enhanced_similarity(req_name, title)
+                if score > best_score_same:
+                    best_score_same = score
+                    best_match_same = title
+            
+            # 同组匹配成功
+            if best_score_same >= threshold and best_match_same:
+                matched_second.add(req_key)
+                matched_actual_titles.add(best_match_same)
+                match_details.append({
+                    'level': 'second',
+                    'required_first_code': first_code,
+                    'required_second_code': second_code,
+                    'required_second_name': req_name,
+                    'matched': True,
+                    'match_type': 'same_group_fuzzy',
+                    'similarity': best_score_same,
+                    'matched_title': best_match_same
+                })
+                logger.debug(f"[二级同组匹配] {req_name} -> {best_match_same} ({best_score_same:.3f})")
+                continue
+            
+            # 🆕 步骤2:同组失败,尝试全局匹配(提高阈值防误匹配)
+            GLOBAL_THRESHOLD = 0.7  # 全局匹配阈值更高
+            best_score_global = 0.0
+            best_match_global = None
+            best_match_fc = None
+            
+            for actual in all_actual_second_titles:
+                if actual['title'] in matched_actual_titles:
+                    continue
+                score = self._calculate_enhanced_similarity(req_name, actual['title'])
+                if score > best_score_global:
+                    best_score_global = score
+                    best_match_global = actual['title']
+                    best_match_fc = actual['first_code']
+            
+            # 全局匹配成功(且跨组)
+            if best_score_global >= GLOBAL_THRESHOLD and best_match_global:
+                matched_second.add(req_key)
+                matched_actual_titles.add(best_match_global)
+                match_details.append({
+                    'level': 'second',
+                    'required_first_code': first_code,
+                    'required_second_code': second_code,
+                    'required_second_name': req_name,
+                    'matched': True,
+                    'match_type': 'cross_group_fuzzy',  # 标记为跨组匹配
+                    'similarity': best_score_global,
+                    'matched_title': best_match_global,
+                    'matched_actual_first': best_match_fc  # 实际匹配到的一级
+                })
+                logger.warning(f"[二级跨组匹配] {req_name}(应在{first_code}) -> {best_match_global}(实际在{best_match_fc}) ({best_score_global:.3f})")
+                continue
+            
+            # 都失败,记为缺失
+            best_score = max(best_score_same, best_score_global)
+            best_attempt = best_match_same or best_match_global
+            missing_second.append({
+                'first_code': first_code,
+                'first_name': self.first_names.get(first_code, ''),
+                'secondary_code': second_code,
+                'secondary_name': req_name,
+                'second_seq': self.second_seq.get(req_key, 0)
+            })
             match_details.append({
             match_details.append({
                 'level': 'second',
                 'level': 'second',
                 'required_first_code': first_code,
                 'required_first_code': first_code,
                 'required_second_code': second_code,
                 'required_second_code': second_code,
-                'required_second_name': self.second_names.get(key, ''),
-                'matched': True,
-                'match_type': 'exact',
-                'similarity': 1.0
-            })
-        
-        # 模糊匹配(对未精确匹配的)
-        required_remaining = required_second - exact_matches
-        outline_remaining = outline_second_keys - exact_matches
-        
-        if required_remaining and outline_remaining:
-            # 准备outline数据
-            outline_list = []
-            for key in outline_remaining:
-                first_code, second_code = key
-                title = outline_secondary.get(key, "")
-                outline_list.append({
-                    'key': key,
-                    'first_code': first_code,
-                    'second_code': second_code,
-                    'title': title
-                })
-            
-            # 对每个required进行模糊匹配
-            for req_key in required_remaining:
-                first_code, second_code = req_key
-                second_name = self.second_names.get(req_key, '')
-                first_name = self.first_names.get(first_code, '')
-                
-                # 获取详细定义
-                raw_content = self.second_raw_content.get((first_name, second_name))
-                
-                best_match = None
-                best_score = 0.0
-                
-                for item in outline_list:
-                    # 计算相似度
-                    score1 = self._calculate_enhanced_similarity(second_name, item['title'])
-                    score2 = self._calculate_enhanced_similarity(
-                        f"{first_name}{second_name}",
-                        item['title']
-                    )
-                    score = max(score1, score2)
-                    
-                    # 如果有详细定义,也计算
-                    if raw_content:
-                        score3 = self._calculate_enhanced_similarity(
-                            second_name,
-                            item['title'],
-                            raw_content
-                        )
-                        score = max(score, score3)
-                    
-                    if score > best_score:
-                        best_score = score
-                        best_match = item
-                
-                if best_score >= threshold:
-                    matched_second.add(req_key)
-                    match_details.append({
-                        'level': 'second',
-                        'required_first_code': first_code,
-                        'required_second_code': second_code,
-                        'required_second_name': second_name,
-                        'matched': True,
-                        'match_type': 'fuzzy',
-                        'similarity': best_score,
-                        'matched_title': best_match['title'] if best_match else None,
-                        'used_raw_content': raw_content is not None
-                    })
-                else:
-                    missing_second.add(req_key)
-                    match_details.append({
-                        'level': 'second',
-                        'required_first_code': first_code,
-                        'required_second_code': second_code,
-                        'required_second_name': second_name,
-                        'matched': False,
-                        'match_type': 'none',
-                        'similarity': best_score
-                    })
-        else:
-            missing_second = required_remaining
-        
-        # 构建缺失详情
-        missing_first_details = []
-        for code in sorted(missing_first, key=lambda x: self.first_seq.get(x, 0)):
-            missing_first_details.append({
-                'first_code': code,
-                'first_name': self.first_names.get(code, code),
-                'first_seq': self.first_seq.get(code, 0)
+                'required_second_name': req_name,
+                'matched': False,
+                'match_type': 'none',
+                'similarity': best_score,
+                'best_attempt': best_attempt
             })
             })
+            logger.debug(f"[二级缺失] {req_name} (最佳尝试: {best_attempt}, {best_score:.3f})")
         
         
-        missing_second_details = []
-        for key in sorted(missing_second, key=lambda x: (self.first_seq.get(x[0], 0), self.second_seq.get(x, 0))):
-            first_code, second_code = key
-            missing_second_details.append({
-                'first_code': first_code,
-                'first_name': self.first_names.get(first_code, first_code),
-                'first_seq': self.first_seq.get(first_code, 0),
-                'secondary_code': second_code,
-                'secondary_name': self.second_names.get(key, ''),
-                'second_seq': self.second_seq.get(key, 0)
-            })
+        logger.info(f"[独立模糊匹配] 完成:一级缺失 {len(missing_first)} 个,二级缺失 {len(missing_second)} 个")
         
         
         return {
         return {
             'matched_first': matched_first,
             'matched_first': matched_first,
             'matched_second': matched_second,
             'matched_second': matched_second,
-            'missing_first': missing_first_details,
-            'missing_second': missing_second_details,
+            'missing_first': missing_first,
+            'missing_second': missing_second,
             'missing_first_count': len(missing_first),
             'missing_first_count': len(missing_first),
             'missing_second_count': len(missing_second),
             'missing_second_count': len(missing_second),
             'match_details': match_details
             'match_details': match_details