Преглед изворни кода

feat(sgsc-审查模块-xth): 新增目录一二级缺失统计功能

xgo пре 1 недеља
родитељ
комит
6e223b0ef8

+ 141 - 6
core/construction_review/component/ai_review_engine.py

@@ -854,14 +854,149 @@ class AIReviewEngine(BaseReviewer):
             }
             return error_result, trace_id_idx
 
-    async def outline_Check(self, trace_id_idx: str, review_content: Dict[str, Any],
-                            state: str, stage_name: str) -> Dict[str, Any]:
-            if state and isinstance(state, dict):
-                structured = state.get('structured_content', {})
+    async def check_outline_catalogue(
+        self,
+        trace_id_idx: str,
+        outline_data: Dict[str, Any],
+        state: Dict[str, Any],
+        stage_name: str
+    ) -> Dict[str, Any]:
+        """
+        Check for missing first- and second-level catalogue categories
+        based on the document outline.
+
+        Uses a fuzzy-matching algorithm (name-similarity based) to match
+        outline chapters against the bundled standard category table.
+
+        Args:
+            trace_id_idx: Trace ID index (used for logging/telemetry).
+            outline_data: Dict containing the outline (usually taken from
+                structured_content).
+            state: State dict; used as a fallback source for
+                structured_content.outline when outline_data yields no
+                chapters.
+            stage_name: Name of the current review stage.
+
+        Returns:
+            Dict[str, Any]: Result payload with statistics of missing
+            first/second-level catalogue entries (see "details" key).
+        """
+        # Local import keeps the matcher (and its pandas dependency) off
+        # the module import path until this check actually runs.
+        from .outline_catalogue_matcher import OutlineCatalogueMatcher
+
+        start_time = time.time()
+        name = "outline_catalogue_check"
+
+        try:
+            logger.info(f"[{name}] 开始目录一二级缺失检查")
+
+            # Paths to the bundled standard-category CSV files.
+            csv_path = str(
+                Path(__file__).parent / 'doc_worker' / 'config' /
+                'StandardCategoryTable.csv'
+            )
+            raw_content_csv = str(
+                Path(__file__).parent / 'doc_worker' / 'config' /
+                'construction_plan_standards.csv'
+            )
+
+            # Obtain the chapters list from outline_data; accepts either a
+            # {'outline': {...}} wrapper, a bare outline dict with
+            # 'chapters', or a plain list of chapters.
+            outline_chapters = []
+            if outline_data and isinstance(outline_data, dict):
+                outline_raw = outline_data.get('outline') or outline_data
+                if isinstance(outline_raw, dict):
+                    outline_chapters = outline_raw.get('chapters', [])
+                elif isinstance(outline_raw, list):
+                    outline_chapters = outline_raw

+            # Fallback: pull the outline out of state.structured_content
+            # when outline_data produced nothing.
+            if not outline_chapters and state and isinstance(state, dict):
+                structured = state.get('structured_content', {})
+                outline_raw = structured.get('outline', {})
+                if isinstance(outline_raw, dict):
+                    outline_chapters = outline_raw.get('chapters', [])
+                elif isinstance(outline_raw, list):
+                    outline_chapters = outline_raw
+
+            # Extract the first-level codes and the
+            # (first_code, second_code) -> title mapping from the chapters.
+            outline_first = set()
+            outline_secondary = {}
+            
+            for chapter in outline_chapters:
+                if not isinstance(chapter, dict):
+                    continue
                 
