Bladeren bron

fix(sgsc-文档审计-xth): 修复目录缺失检查模糊匹配逻辑

xgo 1 week geleden
bovenliggende
commit
cb5c4551f5

+ 23 - 16
core/construction_review/component/ai_review_engine.py

@@ -1047,34 +1047,41 @@ class AIReviewEngine(BaseReviewer):
                 elif isinstance(outline_raw, list):
                     outline_chapters = outline_raw
 
-            # 提取一级和二级信息
-            outline_first = set()
-            outline_secondary = {}
+            # 🆕 提取一级和二级标题(用于独立模糊匹配)
+            # 结构:{first_code: {'title': '章节标题', 'subsections': ['二级标题1', '二级标题2']}}
+            outline_by_first: Dict[str, Dict] = {}
             
             for chapter in outline_chapters:
                 if not isinstance(chapter, dict):
                     continue
                 
                 first_code = chapter.get('chapter_classification', '')
-                if first_code:
-                    outline_first.add(first_code)
+                first_title = chapter.get('title', '')
                 
-                # 提取 subsections 中的二级章节
+                if not first_code:
+                    continue
+                
+                if first_code not in outline_by_first:
+                    outline_by_first[first_code] = {
+                        'title': first_title,
+                        'subsections': []
+                    }
+                
+                # 提取二级标题列表
                 for sub in chapter.get('subsections', []):
                     if not isinstance(sub, dict):
                         continue
-                    second_code = sub.get('secondary_category_code', '')
-                    if first_code and second_code:
-                        outline_secondary[(first_code, second_code)] = sub.get('title', '')
+                    sub_title = sub.get('title', '')
+                    if sub_title:
+                        outline_by_first[first_code]['subsections'].append(sub_title)
             
-            logger.info(f"[{name}] 获取到 {len(outline_first)} 个一级, {len(outline_secondary)} 个二级")
-
-            # 使用模糊匹配
+            logger.info(f"[{name}] 获取到 {len(outline_by_first)} 个一级章节")
+            
+            # 使用模糊匹配(基于标题)
             matcher = OutlineCatalogueMatcher(csv_path, raw_content_csv)
