فهرست منبع

fix(sgsc-时效性审查模型-xth): 修复时效性bug
- 修复过滤逻辑过于严格导致漏检
- 放宽过滤条件

suhua31 2 هفته پیش
والد
کامیت
46fc37cf1e

+ 101 - 0
core/construction_review/component/ai_review_engine.py

@@ -1129,6 +1129,107 @@ class AIReviewEngine(BaseReviewer):
                 }
             }
         
+    async def timeliness_content_reviewer(self, review_data: Dict[str, Any], trace_id: str,
+                                state: dict = None, stage_name: str = None) -> Dict[str, Any]:
+        """
+        Run the timeliness review over tertiary-classification content.
+
+        The entries in ``tertiary_classification_details`` are handed to
+        ``ContentTimelinessReviewer`` and its findings are wrapped in the
+        ``timeliness_content_review_results`` envelope.
+
+        Args:
+            review_data: Payload to review. Reads ``tertiary_classification_details``
+                (default ``[]``) and ``max_concurrent`` (default ``4``).
+            trace_id: Trace identifier; only used in the start-of-review log line.
+            state: Optional state dict; ``progress_manager`` and ``callback_task_id``
+                are read from it when it is provided.
+            stage_name: Stage name; not used in this method.
+
+        Returns:
+            ``{"timeliness_content_review_results": {...}}`` holding ``review_results``,
+            ``total_items``, ``issue_items``, ``execution_time`` and ``error_message``.
+            The empty-input result additionally carries a ``message`` key. Failures
+            are reported through ``error_message`` instead of being raised.
+        """
+        start_time = time.time()
+
+        def build_result(results, issue_items=0, error_message=None, **extra):
+            # Single place that shapes the envelope so every exit path agrees.
+            payload = {
+                "review_results": results,
+                "total_items": len(results),
+                "issue_items": issue_items,
+                "execution_time": time.time() - start_time,
+                "error_message": error_message,
+            }
+            payload.update(extra)
+            return {"timeliness_content_review_results": payload}
+
+        try:
+            logger.info(f"开始三级分类内容时效性审查,trace_id: {trace_id}")
+
+            tertiary_details = review_data.get('tertiary_classification_details', [])
+            max_concurrent = review_data.get('max_concurrent', 4)
+
+            if not tertiary_details:
+                logger.warning("三级分类详情为空,将跳过内容时效性审查")
+                return build_result([], message="未找到三级分类详情,跳过内容时效性审查")
+
+            logger.info(f"提取到 {len(tertiary_details)} 个三级分类详情")
+
+            # Progress reporting is optional and only available through state.
+            progress_manager = state.get('progress_manager') if state else None
+            callback_task_id = state.get('callback_task_id') if state else None
+
+            # Imported here, as in the original, rather than at module import time.
+            from core.construction_review.component.reviewers.timeliness_content_reviewer import ContentTimelinessReviewer
+
+            # The engine-level semaphore is held for the duration of the review call.
+            async with self.semaphore:
+                async with ContentTimelinessReviewer(max_concurrent=max_concurrent) as reviewer:
+                    timeliness_content_results = await reviewer.review_tertiary_content(
+                        tertiary_details=tertiary_details,
+                        collection_name="first_bfp_collection_status",
+                        progress_manager=progress_manager,
+                        callback_task_id=callback_task_id
+                    )
+
+            logger.info(f"内容时效性审查完成,发现问题数量: {len(timeliness_content_results)}")
+
+            total_items = len(timeliness_content_results)
+            issue_items = sum(1 for item in timeliness_content_results if item.get('exist_issue', False))
+
+            logger.info(f"审查统计 - 总规范引用: {total_items}, 问题项: {issue_items}")
+
+            return build_result(timeliness_content_results, issue_items=issue_items)
+
+        except Exception as e:
+            # One handler for every failure, always logged with the traceback.
+            error_msg = f"内容时效性审查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+            return build_result([], error_message=error_msg)
+
+
     async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:
         """

+ 34 - 2
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -224,12 +224,44 @@ class BasisReviewService:
                 
                 # 获取match_reference_files的结果并过滤
                 match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
-                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+
+                # 记录完整的匹配结果用于调试
+                logger.info(f"批次 match_reference_files 原始结果: {match_result[:500]}...")
+
+                # 解析JSON并过滤:保留有相关信息的项
                 try:
                     match_data = json.loads(match_result)
                     # 提取items字段(match_reference_files返回{items: [...]}格式)
                     items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
-                    filtered_data = [item for item in items if item.get('exact_match_info') != ""]
+
+                    logger.info(f"解析到 {len(items)} 个匹配项")
+                    for idx, item in enumerate(items):
+                        logger.info(f"  项{idx}: review_item={item.get('review_item', 'unknown')}, "
+                                  f"has_related_file={item.get('has_related_file')}, "
+                                  f"exact_match_info={item.get('exact_match_info')}, "
+                                  f"same_name_current={item.get('same_name_current')}")
+
+                    # 放宽过滤条件:只要有相关文件信息就进行审查
+                    filtered_data = [
+                        item for item in items
+                        if item.get('has_related_file') or
+                           item.get('exact_match_info') or
+                           item.get('same_name_current')
+                    ]
+
+                    logger.info(f"过滤后保留 {len(filtered_data)} 个项")
+
+                    # 记录被过滤掉的项目用于调试
+                    skipped_items = [
+                        item for item in items
+                        if not (item.get('has_related_file') or
+                               item.get('exact_match_info') or
+                               item.get('same_name_current'))
+                    ]
+                    if skipped_items:
+                        logger.warning(f"跳过了 {len(skipped_items)} 个无参考信息的编制依据: "
+                                     f"{[item.get('review_item', 'unknown') for item in skipped_items]}")
+
                     # 如果没有过滤出数据,直接返回空结果
                     if not filtered_data:
                         logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")

+ 487 - 0
core/construction_review/component/reviewers/timeliness_content_reviewer.py

@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+三级分类内容时效性审查模块
+
+功能:从三级分类详情的content字段中提取规范引用,并进行时效性审查。
+主要用于检测文本内容中引用的规范是否过时(如JTG B01-2011应更新为JTG B01-2014)。
+"""
+
+import re
+import json
+import asyncio
+from typing import Any, Dict, List, Optional, Tuple
+from dataclasses import dataclass, field
+from functools import partial
+
+from foundation.observability.logger.loggering import review_logger as logger
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
+from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
+from core.construction_review.component.reviewers.timeliness_basis_reviewer import BasisSearchEngine, StandardizedResponseProcessor
+
+
+@dataclass
+class StandardReference:
+    """A single standard citation extracted from document content."""
+    original_text: str           # Citation text, e.g. "《公路工程技术标准》(JTG B01-2011)"
+    name: str                    # Standard title, e.g. "公路工程技术标准"
+    number: str                  # Standard number, e.g. "JTG B01-2011"
+    context: str                 # Text surrounding the citation
+    location_info: Dict[str, Any] = field(default_factory=dict)  # Where the citation was found
+
+
+@dataclass
+class ContentTimelinessResult:
+    """Outcome of the timeliness review for one standard citation."""
+    reference: StandardReference
+    has_issue: bool
+    issue_type: str              # Kind of issue found
+    suggestion: str
+    reason: str
+    risk_level: str              # "无风险" (no risk) / "高风险" (high risk)
+
+
+class StandardExtractor:
+    """Extracts standard citations from free text.
+
+    Two citation shapes are recognised: the full ``《title》(number)`` form and a
+    bare standard number. For a bare number, a title is looked up in the text
+    just before it.
+    """
+
+    # Regex fragments for standard numbers such as GB 50010-2010, JTG B01-2014
+    # or GB/T 50502-2020.
+    STANDARD_NUMBER_PATTERNS = [
+        # Chinese national standards: GB 50010-2010, GB/T 50502-2020
+        r'GB(?:/T)?\s*\d{4,5}(?:\.\d+)?\s*-\s*\d{4}',
+        # Chinese industry standards: JTG B01-2014, JTG D60-2015, JTG/T 3650-2020
+        r'[A-Z]{2,3}(?:/T)?\s*[A-Z]?\s*\d{2,4}(?:\.\d+)?\s*-\s*\d{4}',
+        # Local standards: DB11/T 1234-2020
+        r'DB\d{2}(?:/T)?\s*\d{4,5}\s*-\s*\d{4}',
+        # Association standards: T/CECS 123-2020
+        r'T/\w+\s*\d{3,5}\s*-\s*\d{4}',
+    ]
+
+    # A title in 《》 followed by a bracketed number.
+    STANDARD_FULL_PATTERN = re.compile(
+        r'《([^《》]+)》\s*[((]([^))]+)[))]',
+        re.MULTILINE
+    )
+
+    # Any bare standard number.
+    STANDARD_NUMBER_ONLY_PATTERN = re.compile(
+        r'(' + '|'.join(STANDARD_NUMBER_PATTERNS) + r')',
+        re.MULTILINE | re.IGNORECASE
+    )
+
+    def __init__(self):
+        # Extraction results keyed by the content text itself, so two different
+        # texts can never share an entry through a hash collision.
+        self.extracted_cache: Dict[str, List[StandardReference]] = {}
+
+    def extract_from_content(self, content: str, location_info: Optional[Dict] = None) -> List[StandardReference]:
+        """
+        Extract standard citations from a piece of content.
+
+        Args:
+            content: Text to scan (may contain line markers such as ``<80>``).
+            location_info: Location details stored on every returned reference;
+                an empty dict is used when omitted.
+
+        Returns:
+            List[StandardReference]: Citations in order of discovery, de-duplicated
+            by ``original_text``. Empty when ``content`` is empty.
+        """
+        if not content:
+            return []
+
+        cached = self.extracted_cache.get(content)
+        if cached is not None:
+            # The same text can be extracted for several locations, so cached
+            # hits are rebuilt with this call's location instead of being shared.
+            return [
+                StandardReference(
+                    original_text=ref.original_text,
+                    name=ref.name,
+                    number=ref.number,
+                    context=ref.context,
+                    location_info=location_info or {}
+                )
+                for ref in cached
+            ]
+
+        references = []
+
+        # 1. Full form: 《title》(number)
+        for name, number in self.STANDARD_FULL_PATTERN.findall(content):
+            # Skip bracketed text that does not look like a standard number.
+            if not self._is_valid_standard_number(number):
+                continue
+            original = f"《{name}》({number})"
+            # The rebuilt citation can differ from the source text in bracket
+            # style or spacing; fall back to locating the number alone.
+            context = (self._extract_context(content, original)
+                       or self._extract_context(content, number))
+            references.append(StandardReference(
+                original_text=original,
+                name=name.strip(),
+                number=number.strip(),
+                context=context,
+                location_info=location_info or {}
+            ))
+
+        # 2. Bare standard numbers not already covered by a full-form citation
+        for match in self.STANDARD_NUMBER_ONLY_PATTERN.findall(content):
+            number = match if isinstance(match, str) else match[0]
+            if any(number in ref.number for ref in references):
+                continue
+            # Try to borrow a title from the text preceding the number.
+            name = self._infer_name_from_context(content, number)
+            original = f"《{name}》({number})" if name else number
+            references.append(StandardReference(
+                original_text=original,
+                name=name or "",
+                number=number.strip(),
+                context=self._extract_context(content, number),
+                location_info=location_info or {}
+            ))
+
+        # De-duplicate on original_text, keeping the first occurrence.
+        seen = set()
+        unique_refs = []
+        for ref in references:
+            if ref.original_text not in seen:
+                seen.add(ref.original_text)
+                unique_refs.append(ref)
+
+        self.extracted_cache[content] = unique_refs
+        # Hand out a copy so callers cannot mutate the cached list.
+        return list(unique_refs)
+
+    def _is_valid_standard_number(self, number: str) -> bool:
+        """Return True when ``number`` starts with a recognised standard number.
+
+        ``re.match`` anchors only at the start, so text trailing a valid number
+        is accepted.
+        """
+        candidate = number.strip().upper()
+        return any(
+            re.match(pattern, candidate, re.IGNORECASE)
+            for pattern in self.STANDARD_NUMBER_PATTERNS
+        )
+
+    def _extract_context(self, content: str, target: str, window: int = 50) -> str:
+        """Return ``target`` plus up to ``window`` characters on each side.
+
+        Only the first occurrence is used; an empty string is returned when
+        ``target`` does not occur in ``content``.
+        """
+        idx = content.find(target)
+        if idx == -1:
+            return ""
+        start = max(0, idx - window)
+        end = min(len(content), idx + len(target) + window)
+        return content[start:end].strip()
+
+    def _infer_name_from_context(self, content: str, number: str) -> str:
+        """Return the title that precedes ``number``, or "" when none is found.
+
+        A title is 3-50 characters inside 《》, separated from the number by at
+        most 30 characters that are not 《 or 》.
+        """
+        pattern = re.compile(r'《([^《》]{3,50})》[^《》]{0,30}' + re.escape(number))
+        match = pattern.search(content)
+        if match:
+            return match.group(1)
+        return ""
+
+
+class ContentTimelinessReviewer:
+    """Timeliness reviewer for standards cited in tertiary-classification content."""
+
+    def __init__(self, max_concurrent: int = 4):
+        """
+        Args:
+            max_concurrent: Size of the semaphore guarding each batch.
+        """
+        self.extractor = StandardExtractor()
+        self.search_engine = BasisSearchEngine()
+        self.response_processor = StandardizedResponseProcessor()
+        self.max_concurrent = max_concurrent
+        self._semaphore = None
+
+    def _get_semaphore(self) -> asyncio.Semaphore:
+        """Return the batch semaphore, creating it on first use."""
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrent)
+        return self._semaphore
+
+    @staticmethod
+    def _has_reference_info(item: Dict[str, Any]) -> bool:
+        """Return True when a match item carries any usable reference information."""
+        return bool(
+            item.get('has_related_file') or
+            item.get('exact_match_info') or
+            item.get('same_name_current')
+        )
+
+    async def __aenter__(self):
+        """Async context manager entry; makes sure the semaphore exists."""
+        self._get_semaphore()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """Async context manager exit; exceptions are never suppressed."""
+        return False
+
+    async def review_tertiary_content(
+        self,
+        tertiary_details: List[Dict[str, Any]],
+        collection_name: str = "first_bfp_collection_status",
+        progress_manager=None,
+        callback_task_id: str = None
+    ) -> List[Dict[str, Any]]:
+        """
+        Review the timeliness of standards cited in tertiary-classification content.
+
+        Args:
+            tertiary_details: Tertiary-classification entries; the ``content`` field
+                of each entry is scanned for citations.
+            collection_name: Milvus collection name passed to the search engine.
+            progress_manager: Optional progress manager used for SSE pushes.
+            callback_task_id: Optional callback task id; pushes happen only when both
+                it and ``progress_manager`` are given.
+
+        Returns:
+            List[Dict]: Standardised review results. A batch that fails contributes
+            one error entry instead of raising.
+        """
+        if not tertiary_details:
+            return []
+
+        # 1. Collect citations from every entry. A citation that appears in several
+        #    entries is reviewed once; all of its locations are kept for the report.
+        all_references = []
+        reference_to_location = {}  # original_text -> list of location dicts
+
+        for detail in tertiary_details:
+            content = detail.get("content", "")
+            if not content:
+                continue
+
+            location_info = {
+                "third_category_name": detail.get("third_category_name", ""),
+                "third_category_code": detail.get("third_category_code", ""),
+                "start_line": detail.get("start_line", 0),
+                "end_line": detail.get("end_line", 0),
+            }
+
+            for ref in self.extractor.extract_from_content(content, location_info):
+                locations = reference_to_location.get(ref.original_text)
+                if locations is None:
+                    locations = reference_to_location[ref.original_text] = []
+                    all_references.append(ref)
+                locations.append(location_info)
+
+        if not all_references:
+            logger.info("未从三级分类内容中提取到规范引用")
+            return []
+
+        logger.info(f"从三级分类内容中提取到 {len(all_references)} 个规范引用")
+
+        # 2. Review the citations in batches of three.
+        all_issues = []
+        batch_size = 3
+        total_batches = (len(all_references) + batch_size - 1) // batch_size
+        # Created lazily so the method also works without ``async with``.
+        semaphore = self._get_semaphore()
+
+        for i in range(0, len(all_references), batch_size):
+            batch_refs = all_references[i:i + batch_size]
+            batch_texts = [ref.original_text for ref in batch_refs]
+            batch_num = i // batch_size + 1
+
+            try:
+                async with semaphore:
+                    # Look up reference standards for every citation in the batch.
+                    search_tasks = []
+                    for ref in batch_refs:
+                        task = asyncio.create_task(
+                            self._async_search_standard(ref.number, collection_name)
+                        )
+                        search_tasks.append(task)
+
+                    search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+
+                    # One candidate-text list per citation, in batch order.
+                    grouped_candidates = []
+                    for j, result in enumerate(search_results):
+                        if isinstance(result, Exception):
+                            logger.error(f"搜索失败 '{batch_refs[j].original_text}': {result}")
+                            grouped_candidates.append([])
+                        else:
+                            texts = [item.get("text_content", "") for item in result if item]
+                            grouped_candidates.append(texts)
+
+                    # Match the citations against the retrieved reference texts.
+                    match_result = await match_reference_files(
+                        reference_text=grouped_candidates,
+                        review_text=batch_texts
+                    )
+
+                    # Log the raw match output (truncated) for debugging.
+                    logger.info(f"批次{batch_num} match_reference_files 原始结果: {match_result[:500]}...")
+
+                    # Keep items that have a related file, exact-match info or a
+                    # current same-name standard; everything else is skipped.
+                    try:
+                        match_data = json.loads(match_result)
+                        items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+
+                        logger.info(f"批次{batch_num} 解析到 {len(items)} 个匹配项")
+                        for idx, item in enumerate(items):
+                            logger.info(f"  项{idx}: review_item={item.get('review_item', 'unknown')}, "
+                                      f"has_related_file={item.get('has_related_file')}, "
+                                      f"exact_match_info={item.get('exact_match_info')}, "
+                                      f"same_name_current={item.get('same_name_current')}")
+
+                        # Split once so the predicate is evaluated a single time per item.
+                        filtered_data = []
+                        skipped_items = []
+                        for item in items:
+                            if self._has_reference_info(item):
+                                filtered_data.append(item)
+                            else:
+                                skipped_items.append(item)
+
+                        logger.info(f"批次{batch_num} 过滤后保留 {len(filtered_data)} 个项")
+
+                        if skipped_items:
+                            logger.warning(f"批次{batch_num} 跳过了 {len(skipped_items)} 个无参考信息的项: "
+                                         f"{[item.get('review_item', 'unknown') for item in skipped_items]}")
+
+                        if not filtered_data:
+                            logger.info(f"批次{batch_num}: 没有符合审查条件的规范引用")
+                            continue
+
+                        # Re-serialise in the same shape that was received.
+                        if isinstance(match_data, dict) and 'items' in match_data:
+                            match_result = json.dumps({"items": filtered_data}, ensure_ascii=False)
+                        else:
+                            match_result = json.dumps(filtered_data, ensure_ascii=False)
+
+                        # Decide whether each remaining citation has a timeliness issue.
+                        llm_out = await determine_timeliness_issue(match_result)
+
+                        standardized_result = self.response_processor.process_llm_response(
+                            llm_out,
+                            "content_timeliness_check",
+                            "content",
+                            "content_timeliness_check"
+                        )
+
+                        # 3. Attach location details to results whose location text
+                        #    matches an extracted citation.
+                        for item in standardized_result:
+                            review_item = item.get("check_result", {}).get("location", "")
+                            if review_item in reference_to_location:
+                                locations = reference_to_location[review_item]
+                                item["location_info"] = locations
+                                contexts = []
+                                for loc in locations:
+                                    ctx = f"[{loc.get('third_category_name', '')}] 第{loc.get('start_line', 0)}-{loc.get('end_line', 0)}行"
+                                    contexts.append(ctx)
+                                item["content_context"] = "; ".join(contexts)
+
+                                # Replace the location with a more detailed description.
+                                if contexts:
+                                    item["check_result"]["location"] = f"{review_item}(出现在:{item['content_context']})"
+
+                        all_issues.extend(standardized_result)
+
+                        # SSE push; a failed push is logged and does not fail the batch.
+                        if progress_manager and callback_task_id:
+                            try:
+                                await progress_manager.update_stage_progress(
+                                    callback_task_id=callback_task_id,
+                                    stage_name=f"内容时效性审查-批次{batch_num}",
+                                    status="processing",
+                                    message=f"完成第{batch_num}/{total_batches}批次内容时效性审查,{len(batch_refs)}项",
+                                    overall_task_status="processing",
+                                    event_type="processing",
+                                    issues=standardized_result
+                                )
+                            except Exception as e:
+                                logger.error(f"SSE推送失败: {e}")
+
+                    except (json.JSONDecodeError, TypeError) as e:
+                        logger.warning(f"处理匹配结果时出错: {e}")
+                        continue
+
+            except Exception as e:
+                # Any other failure is surfaced as a result entry for this batch.
+                logger.error(f"批次 {batch_num} 处理失败: {e}")
+                error_result = {
+                    "check_item": "content_timeliness_check",
+                    "chapter_code": "content",
+                    "check_item_code": "content_timeliness_check",
+                    "check_result": {"error": str(e), "batch_num": batch_num},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "medium"}
+                }
+                all_issues.append(error_result)
+
+        issue_count = sum(1 for item in all_issues if item.get("exist_issue", False))
+        logger.info(f"内容时效性审查完成:总计 {len(all_references)} 项引用,发现问题 {issue_count} 项")
+
+        return all_issues
+
+    async def _async_search_standard(
+        self,
+        standard_number: str,
+        collection_name: str,
+        top_k: int = 3
+    ) -> List[dict]:
+        """Search for one standard number without blocking the event loop.
+
+        The search engine's ``hybrid_search`` is run in the default executor.
+        Failures are logged and reported as an empty list.
+        """
+        try:
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=standard_number,
+                top_k=top_k,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
+            )
+            retrieved = await loop.run_in_executor(None, func)
+            logger.debug(f"搜索 '{standard_number}' -> 找到 {len(retrieved or [])} 个结果")
+            return retrieved or []
+        except Exception as e:
+            logger.error(f"搜索失败 '{standard_number}': {e}")
+            return []
+
+
+# ===== 便捷函数 =====
+
+async def review_tertiary_content_timeliness(
+    tertiary_details: List[Dict[str, Any]],
+    collection_name: str = "first_bfp_collection_status",
+    max_concurrent: int = 4,
+    progress_manager=None,
+    callback_task_id: str = None
+) -> List[Dict[str, Any]]:
+    """
+    One-call wrapper around ``ContentTimelinessReviewer.review_tertiary_content``.
+
+    Args:
+        tertiary_details: Tertiary-classification entries to review.
+        collection_name: Milvus collection name.
+        max_concurrent: Concurrency limit given to the reviewer.
+        progress_manager: Optional progress manager.
+        callback_task_id: Optional callback task id.
+
+    Returns:
+        List[Dict]: Standardised review results.
+    """
+    review_kwargs = {
+        "tertiary_details": tertiary_details,
+        "collection_name": collection_name,
+        "progress_manager": progress_manager,
+        "callback_task_id": callback_task_id,
+    }
+    async with ContentTimelinessReviewer(max_concurrent=max_concurrent) as active_reviewer:
+        findings = await active_reviewer.review_tertiary_content(**review_kwargs)
+        return findings
+
+
+# ===== 测试代码 =====
+if __name__ == "__main__":
+    # Sample input: two tertiary-classification entries that cite standards.
+    test_tertiary_details = [
+        {
+            "third_category_name": "国家方针、政策、标准和设计文件",
+            "third_category_code": "NationalPoliciesStandardsAndDesignDocument",
+            "start_line": 80,
+            "end_line": 82,
+            "content": (
+                "<80> 国家方针、政策、标准和设计文件\n"
+                "<81> 《公路工程技术标准》(JTG B01-2011)\n"
+                "<82> 《公路桥涵设计通用规范》(JTG D60-2015)"
+            ),
+        },
+        {
+            "third_category_name": "施工技术标准",
+            "third_category_code": "ConstructionTechnicalStandards",
+            "start_line": 100,
+            "end_line": 102,
+            "content": (
+                "<100> 施工技术标准\n"
+                "<101> 《公路桥涵施工技术规范》(JTG/T 3650-2020)\n"
+                "<102> 《混凝土结构设计规范》(GB 50010-2010)"
+            ),
+        },
+    ]
+
+    print(f"测试 {len(test_tertiary_details)} 个三级分类内容...")
+
+    # Run the extractor over each sample entry and list what it finds.
+    demo_extractor = StandardExtractor()
+    for entry in test_tertiary_details:
+        found = demo_extractor.extract_from_content(entry["content"])
+        print(f"\n从 '{entry['third_category_name']}' 提取到 {len(found)} 个规范引用:")
+        for citation in found:
+            print(f"  - {citation.original_text}")
+
+    # Full review flow (requires a Milvus connection), left disabled:
+    # result = asyncio.run(review_tertiary_content_timeliness(test_tertiary_details))
+    # print("\n审查结果:")
+    # print(json.dumps(result, ensure_ascii=False, indent=2))