-                outline = structured.get('outline')
-                all_chunks = structured.get('chunks', [])
+                first_code = chapter.get('chapter_classification', '')
+                if first_code:
+                    outline_first.add(first_code)
+                
+                # Collect second-level chapters from 'subsections'; a
+                # second-level entry is only recorded when its parent
+                # first-level code is present.
+                for sub in chapter.get('subsections', []):
+                    if not isinstance(sub, dict):
+                        continue
+                    second_code = sub.get('secondary_category_code', '')
+                    if first_code and second_code:
+                        outline_secondary[(first_code, second_code)] = sub.get('title', '')
+            
+            logger.info(f"[{name}] 获取到 {len(outline_first)} 个一级, {len(outline_secondary)} 个二级")
+
+            # Fuzzy-match the extracted outline against the standard table.
+            matcher = OutlineCatalogueMatcher(csv_path, raw_content_csv)
+            match_result = matcher.match_catalogue(
+                outline_first=outline_first,
+                outline_secondary=outline_secondary,
+                threshold=0.6
+            )
+            
+            # Structured summary of the match, kept alongside the flat
+            # counters in "details" for downstream consumers.
+            catalogue_result = {
+                "level": "primary_and_secondary",
+                "is_complete": match_result['missing_first_count'] == 0 and match_result['missing_second_count'] == 0,
+                "first_level": {
+                    "total_required": len(matcher.first_names),
+                    "actual_present": len(match_result['matched_first']),
+                    "missing_count": match_result['missing_first_count'],
+                    "missing": match_result['missing_first']
+                },
+                "second_level": {
+                    "total_required": len(matcher.second_names),
+                    "actual_present": len(match_result['matched_second']),
+                    "missing_count": match_result['missing_second_count'],
+                    "missing": match_result['missing_second']
+                },
+                "match_details": match_result['match_details']
+            }
+
+            execution_time = time.time() - start_time
+            logger.info(
+                f"[{name}] 检查完成,耗时: {execution_time:.2f}s, "
+                f"缺失一级: {match_result['missing_first_count']} 个, "
+                f"缺失二级: {match_result['missing_second_count']} 个"
+            )
+
+            return {
+                "success": True,
+                "execution_time": execution_time,
+                "details": {
+                    "name": name,
+                    "missing_first_count": match_result['missing_first_count'],
+                    "missing_second_count": match_result['missing_second_count'],
+                    "missing_first": match_result['missing_first'],
+                    "missing_second": match_result['missing_second'],
+                    "catalogue_check": catalogue_result
+                }
+            }
+
+        except Exception as e:
+            # Broad catch is deliberate: this check is best-effort and must
+            # never break the review pipeline. The failure is logged with a
+            # traceback and surfaced in the result payload instead.
+            execution_time = time.time() - start_time
+            error_msg = f"[{name}] 检查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+
+            # Zeroed counters on failure so consumers can still read the
+            # same schema without key checks.
+            return {
+                "success": False,
+                "execution_time": execution_time,
+                "error": str(e),
+                "details": {
+                    "name": name,
+                    "missing_first_count": 0,
+                    "missing_second_count": 0,
+                    "missing_first": [],
+                    "missing_second": [],
+                    "catalogue_check": {}
+                }
+            }
+
+
     async def check_sensitive(self, trace_id_idx: str, review_content: str,
                             state: str, stage_name: str) -> Dict[str, Any]:
         """

+ 388 - 0
core/construction_review/component/outline_catalogue_matcher.py

@@ -0,0 +1,388 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+目录缺失检查 - 模糊匹配模块
+
+独立模块,用于 AIReviewEngine.check_outline_catalogue 方法
+提供基于模糊匹配的目录缺失统计功能
+"""
+
+import difflib
+import re
+from typing import Dict, List, Optional, Set, Tuple, Any
+from collections import defaultdict
+from pathlib import Path
+
+import pandas as pd
+
+
+class OutlineCatalogueMatcher:
+    """
+    Fuzzy matcher for document catalogues.
+    
+    Stand-alone matching facility (independent of
+    LightweightCompletenessChecker) that matches outline entries against a
+    standard category table by name similarity.
+    """
+    
+    def __init__(self, standard_csv_path: str, raw_content_csv_path: str = None):
+        """
+        Initialize the matcher and eagerly load the CSV tables.
+        
+        Args:
+            standard_csv_path: Path to StandardCategoryTable.csv.
+            raw_content_csv_path: Path to construction_plan_standards.csv
+                (optional; enables definition-text based bonus scoring).
+        """
+        self.standard_csv_path = standard_csv_path
+        self.raw_content_csv_path = raw_content_csv_path
+        
+        # Standard-table lookups populated by _load_standard_csv().
+        self.first_names: Dict[str, str] = {}  # code -> name
+        self.second_names: Dict[Tuple[str, str], str] = {}  # (first_code, second_code) -> name
+        self.first_seq: Dict[str, int] = {}  # code -> seq
+        self.second_seq: Dict[Tuple[str, str], int] = {}  # (first_code, second_code) -> seq
+        
+        # Detailed definition text per category (optional table).
+        self.second_raw_content: Dict[Tuple[str, str], str] = {}  # (first_name, second_name) -> content
+        
+        self._load_standard_csv()
+        if raw_content_csv_path:
+            self._load_raw_content_csv()
+    
+    def _load_standard_csv(self) -> None:
+        """Load the standard category table, trying several encodings.
+
+        Raises:
+            ValueError: if the file cannot be decoded with any candidate
+                encoding.
+        """
+        encodings = ['utf-8-sig', 'utf-16', 'gbk', 'utf-8']
+        df = None
+        
+        for encoding in encodings:
+            try:
+                # sep=None with the python engine lets pandas sniff the
+                # delimiter (the table may be comma- or tab-separated).
+                df = pd.read_csv(self.standard_csv_path, encoding=encoding, sep=None, engine='python')
+                break
+            except UnicodeDecodeError:
+                continue
+        
+        if df is None:
+            raise ValueError(f"无法读取CSV文件: {self.standard_csv_path}")
+        
+        # Normalize header names so lookups below are case/space tolerant.
+        df.columns = [c.strip().lower().replace(' ', '_') for c in df.columns]
+        
+        # Extract first- and second-level entries (first occurrence wins).
+        for _, row in df.iterrows():
+            first_code = str(row.get('first_code', '')).strip()
+            second_code = str(row.get('second_code', '')).strip()
+            first_name = str(row.get('first_name', '')).strip()
+            second_name = str(row.get('second_name', '')).strip()
+            
+            # Skip rows missing any of the four key fields.
+            if not all([first_code, second_code, first_name, second_name]):
+                continue
+            
+            try:
+                first_seq = int(row.get('first_seq', 0) or 0)
+                second_seq = int(row.get('second_seq', 0) or 0)
+            except:  # NOTE(review): bare except — prefer (TypeError, ValueError)
+                first_seq = 0
+                second_seq = 0
+            
+            # Store first-level info (deduplicated by code).
+            if first_code not in self.first_names:
+                self.first_names[first_code] = first_name
+                self.first_seq[first_code] = first_seq
+            
+            # Store second-level info (deduplicated by code pair).
+            sec_key = (first_code, second_code)
+            if sec_key not in self.second_names:
+                self.second_names[sec_key] = second_name
+                self.second_seq[sec_key] = second_seq
+    
+    def _load_raw_content_csv(self) -> None:
+        """Load the detailed-definition table (best-effort; optional)."""
+        try:
+            encodings = ['utf-8-sig', 'utf-16', 'gbk', 'utf-8']
+            df = None
+            
+            for encoding in encodings:
+                try:
+                    df = pd.read_csv(self.raw_content_csv_path, encoding=encoding, sep=None, engine='python')
+                    break
+                except UnicodeDecodeError:
+                    continue
+            
+            # Silently give up if the file could not be decoded at all.
+            if df is None:
+                return
+            
+            df.columns = [c.strip().lower().replace(' ', '_') for c in df.columns]
+            
+            # Table without the definition column is useless here.
+            if 'second_raw_content' not in df.columns:
+                return
+            
+            for _, row in df.iterrows():
+                first_name = str(row.get('first_name', '')).strip()
+                second_name = str(row.get('second_name', '')).strip()
+                raw_content = str(row.get('second_raw_content', '')).strip()
+                
+                # 'nan' filters out pandas NaN cells after str() conversion.
+                if first_name and second_name and raw_content and raw_content != 'nan':
+                    self.second_raw_content[(first_name, second_name)] = raw_content
+                    
+        except Exception:
+            pass  # Best-effort load; failure must not break matching.
+    
+    def _normalize_text(self, text: str) -> str:
+        """Normalize text: strip whitespace/punctuation and lowercase."""
+        if not text:
+            return ""
+        text = re.sub(r'[\s\n\r\t.,;:!?,。;:!?、""''()()【】\[\]《》<>]', '', text)
+        return text.lower().strip()
+    
+    def _calculate_similarity(self, text1: str, text2: str) -> float:
+        """Return the SequenceMatcher ratio of the two normalized texts."""
+        if not text1 or not text2:
+            return 0.0
+        
+        norm1 = self._normalize_text(text1)
+        norm2 = self._normalize_text(text2)
+        
+        if not norm1 or not norm2:
+            return 0.0
+        
+        return difflib.SequenceMatcher(None, norm1, norm2).ratio()
+    
+    def _extract_keywords(self, text: str) -> List[str]:
+        """Extract keyword characters from text, dropping stopwords.
+
+        NOTE(review): this iterates per *character*, so the multi-character
+        stopwords in the set (e.g. '编制', '施工') can never match a single
+        character and are effectively dead entries; only the one-character
+        stopwords are actually filtered — confirm whether word-level
+        segmentation was intended.
+        """
+        stopwords = {'的', '及', '与', '或', '和', '等', '之', '第', '章', '节', '条',
+                     '编制', '施工', '措施', '要求', '管理', '保证', '质量', '安全',
+                     '技术', '计划', '人员', '组织', '体系', '条件', '概述', '概况'}
+        
+        words = []
+        for word in text:
+            if word not in stopwords and len(word.strip()) > 0:
+                words.append(word)
+        
+        # Fallback: if everything was filtered, use all characters.
+        if not words and text:
+            return list(text)
+        
+        return words
+    
+    def _calculate_enhanced_similarity(
+        self,
+        standard_name: str,
+        actual_title: str,
+        standard_raw_content: str = None
+    ) -> float:
+        """
+        Enhanced similarity score, dominated by the base similarity.
+        
+        Strategy:
+        1. Base similarity (SequenceMatcher) — core; must be >= 0.3 before
+           any bonus is applied.
+        2. Keyword-overlap bonus (up to +0.2).
+        3. Containment bonus (+0.1).
+        4. Detailed-definition bonus (up to +0.2).
+        
+        Rule: if base similarity < 0.3, return the base score as-is so that
+        completely unrelated titles cannot be matched via bonuses. The
+        final score is capped at 1.0.
+        """
+        if not standard_name or not actual_title:
+            return 0.0
+        
+        # 1. Base similarity (core signal).
+        base_similarity = self._calculate_similarity(standard_name, actual_title)
+        
+        # Too dissimilar: skip the bonus stage entirely.
+        if base_similarity < 0.3:
+            return base_similarity
+        
+        # Base similarity qualifies — accumulate bonuses.
+        scores = [base_similarity]
+        
+        norm_standard = self._normalize_text(standard_name)
+        norm_actual = self._normalize_text(actual_title)
+        
+        # 2. Keyword overlap (weight 0.2; lowered from an earlier 0.3 so
+        #    keywords cannot dominate the score).
+        keyword_bonus = 0.0
+        standard_keywords = self._extract_keywords(norm_standard)
+        actual_keywords = self._extract_keywords(norm_actual)
+        
+        if standard_keywords and actual_keywords:
+            # Jaccard overlap of the keyword sets.
+            matched = len(set(standard_keywords) & set(actual_keywords))
+            total = len(set(standard_keywords) | set(actual_keywords))
+            if total > 0:
+                keyword_bonus = (matched / total) * 0.2
+        
+        scores.append(keyword_bonus)
+        
+        # 3. Substring containment (weight 0.1; lowered from earlier).
+        contain_bonus = 0.0
+        if norm_standard in norm_actual or norm_actual in norm_standard:
+            contain_bonus = 0.1
+        scores.append(contain_bonus)
+        
+        # 4. Detailed-definition similarity (weight 0.2; only counted when
+        #    the definition text is clearly related, threshold 0.6).
+        if standard_raw_content and standard_raw_content != 'nan':
+            raw_content_score = self._calculate_similarity(
+                self._normalize_text(standard_raw_content),
+                norm_actual
+            )
+            if raw_content_score > 0.6:
+                scores.append(raw_content_score * 0.2)
+        
+        return min(sum(scores), 1.0)
+    
+    def match_catalogue(
+        self,
+        outline_first: Set[str],
+        outline_secondary: Dict[Tuple[str, str], str],
+        threshold: float = 0.6
+    ) -> Dict[str, Any]:
+        """
+        Run the catalogue match against the loaded standard table.
+        
+        First-level matching is exact (set intersection on codes only);
+        second-level matching tries exact code-pair matches first, then
+        fuzzy title matching on the remainder.
+        
+        Args:
+            outline_first: First-level codes extracted from the outline.
+            outline_secondary: {(first_code, second_code): title} extracted
+                from the outline.
+            threshold: Fuzzy-match acceptance threshold (default 0.6).
+            
+        Returns:
+            Match result dict containing:
+            - matched_first: set of matched first-level codes
+            - matched_second: set of matched second-level key pairs
+            - missing_first: list of missing first-level detail dicts
+            - missing_second: list of missing second-level detail dicts
+            - missing_first_count / missing_second_count: counters
+            - match_details: per-entry match diagnostics
+        """
+        required_first = set(self.first_names.keys())
+        required_second = set(self.second_names.keys())
+        
+        # First level: exact code intersection only (no fuzzy matching).
+        matched_first = outline_first & required_first
+        missing_first = required_first - matched_first
+        
+        # Second level: exact first, fuzzy on what remains.
+        matched_second = set()
+        missing_second = set()
+        match_details = []
+        
+        # Exact (first_code, second_code) matches.
+        outline_second_keys = set(outline_secondary.keys())
+        exact_matches = outline_second_keys & required_second
+        matched_second.update(exact_matches)
+        
+        for key in exact_matches:
+            first_code, second_code = key
+            match_details.append({
+                'level': 'second',
+                'required_first_code': first_code,
+                'required_second_code': second_code,
+                'required_second_name': self.second_names.get(key, ''),
+                'matched': True,
+                'match_type': 'exact',
+                'similarity': 1.0
+            })
+        
+        # Fuzzy matching for entries not matched exactly.
+        required_remaining = required_second - exact_matches
+        outline_remaining = outline_second_keys - exact_matches
+        
+        if required_remaining and outline_remaining:
+            # Flatten the remaining outline entries for scoring.
+            outline_list = []
+            for key in outline_remaining:
+                first_code, second_code = key
+                title = outline_secondary.get(key, "")
+                outline_list.append({
+                    'key': key,
+                    'first_code': first_code,
+                    'second_code': second_code,
+                    'title': title
+                })
+            
+            # Score every remaining required entry against every remaining
+            # outline title and keep the best candidate.
+            for req_key in required_remaining:
+                first_code, second_code = req_key
+                second_name = self.second_names.get(req_key, '')
+                first_name = self.first_names.get(first_code, '')
+                
+                # Optional detailed definition text for bonus scoring.
+                raw_content = self.second_raw_content.get((first_name, second_name))
+                
+                best_match = None
+                best_score = 0.0
+                
+                for item in outline_list:
+                    # Score against the bare name and against the combined
+                    # "first + second" name; take the better of the two.
+                    score1 = self._calculate_enhanced_similarity(second_name, item['title'])
+                    score2 = self._calculate_enhanced_similarity(
+                        f"{first_name}{second_name}",
+                        item['title']
+                    )
+                    score = max(score1, score2)
+                    
+                    # Also try with the detailed definition, if available.
+                    if raw_content:
+                        score3 = self._calculate_enhanced_similarity(
+                            second_name,
+                            item['title'],
+                            raw_content
+                        )
+                        score = max(score, score3)
+                    
+                    if score > best_score:
+                        best_score = score
+                        best_match = item
+                
+                if best_score >= threshold:
+                    matched_second.add(req_key)
+                    match_details.append({
+                        'level': 'second',
+                        'required_first_code': first_code,
+                        'required_second_code': second_code,
+                        'required_second_name': second_name,
+                        'matched': True,
+                        'match_type': 'fuzzy',
+                        'similarity': best_score,
+                        'matched_title': best_match['title'] if best_match else None,
+                        'used_raw_content': raw_content is not None
+                    })
+                else:
+                    missing_second.add(req_key)
+                    match_details.append({
+                        'level': 'second',
+                        'required_first_code': first_code,
+                        'required_second_code': second_code,
+                        'required_second_name': second_name,
+                        'matched': False,
+                        'match_type': 'none',
+                        'similarity': best_score
+                    })
+        else:
+            # Either side empty: every remaining required entry is missing.
+            # NOTE(review): no match_details records are emitted on this
+            # path, unlike the fuzzy branch — confirm that is acceptable.
+            missing_second = required_remaining
+        
+        # Build missing-entry detail lists, ordered by table sequence.
+        missing_first_details = []
+        for code in sorted(missing_first, key=lambda x: self.first_seq.get(x, 0)):
+            missing_first_details.append({
+                'first_code': code,
+                'first_name': self.first_names.get(code, code),
+                'first_seq': self.first_seq.get(code, 0)
+            })
+        
+        missing_second_details = []
+        for key in sorted(missing_second, key=lambda x: (self.first_seq.get(x[0], 0), self.second_seq.get(x, 0))):
+            first_code, second_code = key
+            missing_second_details.append({
+                'first_code': first_code,
+                'first_name': self.first_names.get(first_code, first_code),
+                'first_seq': self.first_seq.get(first_code, 0),
+                'secondary_code': second_code,
+                'secondary_name': self.second_names.get(key, ''),
+                'second_seq': self.second_seq.get(key, 0)
+            })
+        
+        return {
+            'matched_first': matched_first,
+            'matched_second': matched_second,
+            'missing_first': missing_first_details,
+            'missing_second': missing_second_details,
+            'missing_first_count': len(missing_first),
+            'missing_second_count': len(missing_second),
+            'match_details': match_details
+        }

