Просмотр исходного кода

dev:debug
审查时效性规范性

suhua31 3 недель назад
Родитель
Сommit
045df6a7ee

+ 537 - 0
core/construction_review/component/reviewers/reference_basis_reviewer.py.bak

@@ -0,0 +1,537 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+import yaml
+from typing import Any, Dict, List, Optional
+from functools import partial
+
+from langchain_milvus import Milvus, BM25BuiltInFunction
+from foundation.infrastructure.config.config import config_handler
+from foundation.ai.models.model_handler import model_handler as mh
+from core.construction_review.component.reviewers.utils.directory_extraction import BasisItem, BasisItems
+from core.construction_review.component.reviewers.utils.inter_tool import InterTool
+from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
+from core.construction_review.component.reviewers.utils.punctuation_checker import check_punctuation
+from core.construction_review.component.reviewers.utils.punctuation_result_processor import process_punctuation_results
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
+from foundation.observability.logger.loggering import review_logger as logger
+from langchain_core.prompts import ChatPromptTemplate
+from foundation.ai.agent.generate.model_generate import generate_model_client
+
+class BasisSearchEngine:
+    """编制依据向量搜索引擎"""
+
+    # 类级别的缓存,避免重复创建 Milvus 实例
+    _vectorstore_cache = {}
+
+    def __init__(self):
+        self.emdmodel = None
+        self.host = None
+        self.port = None
+        self.user = None
+        self.password = None
+        self._initialize()
+
+    def _initialize(self):
+        """初始化搜索引擎"""
+        try:
+            # 连接配置
+            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+            self.user = config_handler.get('milvus', 'MILVUS_USER')
+            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
+
+            # 初始化嵌入模型
+            self.emdmodel = mh._get_lq_qwen3_8b_emd()
+            logger.info("嵌入模型初始化成功")
+
+        except Exception as e:
+            logger.error(f" BasisSearchEngine 初始化失败: {e}")
+
+    def _get_vectorstore(self, collection_name: str):
+        """获取或创建 Milvus vectorstore 实例(使用缓存)"""
+        cache_key = f"{self.host}:{self.port}:{collection_name}"
+
+        if cache_key not in BasisSearchEngine._vectorstore_cache:
+            connection_args = {
+                "uri": f"http://{self.host}:{self.port}",
+                "user": self.user,
+                "db_name": "lq_db"
+            }
+            if self.password:
+                connection_args["password"] = self.password
+
+            # 抑制 AsyncMilvusClient 的警告日志
+            import logging
+            original_level = logging.getLogger('pymilvus').level
+            logging.getLogger('pymilvus').setLevel(logging.ERROR)
+
+            try:
+                vectorstore = Milvus(
+                    embedding_function=self.emdmodel,
+                    collection_name=collection_name,
+                    connection_args=connection_args,
+                    consistency_level="Strong",
+                    builtin_function=BM25BuiltInFunction(),
+                    vector_field=["dense", "sparse"]
+                )
+                BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
+                logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
+            finally:
+                logging.getLogger('pymilvus').setLevel(original_level)
+
+        return BasisSearchEngine._vectorstore_cache[cache_key]
+
+    def hybrid_search(self, collection_name: str, query_text: str,
+                     top_k: int = 3, ranker_type: str = "weighted",
+                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
+        try:
+            # 使用缓存的 vectorstore
+            vectorstore = self._get_vectorstore(collection_name)
+
+            # 执行混合搜索
+            if ranker_type == "weighted":
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="weighted",
+                    ranker_params={"weights": [dense_weight, sparse_weight]}
+                )
+            else:  # rrf
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="rrf",
+                    ranker_params={"k": 60}
+                )
+
+            # 格式化结果,保持与其他搜索方法一致
+            formatted_results = []
+            for doc in results:
+                formatted_results.append({
+                    'id': doc.metadata.get('pk', 0),
+                    'text_content': doc.page_content,
+                    'metadata': doc.metadata,
+                    'distance': 0.0,
+                    'similarity': 1.0
+                })
+
+            return formatted_results
+
+        except Exception as e:
+            # 回退到传统的向量搜索
+            logger.error(f" 搜索失败: {e}")
+
+class StandardizedResponseProcessor:
+    """标准化响应处理器 - 统一为outline_reviewer.py格式"""
+
+    def __init__(self):
+        self.inter_tool = InterTool()
+
+    def process_llm_response(self, response_text: str, check_name: str, chapter_code: str,check_item_code:str) -> List[Dict[str, Any]]:
+        """
+        处理LLM响应,返回标准格式
+
+        Args:
+            response_text: LLM原始响应文本
+            check_name: 检查项名称
+            chapter_code: 章节代码
+            check_item_code: 检查项代码
+
+        Returns:
+            List[Dict]: 标准格式的审查结果列表
+        """
+        if not self.inter_tool:
+            logger.warning("InterTool未初始化,返回空结果")
+            return []
+
+        try:
+            # 使用inter_tool提取JSON数据
+            json_data = self.inter_tool._extract_json_data(response_text)
+            parsed_result = []
+
+            if json_data and isinstance(json_data, list):
+                for item in json_data:
+                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name, chapter_code,check_item_code))
+            elif json_data and isinstance(json_data, dict):
+                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name, chapter_code,check_item_code))
+
+            return parsed_result
+
+        except Exception as e:
+            logger.error(f"处理LLM响应失败: {str(e)}")
+            # 返回一个错误条目
+            return [{
+                "check_item": check_name,
+                "chapter_code": "basis",
+                "check_item_code": f"basis_{check_name}",
+                "check_result": {"error": str(e)},
+                "exist_issue": True,
+                "risk_info": {"risk_level": "medium"}
+            }]
+
+
+class MessageBuilder:
+    """消息构建工具类"""
+
+    def __init__(self, prompt_loader_instance=None):
+        self.prompt_loader = prompt_loader_instance
+        
+    def get_prompt_template(self):
+        with open("core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml", "r", encoding="utf-8") as f:
+            data = yaml.safe_load(f)
+        return ChatPromptTemplate.from_messages([
+                ("system", data["reference_basis_reviewer"]["system_prompt"]),
+                ("user", data["reference_basis_reviewer"]["user_prompt_template"])
+            ])
+    
+class LLMReviewClient:
+    """LLM审查客户端"""
+
+    def __init__(self):
+        """初始化LLM审查客户端,使用通用模型底座"""
+        self.model_client = generate_model_client
+
+    async def review_basis(self, Message: str, trace_id: str = None) -> str:
+        try:
+            logger.info(f" 模型调用准备阶段: trace_id={trace_id}")
+
+            # 使用通用模型底座调用
+            messages = Message.format_messages() if hasattr(Message, 'format_messages') else Message
+            response = await self.model_client.get_model_generate_invoke(
+                trace_id=trace_id or "ref_basis_review",
+                messages=messages if isinstance(messages, list) else None,
+                prompt=messages if isinstance(messages, str) else None,
+                model_name="qwen3_30b"
+            )
+            return response
+
+        except Exception as e:
+            logger.error(f" 模型调用准备阶段失败: {e}")
+            # 返回空JSON数组字符串以防解析崩溃
+            return "[]"
+
+
+class BasisReviewService:
+    """编制依据审查服务核心类"""
+
+    def __init__(self, max_concurrent: int = 4):
+        self.search_engine = BasisSearchEngine()
+        self.llm_client = LLMReviewClient()
+        self.response_processor = StandardizedResponseProcessor()
+        fresh_prompt_loader = PromptLoader()
+        self.message_builder = MessageBuilder(fresh_prompt_loader)
+        self.max_concurrent = max_concurrent
+        self._semaphore = None
+
+    async def __aenter__(self):
+        """异步上下文管理器入口"""
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrent)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """异步上下文管理器出口"""
+        return False
+
+    async def review_batch(
+        self,
+        basis_items: List[str],
+        collection_name: str = "first_bfp_collection_status",
+        filters: Optional[Dict[str, Any]] = None,
+        min_score: float = 0.3,
+        top_k_each: int = 3,
+    ) -> List[Dict[str, Any]]:
+        """异步批次审查(通常3条)"""
+        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
+        if not basis_items:
+            return []
+
+        async with self._semaphore:
+            try:
+                # 第一步:搜索编制依据并通过match_reference_files过滤
+                search_tasks = []
+                for basis in basis_items:
+                    task = asyncio.create_task(
+                        self._async_search_basis(basis, collection_name, top_k_each)
+                    )
+                    search_tasks.append(task)
+
+                # 等待所有搜索完成
+                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+
+                grouped_candidates = []
+                for i, result in enumerate(search_results):
+                    if isinstance(result, Exception):
+                        logger.error(f"搜索失败 '{basis_items[i]}': {result}")
+                        grouped_candidates.append([])
+                    else:
+                        # result 是 List[dict],需要遍历
+                        texts = [item["text_content"] for item in result if "text_content" in item]
+                        grouped_candidates.append(texts)
+                
+                # 获取match_reference_files的结果并过滤
+                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
+                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+                try:
+                    match_data = json.loads(match_result)
+                    # 提取items字段(match_reference_files返回{items: [...]}格式)
+                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
+                    # 从过滤后的数据中提取review_item用于后续检查
+                    filtered_basis_items = [item.get('review_item') for item in filtered_data if item.get('review_item')]
+                    basis_items_to_check = filtered_basis_items if filtered_basis_items else []
+                    logger.info(f"过滤后参与检查的编制依据: {len(basis_items_to_check)}/{len(basis_items)}")
+                except (json.JSONDecodeError, TypeError) as e:
+                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
+                    # 如果解析失败,使用原始结果
+                    basis_items_to_check = []
+                
+                # 如果没有过滤出数据,直接返回空结果
+                if not basis_items_to_check:
+                    logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
+                    return []
+                
+                # 第二步:调用标点符号检查器
+                checker_result = await check_punctuation(basis_items_to_check)
+                print(checker_result)
+                
+                # 第三步:调用结果处理器,生成详细的问题分析报告
+                processor_result = await process_punctuation_results(checker_result)
+                print("\n【第二步】问题分析报告输出:")
+                print(processor_result)
+                
+                # 第四步:转换为标准格式
+                standardized_result = self.response_processor.process_llm_response(
+                    processor_result, 
+                    "reference_check", 
+                    "basis",
+                    "basis_reference_check"
+                )
+
+                # 统计问题数量
+                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
+                logger.info(f"编制依据批次审查完成:总计 {len(basis_items_to_check)} 项,发现问题 {issue_count} 项")
+
+                return standardized_result
+
+            except Exception as e:
+                logger.error(f" 批次处理失败: {e}")
+                return [{
+                    "check_item": "reference_check",
+                    "chapter_code": "basis",
+                    "check_item_code": "basis_reference_check",
+                    "check_result": {"error": str(e), "basis_items": basis_items},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "high"}
+                }]
+
+    async def _async_search_basis(
+        self,
+        basis: str,
+        collection_name: str,
+        top_k_each: int
+    ) -> List[dict]:
+        """异步搜索单个编制依据(Hybrid Search)"""
+        try:
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=basis,
+                top_k=top_k_each,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
+            )
+            retrieved = await loop.run_in_executor(None, func)
+            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
+            return retrieved or []
+        except Exception as e:
+            logger.error(f" 搜索失败 '{basis}': {e}")
+            return []
+
+    async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
+                        progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
+        """异步批量审查所有编制依据(BasisItems 入参)"""
+        if not basis_items or not getattr(basis_items, "items", None):
+            return []
+        
+        items = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
+        if not items:
+            return []
+
+        start_time = time.time()
+        total_batches = (len(items) + 2) // 3  # 计算总批次数
+        
+        # 发送开始审查的SSE推送(使用独立命名空间,避免与主流程进度冲突)
+        if progress_manager and callback_task_id:
+            try:
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    stage_name="编制依据审查-子任务",  # 独立命名空间
+                    status="processing",
+                    message=f"开始编制依据审查,共{len(items)}项编制依据",
+                    overall_task_status="processing",
+                    event_type="processing"
+                    # 不设置 current,避免覆盖主流程进度
+                )
+            except Exception as e:
+                logger.error(f"SSE推送开始消息失败: {e}")
+
+        # 分批处理
+        batches = []
+        for i in range(0, len(items), 3):
+            batch = items[i:i + 3]
+            batches.append(batch)
+
+        # 异步并发执行所有批次,使用回调处理SSE推送
+        async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
+            """处理单个批次并执行SSE回调"""
+            try:
+                # 执行单个批次审查
+                result = await self.review_batch(batch, collection_name)
+
+                # 统计当前批次结果
+                batch_standard_count = 0
+                for item in result:
+                    if isinstance(item, dict) and item.get('is_standard', False):
+                        batch_standard_count += 1
+
+                # 立即推送当前批次完成的SSE消息(使用独立命名空间)
+                logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
+                if progress_manager and callback_task_id:
+                    try:
+                        await progress_manager.update_stage_progress(
+                            callback_task_id=callback_task_id,
+                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",  # 独立命名空间
+                            status="processing",
+                            message=f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
+                            overall_task_status="processing",
+                            event_type="processing",
+                            issues=result  # 推送该批次的审查结果
+                            # 不设置 current,避免覆盖主流程进度
+                        )
+                        logger.info(f"批次{batch_index + 1} SSE推送成功")
+                    except Exception as e:
+                        logger.error(f"SSE推送批次{batch_index + 1}结果失败: {e}")
+
+                return result
+
+            except Exception as e:
+                logger.error(f" 批次 {batch_index} 处理失败: {e}")
+                error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
+                                for name in batch]
+
+                # 即使失败也要推送结果(使用独立命名空间)
+                if progress_manager and callback_task_id:
+                    try:
+                        await progress_manager.update_stage_progress(
+                            callback_task_id=callback_task_id,
+                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",  # 独立命名空间
+                            status="processing",
+                            message=f"第{batch_index + 1}/{total_batches}批次处理失败",
+                            overall_task_status="processing",
+                            event_type="processing",
+                            issues=error_result
+                            # 不设置 current,避免覆盖主流程进度
+                        )
+                    except Exception as push_e:
+                        logger.error(f"SSE推送失败批次{batch_index + 1}结果失败: {push_e}")
+
+                return error_result
+
+        # 创建所有批次的异步任务
+        batch_tasks = []
+        for i, batch in enumerate(batches):
+            task = process_batch_with_callback(i, batch)
+            batch_tasks.append(task)
+
+        # 并发执行所有批次
+        logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
+        processed_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
+
+        # 处理异常结果并统计
+        total_items = 0
+        issue_items = 0
+        successful_batches = 0
+
+        # 重新构建结果列表,过滤异常
+        final_results = []
+        for i, result in enumerate(processed_results):
+            if isinstance(result, Exception):
+                logger.error(f" 批次 {i} 返回异常: {result}")
+                error_batch = batches[i] if i < len(batches) else []
+                error_result = [{
+                    "check_item": "reference_check",
+                    "chapter_code": "basis",
+                    "check_item_code": "basis_reference_check",
+                    "check_result": {"error": str(result), "basis_items": error_batch},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "high"}
+                }]
+                final_results.append(error_result)
+            else:
+                final_results.append(result)
+                successful_batches += 1
+
+                # 过滤空批次结果,避免出现 []
+        final_results = [res for res in final_results if res]
+
+        # 统计总结果
+        for result in final_results:
+            for item in result:
+                total_items += 1
+                if isinstance(item, dict) and item.get('exist_issue', False):
+                    issue_items += 1
+
+        logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
+
+
+        # 发送完成审查的SSE推送(使用独立命名空间,不设置current避免覆盖主流程进度)
+        elapsed_time = time.time() - start_time
+        if progress_manager and callback_task_id:
+            try:
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    stage_name="编制依据审查-子任务",  # 独立命名空间
+                    status="processing",
+                    message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
+                    overall_task_status="processing",
+                    event_type="processing"
+                    # 不设置 current,避免覆盖主流程进度
+                )
+            except Exception as e:
+                logger.error(f"SSE推送完成消息失败: {e}")
+
+        logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
+        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
+        print("final_results:\n")
+        print(final_results)    
+        return final_results
+
+
+# 便捷函数
+async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
+    """异步批次审查便捷函数"""
+    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+        return await service.review_batch(basis_items)
+
+
+async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
+    """异步全部审查便捷函数(BasisItems 入参)"""
+    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+        return await service.review_all(basis_items)
+
+if __name__ == "__main__":
+    # 简单测试
+    test_basis_items = BasisItems(items=[
+        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+        BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
+        BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
+        BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
+        BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
+    ])
+    result = asyncio.run(review_all_basis_async(test_basis_items))