+ 22 - 1
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -298,7 +298,7 @@ class InterTool:
                 reference_data = check_result.get('reference_basis_review_results', {})
                 batch_results = reference_data.get('review_results', [])
                 logger.debug(f"🔍 [DEBUG] 处理规范性审查结果,批次数: {len(batch_results)}")
-                
+
                 for batch in batch_results:
                     if isinstance(batch, list):
                         for item in batch:
@@ -323,6 +323,27 @@ class InterTool:
                 logger.info(f"🔍 规范性审查结果处理完成,添加 {len(review_lists)} 个问题项")
                 continue
 
+            # 🔧 特殊处理:timeliness_content_reviewer 的返回格式
+            if check_key == 'timeliness_content_reviewer' and isinstance(check_result, dict):
+                content_timeliness_data = check_result.get('timeliness_content_review_results', {})
+                batch_results = content_timeliness_data.get('review_results', [])
+                logger.debug(f"🔍 [DEBUG] 处理内容时效性审查结果,问题数: {len(batch_results)}")
+
+                for item in batch_results:
+                    if isinstance(item, dict):
+                        review_lists.append({
+                            "check_item": item.get('check_item', 'content_timeliness_check'),
+                            "chapter_code": item.get('chapter_code', chapter_code),
+                            "check_item_code": item.get('check_item_code', f"{chapter_code}_content_timeliness_check"),
+                            "check_result": item.get('check_result', item),
+                            "exist_issue": item.get('exist_issue', False),
+                            "risk_info": item.get('risk_info', {"risk_level": "low"}),
+                            "location_info": item.get('location_info', []),
+                            "content_context": item.get('content_context', '')
+                        })
+                logger.info(f"🔍 内容时效性审查结果处理完成,添加 {len(batch_results)} 个问题项")
+                continue
+
             # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
             is_dict = isinstance(check_result, dict)
             is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')