+ 14 - 5
core/construction_review/component/reviewers/completeness_reviewer.py

@@ -355,7 +355,9 @@ JSON输出:"""
         second_name: str = None,
         tertiary_items: List[TertiaryItem] = None,
         outline_title: str = None,
-        timeout: int = 30
+        timeout: int = 30,
+        first_seq: int = 0,
+        second_seq: int = 0
     ) -> Dict[str, str]:
         """
         使用大模型生成建议
@@ -905,7 +907,8 @@ JSON输出:"""
                     level="一级",
                     first_code=first_code,
                     first_name=first_name,
-                    first_seq=first_seq
+                    first_seq=first_seq,
+                    second_seq=0
                 )
 
                 if llm_result and llm_result.get("suggestion"):
@@ -945,7 +948,9 @@ JSON输出:"""
                         first_code=cat1,
                         first_name=first_name,
                         second_code=cat2,
-                        second_name=second_name
+                        second_name=second_name,
+                        first_seq=first_seq,
+                        second_seq=second_seq
                     )
 
                     if llm_result and llm_result.get("suggestion"):
@@ -988,7 +993,9 @@ JSON输出:"""
                     first_name=first_name,
                     second_code=cat2,
                     second_name=second_name,
-                    tertiary_items=missing_t_items
+                    tertiary_items=missing_t_items,
+                    first_seq=first_seq,
+                    second_seq=second_seq
                 )
 
                 if llm_result and llm_result.get("suggestion"):
@@ -1029,7 +1036,9 @@ JSON输出:"""
                     first_code="",
                     first_name=f_name,
                     second_name=sec_title,