+ 149 - 4
core/construction_review/component/reviewers/utils/punctuation_checker.py

@@ -22,6 +22,126 @@ class PunctuationResults(BaseModel):
     items: List[PunctuationResult]
 
 
+# ===== 1.5) 辅助函数:提取和验证编号 =====
+def _extract_bracket_content_smart(text: str) -> tuple:
+    """
+    智能提取括号内容,处理嵌套括号情况
+    
+    返回: (括号内容, 是否成对, 括号后是否有额外字符)
+    """
+    # 统一括号类型
+    text_normalized = text.replace('(', '(').replace(')', ')')
+    
+    # 找到书名号后的文本
+    last_title_end = max(text_normalized.rfind('《'), text_normalized.rfind('》'))
+    if last_title_end == -1:
+        return None, False, True
+    
+    text_after_title = text_normalized[last_title_end + 1:]
+    
+    # 使用栈找到所有配对的括号
+    stack = []
+    pairs = []
+    
+    for i, char in enumerate(text_after_title):
+        if char == '(':
+            stack.append(i)
+        elif char == ')':
+            if stack:
+                start = stack.pop()
+                content = text_after_title[start + 1:i]
+                pairs.append({
+                    'content': content,
+                    'start': start,
+                    'end': i,
+                    'full': text_after_title[start:i+1]
+                })
+    
+    # 找到第一个配对的括号内容(最外层)
+    if pairs:
+        # 按起始位置排序,取第一个(最外层)
+        pairs.sort(key=lambda x: x['start'])
+        first_pair = pairs[0]
+        
+        # 检查括号后是否有多余字符(除了空白字符)
+        after_bracket = text_after_title[first_pair['end'] + 1:]
+        has_extra_chars = bool(after_bracket.strip())
+        
+        is_pair = len(stack) == 0
+        return first_pair['content'], is_pair, has_extra_chars
+    
+    # 有左括号但没有配对的右括号
+    unpaired_left = text_after_title.count('(') - text_after_title.count(')')
+    if unpaired_left > 0:
+        first_left = text_after_title.find('(')
+        if first_left != -1:
+            # 提取左括号后到右括号(或字符串结尾)的内容
+            first_right = text_after_title.find(')', first_left + 1)
+            if first_right != -1:
+                content = text_after_title[first_left + 1:first_right]
+            else:
+                content = text_after_title[first_left + 1:]
+            # 检查是否有多余字符(肯定有多余,因为没有配对的右括号)
+            return content, False, True
+    
+    return None, False, True
+
+
+def _is_valid_reference_number(number_text: str) -> bool:
+    """
+    验证编号是否符合规范要求:
+    1. 接受版本号格式(如"2024版"、"2023年")- 企业内部文件常用
+    2. 接受标准编号格式(英文+数字,如GB50010-2010)
+    3. 接受法规编号格式(中文+数字,如令第393号)
+    
+    有效示例:
+    - GB50010-2010、Q/CR9230-2016(英文+数字)
+    - 令第393号、第37号令(中文+数字)
+    - 2024版、2023年(版本号格式,企业内部文件)
+    - V1.0、Version 2.0(版本号格式)
+    
+    无效示例:
+    - 纯空格、纯特殊字符
+    - 无意义的数字组合
+    """
+    if not number_text or not number_text.strip():
+        return False
+    
+    text = number_text.strip()
+    
+    # 检查是否包含英文字母
+    has_english = bool(re.search(r'[a-zA-Z]', text))
+    # 检查是否包含中文字符
+    has_chinese = bool(re.search(r'[\u4e00-\u9fff]', text))
+    # 检查是否包含数字
+    has_digit = bool(re.search(r'\d', text))
+    
+    # 情况1: 标准编号格式(英文+数字)
+    if has_english and has_digit:
+        return True
+    
+    # 情况2: 法规编号格式(中文+数字,且不是纯数字后缀)
+    if has_chinese and has_digit:
+        # 排除纯数字+少量中文后缀的情况(如"2024版"不算中文+数字格式)
+        chinese_pattern = r'^[\d\s]*[\u4e00-\u9fff]+[\d\s]*[\u4e00-\u9fff]*$'
+        if re.search(chinese_pattern, text):
+            return True
+    
+    # 情况3: 版本号格式(年份+版/年,企业内部文件常用)
+    # 匹配:4位年份 + 版/年,如"2024版"、"2023年"、"2024 版"、"2023修订版"
+    version_pattern = r'^\d{4}\s*[版年修订]+[版本]?$'
+    if re.match(version_pattern, text):
+        return True
+    
+    # 情况4: 版本号格式(V/VERSION + 数字)
+    v_version_pattern = r'^[vV][\d\.]+|[Vv]ersion\s*[\d\.]+$'
+    if re.match(v_version_pattern, text):
+        return True
+    
+    # 有效编号:必须有数字,且满足上述任一格式
+    return has_digit and (has_english or has_chinese)
+
+
 # ===== 2) SYSTEM Prompt =====
 SYSTEM = """
 你是【编制依据格式检查专家】。
@@ -204,15 +324,40 @@ async def check_punctuation(items: List[str]) -> str:
             # 检查是否在最后一个》之后找到了括号
             bracket_after_title = first_bracket_pos != float('inf')
 
-        if not title_pair_ok or not bracket_pair_ok or not bracket_after_title:
+        # 【修改】使用智能提取逻辑处理括号
+        bracket_content, is_pair, has_extra_chars = _extract_bracket_content_smart(text)
+        
+        # 检查是否找到括号内容
+        if bracket_content is None:
+            # 没有括号
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": bool(title_pair_ok),
+                "bracket_status": None
+            })
+        elif not is_pair:
+            # 括号不成对(缺少右括号或左括号)
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": bool(title_pair_ok),
+                "bracket_status": False
+            })
+        elif has_extra_chars:
+            # 括号成对,但括号后有多余字符(如分号、数字等)
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": bool(title_pair_ok),
+                "bracket_status": False
+            })
+        elif not _is_valid_reference_number(bracket_content):
+            # 括号成对且无多余字符,但编号格式不正确(纯数字或无效格式)
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": bool(title_pair_ok),
-                "bracket_status": bool(bracket_pair_ok and bracket_after_title)
+                "bracket_status": False
             })
         else:
-            # 成对且位置正确,直接视为格式正确
-            # 不再调用LLM判断,避免LLM误判
+            # 括号成对、无多余字符、编号格式正确
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": True,

+ 117 - 19
core/construction_review/component/reviewers/utils/punctuation_result_processor.py

@@ -164,21 +164,103 @@ def _extract_regulation_name(text: str) -> str:
 
 # ===== 6.6) 辅助函数:提取现有编号 =====
 def _extract_existing_number(text: str) -> Optional[str]:
-    """从原文中提取现有编号(编号通常是最后一个括号对)"""
-    # 找最后一个右括号的位置
-    last_right_pos = max(text.rfind(')'), text.rfind(')'))
-    if last_right_pos == -1:
-        return None
+    """
+    从原文中提取现有编号(智能处理嵌套括号)n    
+    规则:
+    1. 找到书名号后的第一个配对括号
+    2. 验证括号内容是否符合编号格式(英文+数字 或 中文+数字)
+    3. 返回有效的编号内容
+    """
+    # 统一括号类型
+    text_normalized = text.replace('(', '(').replace(')', ')')
     
-    # 在最后一个右括号之前找匹配的左括号
-    text_before = text[:last_right_pos]
-    last_left_pos = max(text_before.rfind('('), text_before.rfind('('))
-    if last_left_pos == -1:
+    # 找到书名号后的文本
+    last_title_end = max(text_normalized.rfind('《'), text_normalized.rfind('》'))
+    if last_title_end == -1:
         return None
     
-    # 提取括号内的内容(编号)
-    number = text[last_left_pos + 1:last_right_pos].strip()
-    return number if number else None
+    text_after_title = text_normalized[last_title_end + 1:]
+    
+    # 使用栈找到第一个配对的括号
+    stack = []
+    for i, char in enumerate(text_after_title):
+        if char == '(':
+            stack.append(i)
+        elif char == ')':
+            if stack:
+                start = stack.pop()
+                if len(stack) == 0:  # 最外层配对
+                    content = text_after_title[start + 1:i].strip()
+                    # 验证是否为有效编号格式
+                    if _is_valid_number_format(content):
+                        return content
+    
+    # 如果没有找到有效的配对括号,尝试提取第一个左括号后的内容
+    first_left = text_after_title.find('(')
+    if first_left != -1:
+        # 找到第一个右括号的位置,限制提取范围
+        first_right = text_after_title.find(')', first_left + 1)
+        next_left = text_after_title.find('(', first_left + 1)
+        
+        # 确定提取的结束位置:右括号或下一个左括号,取先出现的
+        end_pos = None
+        if first_right != -1 and next_left != -1:
+            end_pos = min(first_right, next_left)
+        elif first_right != -1:
+            end_pos = first_right
+        elif next_left != -1:
+            end_pos = next_left
+        
+        if end_pos is not None:
+            content = text_after_title[first_left + 1:end_pos].strip()
+        else:
+            content = text_after_title[first_left + 1:].strip()
+            
+        if _is_valid_number_format(content):
+            return content
+    
+    return None
+
+
+def _is_valid_number_format(content: str) -> bool:
+    """
+    验证内容是否符合编号格式:
+    1. 接受版本号格式(如"2024版"、"2023年")- 企业内部文件常用
+    2. 接受标准编号格式(英文+数字,如GB50010-2010)
+    3. 接受法规编号格式(中文+数字,如令第393号)
+    """
+    if not content:
+        return False
+    
+    # 检查是否包含英文字母
+    has_english = bool(re.search(r'[a-zA-Z]', content))
+    # 检查是否包含中文字符
+    has_chinese = bool(re.search(r'[\u4e00-\u9fff]', content))
+    # 检查是否包含数字
+    has_digit = bool(re.search(r'\d', content))
+    
+    # 情况1: 标准编号格式(英文+数字)
+    if has_english and has_digit:
+        return True
+    
+    # 情况2: 法规编号格式(中文+数字)
+    if has_chinese and has_digit:
+        chinese_pattern = r'^[\d\s]*[\u4e00-\u9fff]+[\d\s]*[\u4e00-\u9fff]*$'
+        if re.search(chinese_pattern, content):
+            return True
+    
+    # 情况3: 版本号格式(年份+版/年,企业内部文件常用)
+    version_pattern = r'^\d{4}\s*[版年修订]+[版本]?$'
+    if re.match(version_pattern, content):
+        return True
+    
+    # 情况4: 版本号格式(V/VERSION + 数字)
+    v_version_pattern = r'^[vV][\d\.]+|[Vv]ersion\s*[\d\.]+$'
+    if re.match(v_version_pattern, content):
+        return True
+    
+    # 有效编号:必须有数字,且满足上述任一格式
+    return has_digit and (has_english or has_chinese)
 
 
 # ===== 6.7) 辅助函数:生成建议(新流程) =====
@@ -236,7 +318,7 @@ async def _generate_suggestion_with_validation(
 
 
 # ===== 7) 核心方法 =====
-async def process_punctuation_results(check_results: str) -> str:
+async def process_punctuation_results(check_results: str, enterprise_items: list = None) -> str:
     """
     根据标点符号检查结果生成详细的问题分析报告
     