+ 1 - 0
core/construction_review/workflows/ai_review_workflow.py

@@ -293,6 +293,7 @@ class AIReviewWorkflow:
                 'semantic_logic_check': 'check_semantic_logic',
                 'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_basis_reviewer',
+                'timeliness_content_check': 'timeliness_content_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
                 'non_parameter_compliance_check': 'check_non_parameter_compliance',

+ 25 - 0
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -546,6 +546,31 @@ class AIReviewCoreFun:
                 is_sse_push=True
             )
 
+        # timeliness_content_reviewer:三级分类内容时效性审查(逐块处理)
+        elif func_name == "timeliness_content_reviewer" and not is_complete_field:
+            # 从chunk中获取三级分类详情
+            tertiary_details = chunk.get("tertiary_classification_details", [])
+            review_data = {
+                "tertiary_classification_details": tertiary_details,  # 三级分类详情
+                "max_concurrent": 4
+            }
+            raw_result = await method(
+                review_data=review_data,
+                trace_id=trace_id,
+                state=state,
+                stage_name=stage_name
+            )
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
+            )
+
         else:
             # 处理 check_completeness 但 is_complete_field=False 的情况
             if func_name == "check_completeness" and not is_complete_field:

تفاوت فایلی نمایش داده نمی‌شود زیرا این فایل بسیار بزرگ است
+ 145 - 0
problem.json