-                    outline_title=sec_title
+                    outline_title=sec_title,
+                    first_seq=0,
+                    second_seq=0
                 )
 
                 if llm_result and llm_result.get("suggestion"):

+ 104 - 0
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -344,6 +344,110 @@ class InterTool:
                 logger.info(f"🔍 内容时效性审查结果处理完成,添加 {len(batch_results)} 个问题项")
                 continue
 
+            # 🔧 特殊处理:outline_catalogue_check 的返回格式(目录一二级缺失统计)
+            if check_key == 'outline_catalogue_check' and isinstance(check_result, dict):
+                details = check_result.get('details', {})
+                missing_first = details.get('missing_first', [])
+                missing_second = details.get('missing_second', [])
+                missing_first_count = details.get('missing_first_count', 0)
+                missing_second_count = details.get('missing_second_count', 0)
+                
+                logger.debug(f"🔍 [DEBUG] 处理目录缺失统计结果,缺失一级: {missing_first_count}, 缺失二级: {missing_second_count}")
+                
+                # 如果没有缺失目录,添加一个通过记录
+                if not missing_first and not missing_second:
+                    review_lists.append({
+                        "check_item": "outline_catalogue_check",
+                        "chapter_code": chapter_code,
+                        "check_item_code": f"{chapter_code}_outline_catalogue_check",
+                        "check_result": {
+                            "issue_point": "目录结构完整",
+                            "location": "整篇文档",
+                            "suggestion": "无",
+                            "reason": "一二级目录结构符合标准规范要求",
+                            "risk_level": "low"
+                        },
+                        "exist_issue": False,
+                        "risk_info": {"risk_level": "low"}
+                    })
+                else:
+                    # 有缺失目录时,先生成汇总统计
+                    total_missing = missing_first_count + missing_second_count
+                    
+                    # 构建缺失目录名称列表
+                    missing_first_names = [item.get('first_name', '未知') for item in missing_first if isinstance(item, dict)]
+                    missing_second_names = [f"{item.get('first_name', '')}.{item.get('secondary_name', '未知')}" for item in missing_second if isinstance(item, dict)]
+                    
+                    # 构建建议文本
+                    suggestion_parts = []
+                    if missing_first_names:
+                        suggestion_parts.append(f"一级目录({missing_first_count}个):{', '.join(missing_first_names)}")
+                    if missing_second_names:
+                        suggestion_parts.append(f"二级目录({missing_second_count}个):{', '.join(missing_second_names)}")
+                    suggestion_text = "建议补充以下缺失目录:\n" + "\n".join(suggestion_parts) if suggestion_parts else "无"
+                    
+                    # 添加汇总统计问题(放在最前面)
+                    review_lists.append({
+                        "check_item": "outline_catalogue_check",
+                        "chapter_code": chapter_code,
+                        "check_item_code": f"{chapter_code}_outline_catalogue_check",
+                        "check_result": {
+                            "issue_point": f"目录缺失汇总统计(共缺失 {total_missing} 个目录)",
+                            "location": "整篇文档",
+                            "suggestion": suggestion_text,
+                            "reason": f"根据标准分类表对比,共发现 {total_missing} 个缺失目录:缺失一级 {missing_first_count} 个,缺失二级 {missing_second_count} 个",
+                            "risk_level": "medium",
+                            "summary": {
+                                "missing_first_count": missing_first_count,
+                                "missing_second_count": missing_second_count,
+                                "missing_total": total_missing,
+                                "missing_first_names": missing_first_names,
+                                "missing_second_names": missing_second_names
+                            }
+                        },
+                        "exist_issue": True,
+                        "risk_info": {"risk_level": "medium"}
+                    })
+                    
+                    # 然后逐个添加详细的缺失目录问题
+                    for item in missing_first:
+                        if isinstance(item, dict):
+                            review_lists.append({
+                                "check_item": "outline_catalogue_check",
+                                "chapter_code": chapter_code,
+                                "check_item_code": f"{chapter_code}_outline_catalogue_check",
+                                "check_result": {
+                                    "issue_point": f"缺失一级目录:{item.get('first_name', '未知')}",
+                                    "location": f"{item.get('first_seq', '')}. {item.get('first_name', '')}",
+                                    "suggestion": f"补充一级目录:{item.get('first_name', '')}(编码:{item.get('first_code', '')})",
+                                    "reason": "根据标准分类表,该一级目录应当出现但未找到",
+                                    "risk_level": "medium"
+                                },
+                                "exist_issue": True,
+                                "risk_info": {"risk_level": "medium"}
+                            })
+                    
+                    for item in missing_second:
+                        if isinstance(item, dict):
+                            review_lists.append({
+                                "check_item": "outline_catalogue_check",
+                                "chapter_code": chapter_code,
+                                "check_item_code": f"{chapter_code}_outline_catalogue_check",
+                                "check_result": {
+                                    "issue_point": f"缺失二级目录:{item.get('secondary_name', '未知')}",
+                                    "location": f"{item.get('first_seq', '')}.{item.get('second_seq', '')} {item.get('first_name', '')} > {item.get('secondary_name', '')}",
+                                    "suggestion": f"补充二级目录:{item.get('secondary_name', '')}(编码:{item.get('secondary_code', '')})",
+                                    "reason": f"在'{item.get('first_name', '未知')}'章节下,该二级目录应当出现但未找到",
+                                    "risk_level": "medium"
+                                },
+                                "exist_issue": True,
+                                "risk_info": {"risk_level": "medium"}
+                            })
+                
+                total_missing = missing_first_count + missing_second_count
+                logger.info(f"🔍 目录缺失统计处理完成,缺失一级: {missing_first_count}, 缺失二级: {missing_second_count}, 共生成 {len(review_lists)} 个问题项(含汇总)")
+                continue
+
             # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
             is_dict = isinstance(check_result, dict)
             is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')