@@ -244,6 +326,10 @@ async def process_punctuation_results(check_results: str) -> str:
     1. 首先验证现有编号是否正确(调用模型验证)
     2. 如果验证通过,接受该编号
     3. 如果验证失败,调用5个模型生成正确编号
+    
+    Args:
+        check_results: 标点符号检查结果的JSON字符串
+        enterprise_items: 企业内部文件列表(在标准库中未找到匹配的文件)
     """
     # 首先解析检查结果
     try:
@@ -253,6 +339,9 @@ async def process_punctuation_results(check_results: str) -> str:
     except json.JSONDecodeError:
         check_data = []
     
+    # 企业内部文件集合(用于快速查找)
+    enterprise_set = set(enterprise_items or [])
+    
     # 为每个检查结果生成问题分析
     results = []
     for item in check_data:
@@ -271,12 +360,21 @@ async def process_punctuation_results(check_results: str) -> str:
             suggestion = "将规范名称用书名号《》包裹,正确格式:《规范名称》(编号)"
         elif bracket_status is None:
             # 缺少编号
-            reason = "缺少规范编号"
-            base_suggestion = "补充规范编号,正确格式:《规范名称》(编号)"
-            # 调用新流程生成建议(验证+生成)
-            suggestion = await _generate_suggestion_with_validation(
-                original_text, "missing_number", base_suggestion
-            )
+            # 【修改】判断是否为企业内部文件
+            if original_text in enterprise_set:
+                # 企业内部文件可以没有编号,不是格式问题
+                issue_point = "编制依据格式正确(企业内部文件)"
+                reason = "企业内部文件,无标准规范编号"
+                suggestion = "无"
+                risk_level = "无风险"
+            else:
+                # 标准规范文件缺少编号
+                reason = "缺少规范编号"
+                base_suggestion = "补充规范编号,正确格式:《规范名称》(编号)"
+                # 调用新流程生成建议(验证+生成)
+                suggestion = await _generate_suggestion_with_validation(
+                    original_text, "missing_number", base_suggestion
+                )
         elif bracket_status is False:
             # 有编号但无括号
             reason = "格式错误!正确格式:《规范名称》(编号)"