+ 141 - 0
test_content_timeliness.py

@@ -0,0 +1,141 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+测试内容时效性审查是否正确处理 JTG B01-2011 的情况
+"""
+
+import json
+import asyncio
+from core.construction_review.component.reviewers.timeliness_content_reviewer import (
+    StandardExtractor, ContentTimelinessReviewer
+)
+
+# Test data: mirrors the case recorded in problem.json.
+# One tertiary-classification entry whose content cites two standards
+# (JTG B01-2011 and JTG D60-2015).
+# NOTE(review): the "<80>".."<82>" prefixes look like line-number markers that
+# line up with start_line/end_line — confirm against the producer of this data.
+test_tertiary_details = [
+    {
+        "third_category_name": "国家方针、政策、标准和设计文件",
+        "third_category_code": "NationalPoliciesStandardsAndDesignDocument",
+        "start_line": 80,
+        "end_line": 82,
+        "content": """<80> 国家方针、政策、标准和设计文件
+<81> 《公路工程技术标准》(JTG B01-2011)
+<82> 《公路桥涵设计通用规范》(JTG D60-2015)"""
+    }
+]
+
+# 测试提取器
+def test_extractor():
+    print("=" * 60)
+    print("测试规范提取器")
+    print("=" * 60)
+
+    extractor = StandardExtractor()
+
+    for detail in test_tertiary_details:
+        refs = extractor.extract_from_content(detail["content"])
+        print(f"\n从 '{detail['third_category_name']}' 提取到 {len(refs)} 个规范引用:")
+        for ref in refs:
+            print(f"  - 原始文本: {ref.original_text}")
+            print(f"    名称: {ref.name}")
+            print(f"    编号: {ref.number}")
+            print(f"    上下文: {ref.context[:100]}...")
+
+    return refs
+
+# 测试过滤逻辑
+def test_filter_logic():
+    print("\n" + "=" * 60)
+    print("测试过滤逻辑")
+    print("=" * 60)
+
+    # 模拟 match_reference_files 返回的数据
+    mock_match_result = [
+        {
+            "review_item": "《公路工程技术标准》(JTG B01-2011)",
+            "has_related_file": True,
+            "has_exact_match": False,
+            "exact_match_info": "",
+            "same_name_current": "《公路工程技术标准》(JTG B01-2014)状态为现行"
+        },
+        {
+            "review_item": "《公路桥涵设计通用规范》(JTG D60-2015)",
+            "has_related_file": True,
+            "has_exact_match": True,
+            "exact_match_info": "《公路桥涵设计通用规范》(JTG D60-2015)状态为现行",
+            "same_name_current": ""
+        }
+    ]
+
+    print("\n模拟 match_reference_files 返回数据:")
+    for idx, item in enumerate(mock_match_result):
+        print(f"\n  项{idx}:")
+        print(f"    review_item: {item['review_item']}")
+        print(f"    has_related_file: {item['has_related_file']}")
+        print(f"    has_exact_match: {item['has_exact_match']}")
+        print(f"    exact_match_info: {item['exact_match_info']}")
+        print(f"    same_name_current: {item['same_name_current']}")
+
+    # 测试旧过滤逻辑(只保留 exact_match_info 不为空的)
+    old_filtered = [item for item in mock_match_result if item.get('exact_match_info')]
+    print(f"\n旧过滤逻辑(只保留 exact_match_info 不为空的): {len(old_filtered)} 个项")
+    for item in old_filtered:
+        print(f"  - {item['review_item']}")
+
+    # 测试新过滤逻辑(保留有相关信息的)
+    new_filtered = [
+        item for item in mock_match_result
+        if item.get('has_related_file') or
+           item.get('exact_match_info') or
+           item.get('same_name_current')
+    ]
+    print(f"\n新过滤逻辑(保留有相关信息的): {len(new_filtered)} 个项")
+    for item in new_filtered:
+        print(f"  - {item['review_item']}")
+
+    # 分析差异
+    missed = [item for item in mock_match_result if item not in old_filtered]
+    if missed:
+        print(f"\n[警告] 旧逻辑漏检的项:")
+        for item in missed:
+            print(f"  - {item['review_item']}")
+            print(f"    has_related_file: {item['has_related_file']}")
+            print(f"    same_name_current: {item['same_name_current']}")
+
+# 完整测试
+async def test_full_review():
+    print("\n" + "=" * 60)
+    print("完整审查测试(需要 Milvus 连接)")
+    print("=" * 60)
+
+    try:
+        async with ContentTimelinessReviewer(max_concurrent=4) as reviewer:
+            results = await reviewer.review_tertiary_content(
+                tertiary_details=test_tertiary_details,
+                collection_name="first_bfp_collection_status"
+            )
+
+            print(f"\n审查完成,共 {len(results)} 个结果:")
+            for idx, result in enumerate(results):
+                print(f"\n  结果{idx}:")
+                print(f"    check_item: {result.get('check_item')}")
+                print(f"    exist_issue: {result.get('exist_issue')}")
+                print(f"    risk_info: {result.get('risk_info')}")
+                check_result = result.get('check_result', {})
+                print(f"    issue_point: {check_result.get('issue_point')}")
+                print(f"    suggestion: {check_result.get('suggestion')}")
+                print(f"    reason: {check_result.get('reason')}")
+
+    except Exception as e:
+        print(f"测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    # Run the extractor check
+    refs = test_extractor()
+
+    # Run the filter-logic comparison
+    test_filter_logic()
+
+    # End-to-end review (optional); the banner printed by test_full_review
+    # says it needs a Milvus connection, so it is left disabled here.
+    # asyncio.run(test_full_review())

+ 2 - 4
views/construction_review/file_upload.py

@@ -4,19 +4,17 @@
 """
 import ast
 import traceback