+ 11 - 2
core/construction_review/workflows/ai_review_workflow.py

@@ -298,7 +298,8 @@ class AIReviewWorkflow:
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
                 'non_parameter_compliance_check': 'check_non_parameter_compliance',
-                'parameter_compliance_check': 'check_parameter_compliance'
+                'parameter_compliance_check': 'check_parameter_compliance',
+                'outline_catalogue_check': 'check_outline_catalogue'  # 目录一二级缺失检查(模糊匹配)
             }
 
             # 获取审查项配置
@@ -457,10 +458,18 @@ class AIReviewWorkflow:
             # 主流程完成后,串行处理 catalogue(目录审查)
             # 注意:catalogue 是系统强制添加的审查单元,已计入 total_chunks
             logger.info("开始处理目录审查(catalogue)")
+            # 从配置中获取 catalogue 章节的方法列表,默认包含 check_completeness 和 outline_catalogue_check
+            catalogue_funcs = review_item_dict_sorted.get("catalogue", ["check_completeness", "outline_catalogue_check"])
+            # 确保 check_completeness 在列表中(向后兼容)
+            if "check_completeness" not in catalogue_funcs:
+                catalogue_funcs = ["check_completeness"] + catalogue_funcs
+            # 确保 outline_catalogue_check 在列表中(新增目录缺失统计)
+            if "outline_catalogue_check" not in catalogue_funcs:
+                catalogue_funcs.append("outline_catalogue_check")
             chunks_completed, all_issues = await self.core_fun._process_chapter_item(
                 "catalogue",                 # chapter_code
                 catalogue,                   # chapter_content
-                ["check_completeness"],      # func_names
+                catalogue_funcs,             # func_names(从配置获取)
                 state,
                 all_issues,
                 completed_chunks,

+ 47 - 4
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -354,7 +354,18 @@ class AIReviewCoreFun:
         """
       
         # 从ai_review_engine获取对应的方法
-        if not hasattr(self.ai_review_engine, func_name):
+        # 方法名映射:配置名 -> 实际方法名
+        # 用于处理配置标识名与AIReviewEngine方法名不一致的情况
+        method_name_mapping = {
+            'outline_catalogue_check': 'check_outline_catalogue',
+        }
+
+        # 转换方法名(如果存在映射)
+        actual_method_name = method_name_mapping.get(func_name, func_name)
+        original_func_name = func_name
+
+        # 从ai_review_engine获取对应的方法
+        if not hasattr(self.ai_review_engine, actual_method_name):
             logger.warning(f"AIReviewEngine中未找到方法: {func_name}")
             # 返回错误结果的 UnitReviewResult
             return UnitReviewResult(
@@ -367,16 +378,16 @@ class AIReviewCoreFun:
                 is_sse_push=True
             )
 
-        method = getattr(self.ai_review_engine, func_name)
+        method = getattr(self.ai_review_engine, actual_method_name)
 
         # 基础参数
         trace_id = f"{state['callback_task_id']}_{chapter_code}_chunk{chunk_index}"
-        stage_name = f"{chapter_code}_{func_name}"
+        stage_name = f"{chapter_code}_{original_func_name}"
 
         # 获取块内容
         review_content = chunk.get("content", "")
         is_complete_field = chunk.get("is_complete_field", False)
-        logger.debug(f"执行审查: {trace_id} -> {func_name}")
+        logger.debug(f"执行审查: {trace_id} -> {original_func_name}")
 
         # 根据func_name构建对应的参数并调用
         if func_name == "sensitive_word_check" and not is_complete_field:
@@ -571,6 +582,38 @@ class AIReviewCoreFun:
                 is_sse_push=True
             )
 
+        elif original_func_name == "outline_catalogue_check":
+            # 目录一二级缺失检查(模糊匹配)- 针对整个文档的 outline 进行检查
+            outline_data = state.get("structured_content", {})
+            outline_result = await self.ai_review_engine.check_outline_catalogue(
+                trace_id_idx=trace_id,
+                outline_data=outline_data,
+                state=state,
+                stage_name=stage_name
+            )
+            
+            # 计算风险等级:如果有缺失目录则标记为 medium
+            overall_risk = "low"
+            if outline_result.get("success"):
+                missing_count = (
+                    outline_result.get("details", {}).get("missing_first_count", 0) +
+                    outline_result.get("details", {}).get("missing_second_count", 0)
+                )
+                if missing_count > 0:
+                    overall_risk = "medium"
+            else:
+                overall_risk = "error"
+            
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={"outline_catalogue_check": outline_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=overall_risk,
+                is_sse_push=True
+            )
+
         else:
             # 处理 check_completeness 但 is_complete_field=False 的情况
             if func_name == "check_completeness" and not is_complete_field:

+ 2 - 2
views/construction_review/launch_review.py

@@ -190,8 +190,8 @@ def validate_review_item_config(review_item_config: List[str]) -> None:
             invalid_chapter.append(chapter_code)
             continue  # 章节不支持时不继续检查审查项
 
-        # 5. 特殊规则:目录章节只能使用完整性审查
-        if chapter_code == "catalogue" and review_dim != "completeness_check":
+        # 5. 特殊规则:目录章节只能使用完整性审查或目录缺失统计
+        if chapter_code == "catalogue" and review_dim not in ["completeness_check", "outline_catalogue_check"]:
             catalogue_invalid.append(item)
             continue  # 目录章节违反规则时不继续检查