-            match_result = matcher.match_catalogue(
-                outline_first=outline_first,
-                outline_secondary=outline_secondary,
-                threshold=0.6
+            match_result = matcher.match_catalogue_by_title(
+                outline_by_first=outline_by_first,
+                threshold=0.6  # 阈值0.6
             )
             
             catalogue_result = {

+ 2 - 1
core/construction_review/component/document_processor.py

@@ -99,7 +99,8 @@ SECONDARY_CATEGORY_KEYWORDS = {
     "safety": {
         "SafetySystem": ["安全保证体系"],  # 严格匹配标准目录名
         "Organization": ["组织保证措施"],  # 严格匹配
-        "TechMeasures": ["技术保障措施"],  # 严格匹配
+        "TechMeasures": ["技术保障措施", "技术保证措施"],  # 严格匹配(包含常见变体)
+        "Protection": ["安全防护措施"],  # 🆕 新增缺失的分类
         "Monitoring": ["监测监控措施"],  # 严格匹配
         "Emergency": ["应急处置措施"],  # 严格匹配
     },

+ 171 - 131
core/construction_review/component/outline_catalogue_matcher.py

@@ -8,6 +8,7 @@
 """
 
 import difflib
+import logging
 import re
 from typing import Dict, List, Optional, Set, Tuple, Any
 from collections import defaultdict
@@ -15,6 +16,8 @@ from pathlib import Path
 
 import pandas as pd
 
+logger = logging.getLogger(__name__)
+
 
 class OutlineCatalogueMatcher:
     """
@@ -227,161 +230,198 @@ class OutlineCatalogueMatcher:
         
         return min(sum(scores), 1.0)
     
-    def match_catalogue(
+    def _match_by_title_fuzzy(
+        self,
+        standard_name: str,
+        candidate_titles: List[str],
+        threshold: float
+    ) -> Tuple[bool, float, Optional[str]]:
+        """
+        在候选标题中找到与标准名称最相似的一个
+        
+        Returns:
+            (是否匹配, 最佳分数, 匹配的标题)
+        """
+        best_score = 0.0
+        best_title = None
+        
+        for title in candidate_titles:
+            score = self._calculate_enhanced_similarity(standard_name, title)
+            if score > best_score:
+                best_score = score
+                best_title = title
+        
+        is_match = best_score >= threshold
+        return is_match, best_score, best_title
+    
+    def match_catalogue_by_title(
         self,
-        outline_first: Set[str],
-        outline_secondary: Dict[Tuple[str, str], str],
+        outline_by_first: Dict[str, Dict[str, Any]],
         threshold: float = 0.6
     ) -> Dict[str, Any]:
         """
-        执行目录匹配
+        🆕 基于标题的独立模糊匹配(一二级都独立)
         
         Args:
-            outline_first: 从outline中提取的一级code集合
-            outline_secondary: 从outline中提取的二级 {(first_code, second_code): title}
-            threshold: 模糊匹配阈值(默认0.6)
+            outline_by_first: {
+                first_code: {
+                    'title': '一级标题',
+                    'subsections': ['二级标题1', '二级标题2', ...]
+                }
+            }
+            threshold: 匹配阈值,默认0.6
             
         Returns:
-            匹配结果,包含:
-            - matched_first: 匹配的一级code集合
-            - matched_second: 匹配的二级key集合
-            - missing_first: 缺失的一级列表
-            - missing_second: 缺失的二级列表
-            - match_details: 匹配详情
+            匹配结果
         """
-        required_first = set(self.first_names.keys())
-        required_second = set(self.second_names.keys())
+        logger.info(f"[独立模糊匹配] 开始,阈值={threshold}")
+        
+        # ========== 一级目录匹配(独立模糊)==========
+        actual_first_titles = {
+            code: info['title'] 
+            for code, info in outline_by_first.items()
+        }
         
-        # 一级匹配
-        matched_first = outline_first & required_first
-        missing_first = required_first - matched_first
+        matched_first = set()
+        missing_first = []
+        
+        for req_code, req_name in self.first_names.items():
+            # 优先:直接用code精确匹配,因为一级分类通常较准
+            if req_code in actual_first_titles:
+                matched_first.add(req_code)
+                logger.debug(f"[一级匹配] {req_name}: 存在")
+            else:
+                # 尝试用标题模糊匹配
+                is_match, score, matched_title = self._match_by_title_fuzzy(
+                    req_name,
+                    list(actual_first_titles.values()),
+                    threshold
+                )
+                if is_match:
+                    # 找到匹配的标题,反向查找code
+                    for code, title in actual_first_titles.items():
+                        if title == matched_title:
+                            matched_first.add(req_code)
+                            logger.debug(f"[一级模糊匹配] {req_name} -> {matched_title} ({score:.3f})")
+                            break
+                else:
+                    missing_first.append({
+                        'first_code': req_code,
+                        'first_name': req_name,
+                        'first_seq': self.first_seq.get(req_code, 0)
+                    })
+                    logger.debug(f"[一级缺失] {req_name}")
+        
+        # ========== 二级目录匹配(结合一级 + 全局兜底)==========
+        # 🆕 先收集所有二级标题用于全局兜底
+        all_actual_second_titles = []
+        for fc, info in outline_by_first.items():
+            for sub_title in info.get('subsections', []):
+                all_actual_second_titles.append({
+                    'first_code': fc,
+                    'title': sub_title
+                })
         
-        # 二级匹配
         matched_second = set()
-        missing_second = set()
+        missing_second = []
         match_details = []
+        matched_actual_titles = set()  # 防重复
         
-        # 精确匹配
-        outline_second_keys = set(outline_secondary.keys())
-        exact_matches = outline_second_keys & required_second
-        matched_second.update(exact_matches)
-        
-        for key in exact_matches:
-            first_code, second_code = key
+        for req_key, req_name in self.second_names.items():
+            first_code, second_code = req_key
+            
+            # 🆕 步骤1:优先在同一一级下匹配
+            same_group_titles = outline_by_first.get(first_code, {}).get('subsections', [])
+            best_score_same = 0.0
+            best_match_same = None
+            
+            for title in same_group_titles:
+                if title in matched_actual_titles:
+                    continue
+                score = self._calculate_enhanced_similarity(req_name, title)
+                if score > best_score_same:
+                    best_score_same = score
+                    best_match_same = title
+            
+            # 同组匹配成功
+            if best_score_same >= threshold and best_match_same:
+                matched_second.add(req_key)
+                matched_actual_titles.add(best_match_same)
+                match_details.append({
+                    'level': 'second',
+                    'required_first_code': first_code,
+                    'required_second_code': second_code,
+                    'required_second_name': req_name,
+                    'matched': True,
+                    'match_type': 'same_group_fuzzy',
+                    'similarity': best_score_same,
+                    'matched_title': best_match_same
+                })
+                logger.debug(f"[二级同组匹配] {req_name} -> {best_match_same} ({best_score_same:.3f})")
+                continue
+            
+            # 🆕 步骤2:同组失败,尝试全局匹配(提高阈值防误匹配)
+            GLOBAL_THRESHOLD = 0.7  # 全局匹配阈值更高
+            best_score_global = 0.0
+            best_match_global = None
+            best_match_fc = None
+            
+            for actual in all_actual_second_titles:
+                if actual['title'] in matched_actual_titles:
+                    continue
+                score = self._calculate_enhanced_similarity(req_name, actual['title'])
+                if score > best_score_global:
+                    best_score_global = score
+                    best_match_global = actual['title']
+                    best_match_fc = actual['first_code']
+            
+            # 全局匹配成功(且跨组)
+            if best_score_global >= GLOBAL_THRESHOLD and best_match_global:
+                matched_second.add(req_key)
+                matched_actual_titles.add(best_match_global)
+                match_details.append({
+                    'level': 'second',
+                    'required_first_code': first_code,
+                    'required_second_code': second_code,
+                    'required_second_name': req_name,
+                    'matched': True,
+                    'match_type': 'cross_group_fuzzy',  # 标记为跨组匹配
+                    'similarity': best_score_global,
+                    'matched_title': best_match_global,
+                    'matched_actual_first': best_match_fc  # 实际匹配到的一级
+                })
+                logger.warning(f"[二级跨组匹配] {req_name}(应在{first_code}) -> {best_match_global}(实际在{best_match_fc}) ({best_score_global:.3f})")
+                continue
+            
+            # 都失败,记为缺失
+            best_score = max(best_score_same, best_score_global)
+            best_attempt = best_match_same or best_match_global
+            missing_second.append({
+                'first_code': first_code,
+                'first_name': self.first_names.get(first_code, ''),
+                'secondary_code': second_code,
+                'secondary_name': req_name,
+                'second_seq': self.second_seq.get(req_key, 0)
+            })
             match_details.append({
                 'level': 'second',
                 'required_first_code': first_code,
                 'required_second_code': second_code,
-                'required_second_name': self.second_names.get(key, ''),
-                'matched': True,
-                'match_type': 'exact',
-                'similarity': 1.0
-            })
-        
-        # 模糊匹配(对未精确匹配的)
-        required_remaining = required_second - exact_matches
-        outline_remaining = outline_second_keys - exact_matches
-        
-        if required_remaining and outline_remaining:
-            # 准备outline数据
-            outline_list = []
-            for key in outline_remaining:
-                first_code, second_code = key
-                title = outline_secondary.get(key, "")
-                outline_list.append({
-                    'key': key,
-                    'first_code': first_code,
-                    'second_code': second_code,
-                    'title': title
-                })
-            
-            # 对每个required进行模糊匹配
-            for req_key in required_remaining:
-                first_code, second_code = req_key
-                second_name = self.second_names.get(req_key, '')
-                first_name = self.first_names.get(first_code, '')
-                
-                # 获取详细定义
-                raw_content = self.second_raw_content.get((first_name, second_name))
-                
-                best_match = None
-                best_score = 0.0
-                
-                for item in outline_list:
-                    # 计算相似度
-                    score1 = self._calculate_enhanced_similarity(second_name, item['title'])
-                    score2 = self._calculate_enhanced_similarity(
-                        f"{first_name}{second_name}",
-                        item['title']
-                    )
-                    score = max(score1, score2)
-                    
-                    # 如果有详细定义,也计算
-                    if raw_content:
-                        score3 = self._calculate_enhanced_similarity(
-                            second_name,
-                            item['title'],
-                            raw_content
-                        )
-                        score = max(score, score3)
-                    
-                    if score > best_score:
-                        best_score = score
-                        best_match = item
-                
-                if best_score >= threshold:
-                    matched_second.add(req_key)
-                    match_details.append({
-                        'level': 'second',
-                        'required_first_code': first_code,
-                        'required_second_code': second_code,
-                        'required_second_name': second_name,
-                        'matched': True,
-                        'match_type': 'fuzzy',
-                        'similarity': best_score,
-                        'matched_title': best_match['title'] if best_match else None,
-                        'used_raw_content': raw_content is not None
-                    })
-                else:
-                    missing_second.add(req_key)
-                    match_details.append({
-                        'level': 'second',
-                        'required_first_code': first_code,
-                        'required_second_code': second_code,
-                        'required_second_name': second_name,
-                        'matched': False,
-                        'match_type': 'none',
-                        'similarity': best_score
-                    })
-        else:
-            missing_second = required_remaining
-        
-        # 构建缺失详情
-        missing_first_details = []
-        for code in sorted(missing_first, key=lambda x: self.first_seq.get(x, 0)):
-            missing_first_details.append({
-                'first_code': code,
-                'first_name': self.first_names.get(code, code),
-                'first_seq': self.first_seq.get(code, 0)
+                'required_second_name': req_name,
+                'matched': False,
+                'match_type': 'none',
+                'similarity': best_score,
+                'best_attempt': best_attempt
             })
+            logger.debug(f"[二级缺失] {req_name} (最佳尝试: {best_attempt}, {best_score:.3f})")
         
-        missing_second_details = []
-        for key in sorted(missing_second, key=lambda x: (self.first_seq.get(x[0], 0), self.second_seq.get(x, 0))):
-            first_code, second_code = key
-            missing_second_details.append({
-                'first_code': first_code,
-                'first_name': self.first_names.get(first_code, first_code),
-                'first_seq': self.first_seq.get(first_code, 0),
-                'secondary_code': second_code,
-                'secondary_name': self.second_names.get(key, ''),
-                'second_seq': self.second_seq.get(key, 0)
-            })
+        logger.info(f"[独立模糊匹配] 完成:一级缺失 {len(missing_first)} 个,二级缺失 {len(missing_second)} 个")
         
         return {
             'matched_first': matched_first,
             'matched_second': matched_second,
-            'missing_first': missing_first_details,
-            'missing_second': missing_second_details,
+            'missing_first': missing_first,
+            'missing_second': missing_second,
             'missing_first_count': len(missing_first),
             'missing_second_count': len(missing_second),
             'match_details': match_details