-import uuid
 import time
 from datetime import datetime
 
-from pydantic import BaseModel, Field
-from typing import Optional,List
+from pydantic import BaseModel
+from typing import List
 from foundation.utils import md5
 from foundation.infrastructure.config import config_handler
 from .schemas.error_schemas import FileUploadErrors
 from core.base.workflow_manager import WorkflowManager
 from foundation.observability.logger.loggering import review_logger as logger
 from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
-from core.base.redis_duplicate_checker import RedisDuplicateChecker
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 
 

+ 1 - 6
views/construction_review/review_results.py

@@ -3,14 +3,9 @@
 模拟风险统计、总结报告和问题条文返回
 """
 
-import random
-import os
-import json
-from datetime import datetime
 from fastapi import APIRouter, HTTPException, Query
 from pydantic import BaseModel
-from typing import Optional, Dict, Any
-from .schemas.error_schemas import ReviewResultsErrors
+from typing import Dict, Any
 from foundation.observability.cachefiles import cache, CacheBaseDir
 
 

+ 0 - 1
views/construction_review/task_control.py

@@ -3,7 +3,6 @@
 提供任务终止、查询等控制功能
 """
 
-import asyncio
 from typing import List, Optional, Dict, Any
 from pydantic import BaseModel, Field
 from fastapi import APIRouter, HTTPException, Query

+ 0 - 1
views/construction_write/content_completion.py

@@ -6,7 +6,6 @@ API URL: https://dashscope.aliyuncs.com/compatible-mode/v1
 模型:qwen3-30b-a3b-instruct-2507
 """
 
-import os
 import uuid
 import json
 import time

+ 0 - 2
views/construction_write/outline_views.py

@@ -10,7 +10,6 @@
 - POST /sgbx/context_generate: SSE 流式上下文生成 (新增)
 """
 
-import os
 import uuid
 import json
 import time
@@ -26,7 +25,6 @@ from foundation.infrastructure.config.config import config_handler
 from core.base.workflow_manager import WorkflowManager
 from core.base.sse_manager import unified_sse_manager
 from core.base.progress_manager import ProgressManager
-from redis import asyncio as redis_async  # 新增这行
 from redis.asyncio import Redis as AsyncRedis
 
 # 创建路由

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است