Bläddra i källkod

fix:将规范性审查中的向量数据库和大模型投票机制去除

Meric 2 dagar sedan
förälder
incheckning
e8ac29a8dc

+ 0 - 50
core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml

@@ -1,50 +0,0 @@
-reference_basis_reviewer:
-  system_prompt: |
-    忘掉你之前所有的内容,完成下面的任务。
-    你是一个“格式校验专家(validator)”,只检查格式是否正确,对内容不做任何检查、建议、修改,忽略全半角符号的区别。
-
-    【检查内容】
-    1) 名称部分必须被书名号《》包裹
-
-    2) 编号部分必须使用括号包裹(中文()和英文()均可)
-
-    3) 一个《名称》应对应一个编号
-
-    【判定过程】
-    1) 只要违反任意规则 => issue_point="编制依据格式错误" 且 risk_level="中风险"
-    2) 否则 => issue_point="编制依据格式正确" 且 risk_level="无风险"
-
-    【输出硬约束】
-    - 你必须只输出一个 JSON 数组
-    - 禁止输出任何额外文字、解释、Markdown、代码块
-    - 数组元素对象只能包含五个字段(不多不少):
-      issue_point, location, suggestion, reason, risk_level
-
-  user_prompt_template: |
-    请对以下输入内容逐条检查格式,并逐条输出一个 JSON 对象,放入同一个 JSON 数组中。
-
-    【字段填写要求】
-    - issue_point:
-      - 格式错误 => "编制依据格式错误"
-      - 格式正确 => "编制依据格式正确"
-
-    - location:
-      - 必须与原输入文本完全一致(原样复制)
-
-    - suggestion:
-      - 对错误内容提出修改建议,格式正确时可填写 "无"
-
-    - reason(只能描述格式,不得涉及语义):
-      - 简洁的说明存在的问题,格式正确时可填写 "无"
-
-    - risk_level:
-      - 只能是 "无风险" 或 "中风险"
-
-    【输出格式规范】
-    - 只输出 JSON 数组
-    - 不要输出 ```json 等任何标记
-    - 不要输出任何额外文字
-
-    输入内容(逐条检查):
-    {check_content}
-    /no_think

+ 146 - 368
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -1,239 +1,81 @@
 from __future__ import annotations
 
 import asyncio
-import json
 import time
-import yaml
 from typing import Any, Dict, List, Optional
-from functools import partial
 
-from langchain_milvus import Milvus, BM25BuiltInFunction
-from foundation.infrastructure.config.config import config_handler
-from foundation.ai.models.model_handler import model_handler as mh
 from core.construction_review.component.reviewers.utils.directory_extraction import BasisItem, BasisItems
 from core.construction_review.component.reviewers.utils.inter_tool import InterTool
-from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
 from core.construction_review.component.reviewers.utils.punctuation_checker import check_punctuation
 from core.construction_review.component.reviewers.utils.punctuation_result_processor import process_punctuation_results
-from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
 from foundation.observability.logger.loggering import review_logger as logger
-from langchain_core.prompts import ChatPromptTemplate
-from foundation.ai.agent.generate.model_generate import generate_model_client
 
-class BasisSearchEngine:
-    """编制依据向量搜索引擎"""
-
-    # 类级别的缓存,避免重复创建 Milvus 实例
-    _vectorstore_cache = {}
-
-    def __init__(self):
-        self.emdmodel = None
-        self.host = None
-        self.port = None
-        self.user = None
-        self.password = None
-        self._initialize()
-
-    def _initialize(self):
-        """初始化搜索引擎"""
-        try:
-            # 连接配置
-            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
-            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
-            self.user = config_handler.get('milvus', 'MILVUS_USER')
-            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
-
-            # 初始化嵌入模型(从配置获取)
-            self.emdmodel = mh.get_embedding_model()
-            logger.info("嵌入模型初始化成功")
-
-        except Exception as e:
-            logger.error(f" BasisSearchEngine 初始化失败: {e}")
-
-    def _get_vectorstore(self, collection_name: str):
-        """获取或创建 Milvus vectorstore 实例(使用缓存)"""
-        cache_key = f"{self.host}:{self.port}:{collection_name}"
-
-        if cache_key not in BasisSearchEngine._vectorstore_cache:
-            connection_args = {
-                "uri": f"http://{self.host}:{self.port}",
-                "user": self.user,
-                "db_name": "lq_db"
-            }
-            if self.password:
-                connection_args["password"] = self.password
-
-            # 抑制 AsyncMilvusClient 的警告日志
-            import logging
-            original_level = logging.getLogger('pymilvus').level
-            logging.getLogger('pymilvus').setLevel(logging.ERROR)
-
-            try:
-                vectorstore = Milvus(
-                    embedding_function=self.emdmodel,
-                    collection_name=collection_name,
-                    connection_args=connection_args,
-                    consistency_level="Strong",
-                    builtin_function=BM25BuiltInFunction(),
-                    vector_field=["dense", "sparse"]
-                )
-                BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
-                logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
-            finally:
-                logging.getLogger('pymilvus').setLevel(original_level)
-
-        return BasisSearchEngine._vectorstore_cache[cache_key]
-
-    def hybrid_search(self, collection_name: str, query_text: str,
-                     top_k: int = 3, ranker_type: str = "weighted",
-                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
-        try:
-            # 使用缓存的 vectorstore
-            vectorstore = self._get_vectorstore(collection_name)
-
-            # 执行混合搜索
-            if ranker_type == "weighted":
-                results = vectorstore.similarity_search(
-                    query=query_text,
-                    k=top_k,
-                    ranker_type="weighted",
-                    ranker_params={"weights": [dense_weight, sparse_weight]}
-                )
-            else:  # rrf
-                results = vectorstore.similarity_search(
-                    query=query_text,
-                    k=top_k,
-                    ranker_type="rrf",
-                    ranker_params={"k": 60}
-                )
-
-            # 格式化结果,保持与其他搜索方法一致
-            formatted_results = []
-            for doc in results:
-                formatted_results.append({
-                    'id': doc.metadata.get('pk', 0),
-                    'text_content': doc.page_content,
-                    'metadata': doc.metadata,
-                    'distance': 0.0,
-                    'similarity': 1.0
-                })
-
-            return formatted_results
-
-        except Exception as e:
-            # 回退到传统的向量搜索
-            logger.error(f" 搜索失败: {e}")
 
 class StandardizedResponseProcessor:
-    """标准化响应处理器 - 统一为outline_reviewer.py格式"""
+    """标准化响应处理器 - 统一为 outline reviewer 风格结果"""
 
     def __init__(self):
         self.inter_tool = InterTool()
 
-    def process_llm_response(self, response_text: str, check_name: str, chapter_code: str,check_item_code:str) -> List[Dict[str, Any]]:
-        """
-        处理LLM响应,返回标准格式
-
-        Args:
-            response_text: LLM原始响应文本
-            check_name: 检查项名称
-            chapter_code: 章节代码
-            check_item_code: 检查项代码
-
-        Returns:
-            List[Dict]: 标准格式的审查结果列表
-        """
+    def process_llm_response(
+        self,
+        response_text: str,
+        check_name: str,
+        chapter_code: str,
+        check_item_code: str,
+    ) -> List[Dict[str, Any]]:
         if not self.inter_tool:
-            logger.warning("InterTool未初始化,返回空结果")
+            logger.warning("InterTool 未初始化,返回空结果")
             return []
 
         try:
-            # 使用inter_tool提取JSON数据
             json_data = self.inter_tool._extract_json_data(response_text)
             parsed_result = []
 
             if json_data and isinstance(json_data, list):
                 for item in json_data:
-                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name, chapter_code,check_item_code))
+                    parsed_result.append(
+                        self.inter_tool._create_issue_item(
+                            item, check_name, chapter_code, check_item_code
+                        )
+                    )
             elif json_data and isinstance(json_data, dict):
-                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name, chapter_code,check_item_code))
+                parsed_result.append(
+                    self.inter_tool._create_issue_item(
+                        json_data, check_name, chapter_code, check_item_code
+                    )
+                )
 
             return parsed_result
 
         except Exception as e:
-            logger.error(f"处理LLM响应失败: {str(e)}")
-            # 返回一个错误条目
-            return [{
-                "check_item": check_name,
-                "chapter_code": "basis",
-                "check_item_code": f"basis_{check_name}",
-                "check_result": {"error": str(e)},
-                "exist_issue": True,
-                "risk_info": {"risk_level": "medium"}
-            }]
-
-
-class MessageBuilder:
-    """消息构建工具类"""
-
-    def __init__(self, prompt_loader_instance=None):
-        self.prompt_loader = prompt_loader_instance
-        
-    def get_prompt_template(self):
-        with open("core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml", "r", encoding="utf-8") as f:
-            data = yaml.safe_load(f)
-        return ChatPromptTemplate.from_messages([
-                ("system", data["reference_basis_reviewer"]["system_prompt"]),
-                ("user", data["reference_basis_reviewer"]["user_prompt_template"])
-            ])
-    
-class LLMReviewClient:
-    """LLM审查客户端"""
-
-    def __init__(self):
-        """初始化LLM审查客户端,使用通用模型底座"""
-        self.model_client = generate_model_client
-
-    async def review_basis(self, Message: str, trace_id: str = None) -> str:
-        try:
-            logger.info(f" 模型调用准备阶段: trace_id={trace_id}")
-
-            # 使用 function_name 从 model_setting.yaml 加载模型配置
-            messages = Message.format_messages() if hasattr(Message, 'format_messages') else Message
-            response = await self.model_client.get_model_generate_invoke(
-                trace_id=trace_id or "ref_basis_review",
-                messages=messages if isinstance(messages, list) else None,
-                prompt=messages if isinstance(messages, str) else None,
-                function_name="completeness_review_generate"
-            )
-            return response
-
-        except Exception as e:
-            logger.error(f" 模型调用准备阶段失败: {e}")
-            # 返回空JSON数组字符串以防解析崩溃
-            return "[]"
+            logger.error(f"处理 LLM 响应失败: {str(e)}")
+            return [
+                {
+                    "check_item": check_name,
+                    "chapter_code": chapter_code,
+                    "check_item_code": check_item_code,
+                    "check_result": {"error": str(e)},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "medium"},
+                }
+            ]
 
 
 class BasisReviewService:
-    """编制依据审查服务核心类"""
+    """编制依据规范性审查服务,采用纯格式规则检查"""
 
     def __init__(self, max_concurrent: int = 4):
-        self.search_engine = BasisSearchEngine()
-        self.llm_client = LLMReviewClient()
         self.response_processor = StandardizedResponseProcessor()
-        fresh_prompt_loader = PromptLoader()
-        self.message_builder = MessageBuilder(fresh_prompt_loader)
         self.max_concurrent = max_concurrent
         self._semaphore = None
 
     async def __aenter__(self):
-        """异步上下文管理器入口"""
         if self._semaphore is None:
             self._semaphore = asyncio.Semaphore(self.max_concurrent)
         return self
 
     async def __aexit__(self, exc_type, exc_val, exc_tb):
-        """异步上下文管理器出口"""
         return False
 
     async def review_batch(
@@ -244,294 +86,230 @@ class BasisReviewService:
         min_score: float = 0.3,
         top_k_each: int = 3,
     ) -> List[Dict[str, Any]]:
-        """异步批次审查(通常3条)"""
+        """异步批次审查(兼容保留旧参数,但不再使用检索能力)"""
+        del collection_name, filters, min_score, top_k_each
+
         basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
         if not basis_items:
             return []
 
         async with self._semaphore:
             try:
-                # 第一步:搜索编制依据并通过match_reference_files过滤
-                search_tasks = []
-                for basis in basis_items:
-                    task = asyncio.create_task(
-                        self._async_search_basis(basis, collection_name, top_k_each)
-                    )
-                    search_tasks.append(task)
-
-                # 等待所有搜索完成
-                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
-
-                grouped_candidates = []
-                for i, result in enumerate(search_results):
-                    if isinstance(result, Exception):
-                        logger.error(f"搜索失败 '{basis_items[i]}': {result}")
-                        grouped_candidates.append([])
-                    else:
-                        # result 是 List[dict],需要遍历
-                        texts = [item["text_content"] for item in result if "text_content" in item]
-                        grouped_candidates.append(texts)
-                
-                # 获取match_reference_files的结果并过滤
-                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
-                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
-                try:
-                    match_data = json.loads(match_result)
-                    # 提取items字段(match_reference_files返回{items: [...]}格式)
-                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
-                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
-                    # 从过滤后的数据中提取review_item用于后续检查
-                    filtered_basis_items = [item.get('review_item') for item in filtered_data if item.get('review_item')]
-                    basis_items_to_check = filtered_basis_items if filtered_basis_items else []
-                    logger.info(f"过滤后参与检查的编制依据: {len(basis_items_to_check)}/{len(basis_items)}")
-                except (json.JSONDecodeError, TypeError) as e:
-                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
-                    # 如果解析失败,使用原始结果
-                    basis_items_to_check = []
-                
-                # 如果没有过滤出数据,直接返回空结果
-                if not basis_items_to_check:
-                    logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
-                    return []
-                
-                # 第二步:调用标点符号检查器
-                checker_result = await check_punctuation(basis_items_to_check)
-                print(checker_result)
-                
-                # 第三步:调用结果处理器,生成详细的问题分析报告
+                checker_result = await check_punctuation(basis_items)
                 processor_result = await process_punctuation_results(checker_result)
-                print("\n【第二步】问题分析报告输出:")
-                print(processor_result)
-                
-                # 第四步:转换为标准格式
+
                 standardized_result = self.response_processor.process_llm_response(
-                    processor_result, 
-                    "reference_check", 
+                    processor_result,
+                    "reference_check",
                     "basis",
-                    "basis_reference_check"
+                    "basis_reference_check",
                 )
 
-                # 统计问题数量
-                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items_to_check)} 项,发现问题 {issue_count} 项")
+                issue_count = sum(
+                    1 for item in standardized_result if item.get("exist_issue", False)
+                )
+                logger.info(
+                    f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项"
+                )
 
                 return standardized_result
 
             except Exception as e:
-                logger.error(f" 批次处理失败: {e}")
-                return [{
-                    "check_item": "reference_check",
-                    "chapter_code": "basis",
-                    "check_item_code": "basis_reference_check",
-                    "check_result": {"error": str(e), "basis_items": basis_items},
-                    "exist_issue": True,
-                    "risk_info": {"risk_level": "high"}
-                }]
-
-    async def _async_search_basis(
+                logger.error(f"批次处理失败: {e}")
+                return [
+                    {
+                        "check_item": "reference_check",
+                        "chapter_code": "basis",
+                        "check_item_code": "basis_reference_check",
+                        "check_result": {"error": str(e), "basis_items": basis_items},
+                        "exist_issue": True,
+                        "risk_info": {"risk_level": "high"},
+                    }
+                ]
+
+    async def review_all(
         self,
-        basis: str,
-        collection_name: str,
-        top_k_each: int
-    ) -> List[dict]:
-        """异步搜索单个编制依据(Hybrid Search)"""
-        try:
-            loop = asyncio.get_running_loop()
-            func = partial(
-                self.search_engine.hybrid_search,
-                collection_name=collection_name,
-                query_text=basis,
-                top_k=top_k_each,
-                ranker_type="weighted",
-                dense_weight=0.3,
-                sparse_weight=0.7
-            )
-            retrieved = await loop.run_in_executor(None, func)
-            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
-            return retrieved or []
-        except Exception as e:
-            logger.error(f" 搜索失败 '{basis}': {e}")
-            return []
+        basis_items: BasisItems,
+        collection_name: str = "first_bfp_collection_status",
+        progress_manager=None,
+        callback_task_id: str = None,
+    ) -> List[List[Dict[str, Any]]]:
+        """异步批量审查所有编制依据(保留 collection_name 参数用于兼容)"""
+        del collection_name
 
-    async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
-                        progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
-        """异步批量审查所有编制依据(BasisItems 入参)"""
         if not basis_items or not getattr(basis_items, "items", None):
             return []
-        
+
         items = [item.raw for item in basis_items.items if getattr(item, "raw", None)]
         if not items:
             return []
 
         start_time = time.time()
-        total_batches = (len(items) + 2) // 3  # 计算总批次数
-        
-        # 发送开始审查的SSE推送(使用独立命名空间,避免与主流程进度冲突)
+        total_batches = (len(items) + 2) // 3
+
         if progress_manager and callback_task_id:
             try:
                 await progress_manager.update_stage_progress(
                     callback_task_id=callback_task_id,
-                    stage_name="编制依据审查-子任务",  # 独立命名空间
+                    stage_name="编制依据审查-子任务",
                     status="processing",
                     message=f"开始编制依据审查,共{len(items)}项编制依据",
                     overall_task_status="processing",
-                    event_type="processing"
-                    # 不设置 current,避免覆盖主流程进度
+                    event_type="processing",
                 )
             except Exception as e:
-                logger.error(f"SSE推送开始消息失败: {e}")
+                logger.error(f"SSE 推送开始消息失败: {e}")
 
-        # 分批处理
         batches = []
         for i in range(0, len(items), 3):
-            batch = items[i:i + 3]
-            batches.append(batch)
+            batches.append(items[i:i + 3])
 
-        # 异步并发执行所有批次,使用回调处理SSE推送
-        async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
-            """处理单个批次并执行SSE回调"""
+        async def process_batch_with_callback(
+            batch_index: int, batch: List[str]
+        ) -> List[Dict[str, Any]]:
             try:
-                # 执行单个批次审查
-                result = await self.review_batch(batch, collection_name)
-
-                # 统计当前批次结果
-                batch_standard_count = 0
-                for item in result:
-                    if isinstance(item, dict) and item.get('is_standard', False):
-                        batch_standard_count += 1
+                result = await self.review_batch(batch)
+                batch_issue_count = sum(
+                    1 for item in result if isinstance(item, dict) and item.get("exist_issue", False)
+                )
 
-                # 立即推送当前批次完成的SSE消息(使用独立命名空间)
-                logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
                 if progress_manager and callback_task_id:
                     try:
                         await progress_manager.update_stage_progress(
                             callback_task_id=callback_task_id,
-                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",  # 独立命名空间
+                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",
                             status="processing",
-                            message=f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
+                            message=(
+                                f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,"
+                                f"{len(batch)}项,发现问题{batch_issue_count}项"
+                            ),
                             overall_task_status="processing",
                             event_type="processing",
-                            issues=result  # 推送该批次的审查结果
-                            # 不设置 current,避免覆盖主流程进度
+                            issues=result,
                         )
-                        logger.info(f"批次{batch_index + 1} SSE推送成功")
                     except Exception as e:
-                        logger.error(f"SSE推送批次{batch_index + 1}结果失败: {e}")
+                        logger.error(f"SSE 推送批次{batch_index + 1}结果失败: {e}")
 
                 return result
 
             except Exception as e:
-                logger.error(f" 批次 {batch_index} 处理失败: {e}")
-                error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
-                                for name in batch]
+                logger.error(f"批次 {batch_index} 处理失败: {e}")
+                error_result = [
+                    {
+                        "check_item": "reference_check",
+                        "chapter_code": "basis",
+                        "check_item_code": "basis_reference_check",
+                        "check_result": {"error": str(e), "basis_items": batch},
+                        "exist_issue": True,
+                        "risk_info": {"risk_level": "high"},
+                    }
+                ]
 
-                # 即使失败也要推送结果(使用独立命名空间)
                 if progress_manager and callback_task_id:
                     try:
                         await progress_manager.update_stage_progress(
                             callback_task_id=callback_task_id,
-                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",  # 独立命名空间
+                            stage_name=f"编制依据审查-子任务-批次{batch_index + 1}",
                             status="processing",
                             message=f"第{batch_index + 1}/{total_batches}批次处理失败",
                             overall_task_status="processing",
                             event_type="processing",
-                            issues=error_result
-                            # 不设置 current,避免覆盖主流程进度
+                            issues=error_result,
                         )
                     except Exception as push_e:
-                        logger.error(f"SSE推送失败批次{batch_index + 1}结果失败: {push_e}")
+                        logger.error(f"SSE 推送失败批次{batch_index + 1}结果失败: {push_e}")
 
                 return error_result
 
-        # 创建所有批次的异步任务
         batch_tasks = []
         for i, batch in enumerate(batches):
-            task = process_batch_with_callback(i, batch)
-            batch_tasks.append(task)
+            batch_tasks.append(process_batch_with_callback(i, batch))
 
-        # 并发执行所有批次
-        logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
+        logger.info(f"开始并发执行 {total_batches} 个批次编制依据审查")
         processed_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
 
-        # 处理异常结果并统计
         total_items = 0
         issue_items = 0
         successful_batches = 0
-
-        # 重新构建结果列表,过滤异常
         final_results = []
+
         for i, result in enumerate(processed_results):
             if isinstance(result, Exception):
-                logger.error(f" 批次 {i} 返回异常: {result}")
+                logger.error(f"批次 {i} 返回异常: {result}")
                 error_batch = batches[i] if i < len(batches) else []
-                error_result = [{
-                    "check_item": "reference_check",
-                    "chapter_code": "basis",
-                    "check_item_code": "basis_reference_check",
-                    "check_result": {"error": str(result), "basis_items": error_batch},
-                    "exist_issue": True,
-                    "risk_info": {"risk_level": "high"}
-                }]
-                final_results.append(error_result)
+                final_results.append(
+                    [
+                        {
+                            "check_item": "reference_check",
+                            "chapter_code": "basis",
+                            "check_item_code": "basis_reference_check",
+                            "check_result": {"error": str(result), "basis_items": error_batch},
+                            "exist_issue": True,
+                            "risk_info": {"risk_level": "high"},
+                        }
+                    ]
+                )
             else:
                 final_results.append(result)
                 successful_batches += 1
 
-                # 过滤空批次结果,避免出现 []
         final_results = [res for res in final_results if res]
 
-        # 统计总结果
         for result in final_results:
             for item in result:
                 total_items += 1
-                if isinstance(item, dict) and item.get('exist_issue', False):
+                if isinstance(item, dict) and item.get("exist_issue", False):
                     issue_items += 1
 
         logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
 
-
-        # 发送完成审查的SSE推送(使用独立命名空间,不设置current避免覆盖主流程进度)
         elapsed_time = time.time() - start_time
         if progress_manager and callback_task_id:
             try:
                 await progress_manager.update_stage_progress(
                     callback_task_id=callback_task_id,
-                    stage_name="编制依据审查-子任务",  # 独立命名空间
+                    stage_name="编制依据审查-子任务",
                     status="processing",
-                    message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
+                    message=(
+                        f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,"
+                        f"耗时{elapsed_time:.2f}秒"
+                    ),
                     overall_task_status="processing",
-                    event_type="processing"
-                    # 不设置 current,避免覆盖主流程进度
+                    event_type="processing",
                 )
             except Exception as e:
-                logger.error(f"SSE推送完成消息失败: {e}")
+                logger.error(f"SSE 推送完成消息失败: {e}")
 
-        logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
-        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
-        print("final_results:\n")
-        print(final_results)    
+        logger.info(f"异步审查完成,耗时: {elapsed_time:.4f} 秒")
+        logger.info(
+            f"总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}"
+        )
         return final_results
 
 
-# 便捷函数
-async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
-    """异步批次审查便捷函数"""
+async def review_basis_batch_async(
+    basis_items: List[str], max_concurrent: int = 4
+) -> List[Dict[str, Any]]:
     async with BasisReviewService(max_concurrent=max_concurrent) as service:
         return await service.review_batch(basis_items)
 
 
-async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
-    """异步全部审查便捷函数(BasisItems 入参)"""
+async def review_all_basis_async(
+    basis_items: BasisItems, max_concurrent: int = 4
+) -> List[List[Dict[str, Any]]]:
     async with BasisReviewService(max_concurrent=max_concurrent) as service:
         return await service.review_all(basis_items)
 
+
 if __name__ == "__main__":
-    # 简单测试
-    test_basis_items = BasisItems(items=[
-        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
-        BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
-        BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
-        BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
-        BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
-    ])
-    result = asyncio.run(review_all_basis_async(test_basis_items))
+    test_basis_items = BasisItems(
+        items=[
+            BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+            BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
+            BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
+            BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
+            BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
+        ]
+    )
+    result = asyncio.run(review_all_basis_async(test_basis_items))
+    print(result)

+ 10 - 1
core/construction_review/component/reviewers/utils/directory_extraction.py

@@ -356,8 +356,17 @@ if __name__ == "__main__":
 (5)本合同段工程现场踏勘、调查所获得的现场情况、自然环境、人文环境、市场环境等参考资料;
 """
 
+    demo2 = """一)编制依据
+    (1)相关法律法规
+    混凝土结构设计规范(GB 50010-2010)
+    《混凝土结构设计规范》GB 50010-2010
+    《《混凝土结构设计规范》(123456)
+    《混凝土结构设》计规范(GB 50010-2010)
+    (GB 50010-2010)《混凝土结构设计规范》
+"""
+
     async def _demo_run():
-        result = await extract_basis_with_langchain_qwen(None, None, demo)
+        result = await extract_basis_with_langchain_qwen(None, None, demo2, "basis")
         print(f"\n提取到 {len(result.items)} 条编制依据:")
         for idx, item in enumerate(result.items, 1):
             print(f"\n{idx}. {item.model_dump()}")

+ 18 - 152
core/construction_review/component/reviewers/utils/punctuation_checker.py

@@ -3,23 +3,7 @@
 
 import json
 import re
-from typing import List, Optional
-
-from pydantic import BaseModel, Field, ValidationError
-from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from foundation.ai.agent.generate.model_generate import generate_model_client
-
-
-# ===== 1) 定义结构 =====
-class PunctuationResult(BaseModel):
-    original_text: str = Field(..., description="审查的规范原文,与输入完全一致")
-    title_mark_status: bool = Field(..., description="书名号使用是否正确,true表示正确,false表示错误")
-    bracket_status: Optional[bool] = Field(..., description="括号使用是否正确,true表示正确,false表示错误,null表示没有编号")
-
-
-class PunctuationResults(BaseModel):
-    items: List[PunctuationResult]
+from typing import List
 
 
 # ===== 1.5) 辅助函数:提取和验证编号 =====
@@ -90,11 +74,13 @@ def _extract_bracket_content_smart(text: str) -> tuple:
 def _is_valid_reference_number(number_text: str) -> bool:
     """
     验证编号是否符合规范要求:
-    1. 接受版本号格式(如"2024版"、"2023年")- 企业内部文件常用
-    2. 接受标准编号格式(英文+数字,如GB50010-2010)
-    3. 接受法规编号格式(中文+数字,如令第393号)
+    1. 接受纯数字格式(如"123456")
+    2. 接受版本号格式(如"2024版"、"2023年")- 企业内部文件常用
+    3. 接受标准编号格式(英文+数字,如GB50010-2010)
+    4. 接受法规编号格式(中文+数字,如令第393号)
     
     有效示例:
+    - 123456(纯数字)
     - GB50010-2010、Q/CR9230-2016(英文+数字)
     - 令第393号、第37号令(中文+数字)
     - 2024版、2023年(版本号格式,企业内部文件)
@@ -102,7 +88,6 @@ def _is_valid_reference_number(number_text: str) -> bool:
     
     无效示例:
     - 纯空格、纯特殊字符
-    - 无意义的数字组合
     """
     if not number_text or not number_text.strip():
         return False
@@ -115,6 +100,10 @@ def _is_valid_reference_number(number_text: str) -> bool:
     has_chinese = bool(re.search(r'[\u4e00-\u9fff]', text))
     # 检查是否包含数字
     has_digit = bool(re.search(r'\d', text))
+
+    # 情况0: 纯数字格式
+    if re.fullmatch(r'\d+', text):
+        return True
     
     # 情况1: 标准编号格式(英文+数字)
     if has_english and has_digit:
@@ -141,123 +130,6 @@ def _is_valid_reference_number(number_text: str) -> bool:
     # 有效编号:必须有数字,且满足上述任一格式
     return has_digit and (has_english or has_chinese)
 
-
-# ===== 2) SYSTEM Prompt =====
-SYSTEM = """
-你是【编制依据格式检查专家】。
-
-【任务】
-检查编制依据中规范名称和编号的标点符号使用是否正确。
-
-【判定规则】
-1. title_mark_status:检查《》是否正确包裹规范名称
-   - 规范名称必须完整被《》包裹,不能遗漏部分文字
-   - 书名号必须成对出现
-
-2. bracket_status:检查编号是否被括号正确包裹
-   - 有编号且被括号(中文或英文)完整包裹 → true
-   - 有编号但未被括号包裹 → false
-   - 没有编号 → null
-
-【编号说明】
-编号可以是多种形式:
-- 标准编号:GB50010-2010、GB/T50502、JGJ80-2016等
-- 法规编号:令第393号、第37号令、国务院令第493号等
-- 只要编号与规范名称匹配且真实存在即可
-
-【输出要求】
-- 为每个输入文本输出一个检查结果
-- 确保输出数量与输入一致
-- original_text 必须与输入完全一致
-"""
-
-HUMAN = """
-请检查以下文本中书名号和括号的**内容是否全部被包裹**,以及是否有编号。
-
-【判断原则】
-- 仅检查包裹的**完整性**:书名号是否包裹了规范名称的全部内容;括号是否包裹了内容的全部内容
-- 中文括号()和英文括号()混用视为正常,不区分
-- 若内容在符号外遗漏,或符号包裹了多余内容,则判定为false
-- **<>、<>不是书名号**,是文本内容的一部分,忽略它们
-- **括号内的任何内容都视为编号/版本信息**,不判断编号内容是否正确
-
-【编号说明】
-编号可以是多种形式,包括但不限于:
-- GB50010-2010、GB/T50502(标准编号)
-- 令第393号、第37号令(法规编号)
-- 只要编号被括号包裹且与规范名称对应即可
-
-【示例】
-示例1:《建筑抗震设计规范》(GB 50011-2001)
-- 书名号包裹完整 → title_mark_status=true
-- 编号被括号包裹 → bracket_status=true
-
-示例2:《建筑抗震设计规范》GB 50011-2001
-- 书名号包裹完整 → title_mark_status=true
-- 编号未被括号包裹 → bracket_status=false
-
-示例3:《建筑抗震设计规范》(令第X号)
-- 书名号包裹完整 → title_mark_status=true
-- 编号被括号包裹 → bracket_status=true
-
-示例4:《建筑抗震设》计规范 (GB 50011-2001)
-- 规范名称是"建筑抗震设计规范",但只有"建筑抗震设"被包裹 → title_mark_status=false
-- 编号被包裹 → bracket_status=true
-
-示例5:《关于实施<危险性较大的分部分项工程安全管理规定>有关问题的通知》(建办质〔2018〕31号)
-- 书名号包裹完整(<>是内容的一部分,不是书名号)→ title_mark_status=true
-- 编号被括号包裹 → bracket_status=true
-
-示例6:《专项施工方案实施管理细则》(2024版)
-- 书名号包裹完整 → title_mark_status=true
-- 括号内有内容(2024版),视为编号 → bracket_status=true
-
-【重要区分】
-- "没有编号"(如只有《规范名称》)→ bracket_status = null
-- "有编号但无括号"(如《规范名称》GB1234)→ bracket_status = false
-- "编号被括号包裹"(如《规范名称》(GB1234))→ bracket_status = true
-
-【待检查文本】
-{items}
-
-【输出格式要求】
-{format_instructions}
-/no_think
-"""
-
-# ===== 3) Output Parser =====
-parser = PydanticOutputParser(pydantic_object=PunctuationResults)
-
-# ===== 4) Prompt =====
-prompt = ChatPromptTemplate.from_messages([
-    ("system", SYSTEM),
-    ("human", HUMAN)
-])
-
-# ===== 5) LLM Client =====
-model_client = generate_model_client
-
-
-# ===== 6) 提取第一个 JSON =====
def extract_first_json(text: str) -> dict:
    """Extract and parse the first complete JSON object ``{ ... }`` in *text*.

    The scan starts at the first ``{`` and tracks brace depth. Braces that
    occur inside JSON string literals (including after escaped quotes) are
    ignored, so a value such as ``"}"`` does not end the object early.

    Args:
        text: Arbitrary model output that may wrap a JSON object in other text.

    Returns:
        The value produced by ``json.loads`` for the first balanced object.

    Raises:
        ValueError: If *text* contains no ``{`` or the braces never balance.
        json.JSONDecodeError: If the balanced span is not valid JSON.
    """
    start = text.find("{")
    if start == -1:
        raise ValueError("未找到 JSON 起始 '{'")

    depth = 0
    in_string = False  # True while the scan is inside a JSON string literal
    escaped = False    # True for the character right after a backslash in a string
    for index, ch in enumerate(text[start:], start):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return json.loads(text[start:index + 1])

    raise ValueError("JSON 花括号未闭合")
-
-
 # ===== 7) 核心方法 =====
 async def check_punctuation(items: List[str]) -> str:
     """
@@ -309,48 +181,44 @@ async def check_punctuation(items: List[str]) -> str:
             })
             continue
         
-        # 情况4:两者都存在,检查位置关系
-        # 检查括号是否在书名号之后(找最后一个》之后的第一个括号)
         bracket_after_title = True
         if bracket_pair_ok and title_pair_ok:
             last_title_pos = max(text.rfind("《"), text.rfind("》"))
-            # 从最后一个》之后开始找第一个括号(避免书名内的括号干扰)
             text_after_title = text[last_title_pos + 1:]
             first_bracket_pos = float('inf')
             if "(" in text_after_title:
                 first_bracket_pos = text_after_title.find("(")
             if "(" in text_after_title:
                 first_bracket_pos = min(first_bracket_pos, text_after_title.find("("))
-            # 检查是否在最后一个》之后找到了括号
             bracket_after_title = first_bracket_pos != float('inf')
 
-        # 【修改】使用智能提取逻辑处理括号
         bracket_content, is_pair, has_extra_chars = _extract_bracket_content_smart(text)
-        
-        # 检查是否找到括号内容
+
         if bracket_content is None:
-            # 没有括号
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": bool(title_pair_ok),
                 "bracket_status": None
             })
+        elif not bracket_after_title:
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": bool(title_pair_ok),
+                "bracket_status": False
+            })
         elif not is_pair:
-            # 括号不成对(缺少右括号或左括号)
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": bool(title_pair_ok),
                 "bracket_status": False
             })
         elif has_extra_chars:
-            # 括号成对,但括号后有多余字符(如分号、数字等)
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": bool(title_pair_ok),
                 "bracket_status": False
             })
         elif not _is_valid_reference_number(bracket_content):
-            # 括号成对且无多余字符,但编号格式不正确(纯数字或无效格式)
             pre_results.append({
                 "original_text": text,
                 "title_mark_status": bool(title_pair_ok),
@@ -359,15 +227,13 @@ async def check_punctuation(items: List[str]) -> str:
                 "invalid_number_content": bracket_content
             })
         else:
-            # 括号成对、无多余字符、编号格式正确
             pre_results.append({
                 "original_text": text,
-                "title_mark_status": True,
+                "title_mark_status": bool(title_pair_ok),
                 "bracket_status": True,
                 "invalid_number_format": False
             })
 
-    # 直接返回预检结果
     return json.dumps(pre_results, ensure_ascii=False, indent=2)
 
 

+ 48 - 401
core/construction_review/component/reviewers/utils/punctuation_result_processor.py

@@ -2,446 +2,93 @@
 # -*- coding: utf-8 -*-
 
 import json
-import asyncio
-import re
-from typing import List, Literal, Optional
 
-from pydantic import BaseModel, Field, ValidationError
-from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from langchain_openai import ChatOpenAI
 
-# 导入多模型编号生成器和验证器
-try:
-    from .reference_number_generator import (
-        validate_reference_number,
-        generate_reference_number,
-        ModelVoteResult,
-        ValidationResult,
-        _extract_reference_number
-    )
-except ImportError:
-    from reference_number_generator import (
-        validate_reference_number,
-        generate_reference_number,
-        ModelVoteResult,
-        ValidationResult,
-        _extract_reference_number
-    )
-
-
-# ===== 1) 定义结构 =====
-RiskLevel = Literal["无风险", "中风险"]
-
-
class PunctuationIssueResult(BaseModel):
    """One issue-analysis record for a reviewed reference text.

    Holds the issue summary, the reviewed text (``location``), a suggestion,
    a reason and a risk level constrained to the ``RiskLevel`` literal.
    """
    issue_point: str = Field(..., description="问题点描述")
    location: str = Field(..., description="审查内容,与输入完全一致")
    suggestion: str = Field(..., description="修改建议,可执行动作")
    reason: str = Field(..., description="问题的原因分析")
    risk_level: RiskLevel = Field(..., description='风险水平,只能是 "无风险" / "中风险"')
-
-
class PunctuationIssueResults(BaseModel):
    """Container model holding a list of ``PunctuationIssueResult`` entries."""
    items: List[PunctuationIssueResult]
-
-
-# ===== 2) SYSTEM Prompt =====
-SYSTEM = """
-你是【编制依据格式问题分析专家】。
-
-【任务】
-根据标点符号检查结果,生成详细的问题分析报告。
-
-【重要说明(必须严格遵守)】
-- location 字段必须与输入的 original_text 完全一致(一字不差)
-- 根据 title_mark_status 和 bracket_status 的值判断问题类型
-- 提供具体的修改建议和原因分析
-
-【编号说明】
-编号可以是多种形式:
-- 标准编号:GB50010-2010、GB/T50502等
-- 法规编号:令第393号、第37号令、国务院令第493号等
-- 只要编号与规范名称匹配且真实存在,即为有效
-
-【输出要求】
-- 为每个检查结果输出一个详细的问题分析
-- 确保输出数量与输入一致
-- location 必须与 original_text 完全一致
-- 严格按照判定规则生成内容
-"""
-
-HUMAN = """
-请根据以下标点符号检查结果,生成详细的问题分析报告:
-
-【判定规则】
-
-首先明确 bracket_status 的三种状态含义:
-- bracket_status = true:有编号且被括号完整包裹
-- bracket_status = false:有编号但未被括号包裹
-- bracket_status = null:没有编号
-
-【问题类型判定】
-
-当 title_mark_status = true 且 bracket_status = true:
-- issue_point:编制依据格式正确
-- reason:规范名称和编号的标点符号使用规范
-- suggestion:无
-- risk_level:无风险
-
-当 title_mark_status = true 且 bracket_status = null:
-- issue_point:编制依据格式错误
-- reason:缺少规范编号
-- suggestion:补充规范编号,正确格式:《规范名称》(编号)
-- risk_level:中风险
-
-当 title_mark_status = true 且 invalid_number_format = true:
-- issue_point:编制依据格式错误
-- reason:格式错误!当前编号为纯数字
-- suggestion:规范编号应为英文加数字或中文加数字,而不是纯数字
-- risk_level:中风险
-
-当 title_mark_status = true 且 bracket_status = false 且 invalid_number_format 不存在或为 false:
-- issue_point:编制依据格式错误
-- reason:格式错误!正确格式:《规范名称》(编号)
-- suggestion:将规范编号用括号包裹,正确格式:《规范名称》(编号)
-- risk_level:中风险
-
-当 title_mark_status != true 时:
-- issue_point:编制依据格式错误
-- reason:格式错误!正确格式:《规范名称》(编号)
-- suggestion:将规范名称用书名号《》包裹,正确格式:《规范名称》(编号)
-- risk_level:中风险
-
-【标点符号检查结果】
-{check_results}
-
-【输出格式要求】
-{format_instructions}
-/no_think
-"""
-
-# ===== 3) Output Parser =====
-parser = PydanticOutputParser(pydantic_object=PunctuationIssueResults)
-
-# ===== 4) Prompt =====
-prompt = ChatPromptTemplate.from_messages([
-    ("system", SYSTEM),
-    ("human", HUMAN)
-])
-
-# ===== 5) LLM =====
-llm = ChatOpenAI(
-    model="qwen3-30b",
-    base_url="http://192.168.91.253:8003/v1",
-    api_key="sk-123456",
-    temperature=0,
-)
-
-
-# ===== 6) 提取第一个 JSON =====
def extract_first_json(text: str) -> dict:
    """Extract and parse the first complete JSON object ``{ ... }`` in *text*.

    The scan starts at the first ``{`` and tracks brace depth. Braces that
    occur inside JSON string literals (including after escaped quotes) are
    ignored, so a value such as ``"}"`` does not end the object early.

    Args:
        text: Arbitrary model output that may wrap a JSON object in other text.

    Returns:
        The value produced by ``json.loads`` for the first balanced object.

    Raises:
        ValueError: If *text* contains no ``{`` or the braces never balance.
        json.JSONDecodeError: If the balanced span is not valid JSON.
    """
    start = text.find("{")
    if start == -1:
        raise ValueError("未找到 JSON 起始 '{'")

    depth = 0
    in_string = False  # True while the scan is inside a JSON string literal
    escaped = False    # True for the character right after a backslash in a string
    for index, ch in enumerate(text[start:], start):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return json.loads(text[start:index + 1])

    raise ValueError("JSON 花括号未闭合")
-
-
-# ===== 6.5) 辅助函数:提取规范名称 =====
-def _extract_regulation_name(text: str) -> str:
-    """从原文中提取规范名称(书名号内的内容)"""
-    match = re.search(r'《([^《》]+)》', text)
-    if match:
-        return match.group(1).strip()
-    return text.strip()
-
-
-# ===== 6.6) 辅助函数:提取现有编号 =====
def _extract_existing_number(text: str) -> Optional[str]:
    """Extract the existing reference number that follows the book title.

    Steps:
    1. Fold full-width parentheses into ASCII ones.
    2. Take the text after the last ``《`` or ``》``.
    3. Return the content of the first outermost balanced ``(...)`` pair
       whose content passes ``_is_valid_number_format``.
    4. Otherwise fall back to the text after the first ``(``, cut at the
       nearer of the next ``)`` or next ``(`` (or the end of the text), and
       return it if it passes ``_is_valid_number_format``.

    Args:
        text: The original reference text.

    Returns:
        The stripped number text, or ``None`` when there is no book-title mark
        or no candidate passes validation.
    """
    # Explicit code points (U+FF08 / U+FF09) keep this normalization from
    # degenerating into a no-op if the source file's characters get folded.
    text_normalized = text.replace('\uff08', '(').replace('\uff09', ')')

    # Only look at what follows the last book-title mark.
    last_title_end = max(text_normalized.rfind('《'), text_normalized.rfind('》'))
    if last_title_end == -1:
        return None

    text_after_title = text_normalized[last_title_end + 1:]

    # Stack of unmatched '(' positions; an empty stack after a pop means the
    # pair just closed is an outermost one.
    stack = []
    for i, char in enumerate(text_after_title):
        if char == '(':
            stack.append(i)
        elif char == ')' and stack:
            start = stack.pop()
            if not stack:
                content = text_after_title[start + 1:i].strip()
                if _is_valid_number_format(content):
                    return content

    # Fallback for unbalanced brackets: take what follows the first '('.
    first_left = text_after_title.find('(')
    if first_left != -1:
        first_right = text_after_title.find(')', first_left + 1)
        next_left = text_after_title.find('(', first_left + 1)

        # Stop at whichever delimiter comes first; None slices to the end.
        stops = [pos for pos in (first_right, next_left) if pos != -1]
        end_pos = min(stops) if stops else None

        content = text_after_title[first_left + 1:end_pos].strip()
        if _is_valid_number_format(content):
            return content

    return None
-
-
-def _is_valid_number_format(content: str) -> bool:
-    """
-    验证内容是否符合编号格式:
-    1. 接受版本号格式(如"2024版"、"2023年")- 企业内部文件常用
-    2. 接受标准编号格式(英文+数字,如GB50010-2010)
-    3. 接受法规编号格式(中文+数字,如令第393号)
+async def process_punctuation_results(check_results: str, enterprise_items: list = None) -> str:
     """
-    if not content:
-        return False
-    
-    # 检查是否包含英文字母
-    has_english = bool(re.search(r'[a-zA-Z]', content))
-    # 检查是否包含中文字符
-    has_chinese = bool(re.search(r'[\u4e00-\u9fff]', content))
-    # 检查是否包含数字
-    has_digit = bool(re.search(r'\d', content))
-    
-    # 情况1: 标准编号格式(英文+数字)
-    if has_english and has_digit:
-        return True
-    
-    # 情况2: 法规编号格式(中文+数字)
-    if has_chinese and has_digit:
-        chinese_pattern = r'^[\d\s]*[\u4e00-\u9fff]+[\d\s]*[\u4e00-\u9fff]*$'
-        if re.search(chinese_pattern, content):
-            return True
-    
-    # 情况3: 版本号格式(年份+版/年,企业内部文件常用)
-    version_pattern = r'^\d{4}\s*[版年修订]+[版本]?$'
-    if re.match(version_pattern, content):
-        return True
-    
-    # 情况4: 版本号格式(V/VERSION + 数字)
-    v_version_pattern = r'^[vV][\d\.]+|[Vv]ersion\s*[\d\.]+$'
-    if re.match(v_version_pattern, content):
-        return True
-    
-    # 有效编号:必须有数字,且满足上述任一格式
-    return has_digit and (has_english or has_chinese)
+    根据标点符号检查结果生成详细的问题分析报告。
 
+    方案A下,规范性审查只做纯格式检查:
+    - 不校验编号真伪
+    - 不生成推测编号
+    - 不依赖外部模型或向量检索
 
-# ===== 6.7) 辅助函数:生成建议(新流程) =====
-async def _generate_suggestion_with_validation(
-    original_text: str,
-    issue_type: str,
-    base_suggestion: str
-) -> str:
-    """
-    使用新流程生成编号建议:
-    1. 首先验证现有编号是否正确
-    2. 如果验证通过,接受该编号
-    3. 如果验证失败,调用5个模型生成正确编号
+    enterprise_items 参数仅为兼容保留,当前不参与判定。
     """
-    try:
-        regulation_name = _extract_regulation_name(original_text)
-        existing_number = _extract_existing_number(original_text)
-        
-        # 第一步:如果存在编号,先验证是否正确
-        if existing_number and existing_number.strip():
-            validation_result = await validate_reference_number(
-                regulation_name=regulation_name,
-                existing_number=existing_number
-            )
-            
-            if validation_result and validation_result.is_valid:
-                # 验证通过,接受该编号(不再计较是否为字母+数字格式)
-                if issue_type == "missing_bracket":
-                    return f"将规范编号用括号包裹,正确格式:《{regulation_name}》({existing_number})"
-                else:
-                    return f"补充规范编号,正确格式:《{regulation_name}》({existing_number})"
-        
-        # 第二步:验证失败或没有编号,调用5个模型生成
-        existing_info = existing_number if existing_number else ""
-        vote_result = await generate_reference_number(
-            regulation_name=regulation_name,
-            existing_info=existing_info
-        )
-        
-        if vote_result and vote_result.confidence >= 0.5:
-            # 有可信的AI生成结果
-            ai_number = vote_result.generated_number
-            
-            if issue_type == "missing_bracket":
-                return f"将规范编号用括号包裹,正确格式:《{regulation_name}》({ai_number})"
-            else:
-                return f"补充规范编号,正确格式:《{regulation_name}》({ai_number})"
-        else:
-            # AI生成失败,返回基础建议
-            return base_suggestion
-            
-    except Exception:
-        # 异常时返回基础建议
-        return base_suggestion
-
+    del enterprise_items
 
-# ===== 7) 核心方法 =====
-async def process_punctuation_results(check_results: str, enterprise_items: list = None) -> str:
-    """
-    根据标点符号检查结果生成详细的问题分析报告
-    
-    新流程:
-    1. 首先验证现有编号是否正确(调用模型验证)
-    2. 如果验证通过,接受该编号
-    3. 如果验证失败,调用5个模型生成正确编号
-    
-    Args:
-        check_results: 标点符号检查结果的JSON字符串
-        enterprise_items: 企业内部文件列表(在标准库中未找到匹配的文件)
-    """
-    # 首先解析检查结果
     try:
         check_data = json.loads(check_results)
         if not isinstance(check_data, list):
             check_data = [check_data]
     except json.JSONDecodeError:
         check_data = []
-    
-    # 企业内部文件集合(用于快速查找)
-    enterprise_set = set(enterprise_items or [])
-    
-    # 为每个检查结果生成问题分析
+
     results = []
     for item in check_data:
         original_text = item.get("original_text", "")
         title_status = item.get("title_mark_status", False)
         bracket_status = item.get("bracket_status")
-        
-        # 根据状态生成问题分析
+        invalid_number_format = item.get("invalid_number_format", False)
+
         issue_point = "编制依据格式错误"
         risk_level = "中风险"
-        
-        # 判断问题类型并生成建议
-        invalid_number_format = item.get("invalid_number_format", False)
-        invalid_number_content = item.get("invalid_number_content", "")
-        
+
         if title_status is not True:
-            # 书名号问题
             reason = "格式错误!正确格式:《规范名称》(编号)"
             suggestion = "将规范名称用书名号《》包裹,正确格式:《规范名称》(编号)"
         elif bracket_status is None:
-            # 缺少编号
-            # 【修改】判断是否为企业内部文件
-            if original_text in enterprise_set:
-                # 企业内部文件可以没有编号,不是格式问题
-                issue_point = "编制依据格式正确(企业内部文件)"
-                reason = "企业内部文件,无标准规范编号"
-                suggestion = "无"
-                risk_level = "无风险"
-            else:
-                # 标准规范文件缺少编号
-                reason = "缺少规范编号"
-                base_suggestion = "补充规范编号,正确格式:《规范名称》(编号)"
-                # 调用新流程生成建议(验证+生成)
-                suggestion = await _generate_suggestion_with_validation(
-                    original_text, "missing_number", base_suggestion
-                )
+            reason = "缺少规范编号"
+            suggestion = "补充规范编号,正确格式:《规范名称》(编号)"
         elif invalid_number_format:
-            # 括号内有编号,但格式不正确(纯数字)
-            reason = f"格式错误!当前编号为纯数字"
-            suggestion = f"规范编号应为英文加数字或中文加数字,而不是纯数字"
+            reason = "格式错误!当前编号格式无法识别"
+            suggestion = "请补充可识别的规范编号,并保持格式为《规范名称》(编号)"
         elif bracket_status is False:
-            # 有编号但无括号
             reason = "格式错误!正确格式:《规范名称》(编号)"
-            base_suggestion = "将规范编号用括号包裹,正确格式:《规范名称》(编号)"
-            # 调用新流程生成建议(验证+生成)
-            suggestion = await _generate_suggestion_with_validation(
-                original_text, "missing_bracket", base_suggestion
-            )
+            suggestion = "将规范编号用括号包裹,正确格式:《规范名称》(编号)"
         else:
-            # 格式正确:《文件名》(文件编号)
-            # 编制依据格式检查只检查格式,不检查编号内容正确性
-            # 编号是否正确(如是否已废止)由时效性检查处理
             issue_point = "编制依据格式正确"
             reason = "规范名称和编号的标点符号使用规范"
             suggestion = "无"
             risk_level = "无风险"
-        
-        # 构建结果
-        result_item = {
-            "issue_point": issue_point,
-            "location": original_text,
-            "suggestion": suggestion,
-            "reason": reason,
-            "risk_level": risk_level
-        }
-        results.append(result_item)
-    
-    # 返回JSON格式结果
+
+        results.append(
+            {
+                "issue_point": issue_point,
+                "location": original_text,
+                "suggestion": suggestion,
+                "reason": reason,
+                "risk_level": risk_level,
+            }
+        )
+
     return json.dumps(results, ensure_ascii=False, indent=2)
 
 
-# ===== 8) 示例 =====
 if __name__ == "__main__":
     import asyncio
 
     async def main():
-        # 模拟标点符号检查结果
-        check_results = json.dumps([
-            {
-                "original_text": "《建设工程安全生产管理条例》(国务院令第393号)",
-                "title_mark_status": True,
-                "bracket_status": True
-            },
-            {
-                "original_text": "《混凝土结构设计规范》GB50010-2010",
-                "title_mark_status": True,
-                "bracket_status": False
-            },
-            {
-                "original_text": "《起重机械安全规程》",
-                "title_mark_status": True,
-                "bracket_status": None
-            }
-        ], ensure_ascii=False)
+        check_results = json.dumps(
+            [
+                {
+                    "original_text": "《建设工程安全生产管理条例》(国务院令第393号)",
+                    "title_mark_status": True,
+                    "bracket_status": True,
+                },
+                {
+                    "original_text": "《混凝土结构设计规范》GB50010-2010",
+                    "title_mark_status": True,
+                    "bracket_status": False,
+                },
+                {
+                    "original_text": "《起重机械安全规程》",
+                    "title_mark_status": True,
+                    "bracket_status": None,
+                },
+            ],
+            ensure_ascii=False,
+        )
 
         result = await process_punctuation_results(check_results)
         print("\n问题分析结果:")

+ 0 - 641
core/construction_review/component/reviewers/utils/reference_matcher.py

@@ -1,641 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import json
-import asyncio
-import time
-import re
-from typing import List, Optional, Tuple
-from dataclasses import dataclass
-
-from core.construction_review.component.reviewers.utils.reference_number_generator import generate_reference_number, validate_reference_number
-from pydantic import BaseModel, Field, ValidationError
-from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
-from foundation.ai.agent.generate.model_generate import generate_model_client
-from foundation.observability.logger.loggering import review_logger as logger
-
-
-# ===== 1) 定义结构 =====
class MatchResult(BaseModel):
    """Match record for one reviewed regulation against the reference library.

    Records the reviewed text, whether any related file and any exact
    name-and-number match exist, and two optional descriptive strings that
    default to the empty string.
    """
    review_item: str = Field(..., description="审查的规范原文,与输入完全一致")
    has_related_file: bool = Field(..., description="是否有相关文件")
    has_exact_match: bool = Field(..., description="是否有名称编号都相同的文件")
    exact_match_info: str = Field("", description="名称编号相同的文件及状态,格式:《名称》(编号)状态为XXX,可为空")
    same_name_current: str = Field("", description="名称相同的现行文件,格式:《名称》(编号)状态为现行,可为空")
-
-
class MatchResults(BaseModel):
    """Container model holding a list of ``MatchResult`` entries."""
    items: List[MatchResult]
-
-
@dataclass
class ValidationMatchResult:
    """Validation match result."""
    review_item: str
    reference_candidates: List[str]  # list of candidate reference files
    is_valid: bool  # whether validation passed
    validated_number: Optional[str] = None  # the validated / generated correct number
    status: str = ""  # status (in force / abolished)
-
-
-# ===== 2) SYSTEM Prompt(用于初步匹配) =====
-SYSTEM = """
-/no_think
-你是【规范文件匹配助手】。
-
-【任务】
-从参考规范库中查找每个审查规范的相关信息,用于后续的编号验证。
-
-【输出要求】
-- 为每个审查规范输出一个匹配结果
-- 确保输出数量与输入的审查规范数量一致
-- review_item 必须与输入完全一致
-- exact_match_info 和 same_name_current 可以为空字符串
-"""
-
-HUMAN = """
-请从参考规范库中查找每个审查规范的匹配信息:
-
-【匹配规则】
-1. **review_item**(审查的规范原文)
-   - 必须与输入的审查规范完全一致,逐字复制
-
-2. **has_related_file**(是否有相关文件)
-   - 在参考规范库中找到名称相似或相关的文件,返回 true
-   - 完全找不到任何相关文件,返回 false
-
-3. **has_exact_match**(是否有名称编号都相同的文件)
-   - 参考文件中的编号和文件名与审查规范完全匹配,返回 true
-   - **重要**:比较时忽略括号格式差异(半角()和全角()视为相同)
-   - 例如:《规范》(GB 1234-2020)与《规范》(GB 1234-2020)视为完全匹配
-   - 否则返回 false
-
-4. **exact_match_info**(名称编号相同的文件及状态)
-   - 如果 has_exact_match 为 true,返回该文件的完整信息
-   - 格式:《规范名称》(规范编号)状态为XXX
-
-5. **same_name_current**(名称相同的现行文件)
-   - 在参考规范库中查找与审查规范名称相同且状态为"现行"的文件
-   - 格式:《规范名称》(规范编号)状态为现行
-
-【参考规范库】
-{reference_text}
-
-【审查规范】
-{review_text}
-
-【输出格式要求】
-{format_instructions}
-/no_think
-"""
-
-# ===== 3) Output Parser =====
-parser = PydanticOutputParser(pydantic_object=MatchResults)
-
-# ===== 4) Prompt =====
-prompt = ChatPromptTemplate.from_messages([
-    ("system", SYSTEM),
-    ("human", HUMAN)
-])
-
-# ===== 5) LLM Client =====
-model_client = generate_model_client
-
-# ===== 6) 重试配置 =====
-MAX_RETRIES = 5
-BASE_DELAY = 2
-MAX_DELAY = 30
-
-
-def _is_retryable_error(error: Exception) -> bool:
-    """判断是否为可重试的错误"""
-    error_str = str(error).lower()
-    retryable_codes = ['502', '503', '504', '429', 'timeout', 'connection', 'overload']
-    return any(code in error_str for code in retryable_codes)
-
-
-def _get_user_friendly_error(error: Exception) -> str:
-    """将技术错误转换为用户友好的提示"""
-    error_str = str(error).lower()
-
-    if '502' in error_str or '503' in error_str or '504' in error_str:
-        return "模型服务暂时不可用,请稍后重试"
-    elif '429' in error_str or 'rate limit' in error_str:
-        return "请求过于频繁,请稍后重试"
-    elif 'timeout' in error_str:
-        return "模型响应超时,请稍后重试"
-    elif 'connection' in error_str:
-        return "网络连接异常,请检查网络后重试"
-    else:
-        return f"规范匹配服务暂时不可用,请稍后重试"
-
-
-# ===== 7) 提取第一个 JSON =====
def extract_first_json(text: str) -> dict:
    """Extract and parse the first complete JSON object ``{ ... }`` in *text*.

    The scan starts at the first ``{`` and tracks brace depth. Braces that
    occur inside JSON string literals (including after escaped quotes) are
    ignored, so a value such as ``"}"`` does not end the object early.

    Args:
        text: Arbitrary model output that may wrap a JSON object in other text.

    Returns:
        The value produced by ``json.loads`` for the first balanced object.

    Raises:
        ValueError: If *text* contains no ``{`` or the braces never balance.
        json.JSONDecodeError: If the balanced span is not valid JSON.
    """
    start = text.find("{")
    if start == -1:
        raise ValueError("未找到 JSON 起始 '{'")

    depth = 0
    in_string = False  # True while the scan is inside a JSON string literal
    escaped = False    # True for the character right after a backslash in a string
    for index, ch in enumerate(text[start:], start):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return json.loads(text[start:index + 1])

    raise ValueError("JSON 花括号未闭合")
-
-
-# ===== 8) 辅助函数:提取规范名称和编号 =====
-def _extract_regulation_info(text: str) -> Tuple[str, Optional[str]]:
-    """从文本中提取规范名称和编号"""
-    # 提取书名号内的名称
-    name_match = re.search(r'《([^《》]+)》', text)
-    name = name_match.group(1) if name_match else text.strip()
-    
-    # 提取括号内的编号
-    number_match = re.search(r'[((]([^))]+)[))]', text)
-    number = number_match.group(1).strip() if number_match else None
-    
-    return name, number
-
-
-def _normalize_text(text: str) -> str:
-    """
-    标准化文本,统一括号格式用于比较
-    将全角括号转换为半角括号,去除多余空格
-    """
-    if not text:
-        return text
-    # 全角括号转为半角括号
-    text = text.replace('(', '(').replace(')', ')')
-    # 统一书名号(中文书名号保持不变,但统一全角半角)
-    text = text.replace('『', '《').replace('』', '》')
-    text = text.replace('﹄', '《').replace('﹃', '》')
-    # 去除多余空格
-    text = ' '.join(text.split())
-    return text.strip()
-
-
-def _extract_core_number(number: str) -> str:
-    """
-    提取规范编号的核心部分(去掉年份)
-    例如:JGJ 65-2013 -> JGJ65, GB/T 50010-2010 -> GB/T50010
-    
-    Args:
-        number: 规范编号,如 "JGJ 65-2013"
-        
-    Returns:
-        核心编号,如 "JGJ65"
-    """
-    if not number:
-        return ""
-    
-    # 标准化:转大写、去空格
-    normalized = number.upper().replace(' ', '')
-    
-    # 去掉年份部分(-YYYY 或 —YYYY)
-    # 匹配末尾的年份 -4位数字 或 —4位数字 或 - 4位数字
-    normalized = re.sub(r'[-—]\s*\d{4}$', '', normalized)
-    
-    return normalized
-
-
def _is_same_regulation_family(original_number: str, generated_number: str, threshold: int = 100) -> bool:
    """Decide whether two numbers belong to the same regulation family.

    Both numbers are reduced with ``_extract_core_number``. Identical cores
    are the same family. Otherwise each core is split into a letter prefix
    (optionally with a ``/LETTERS`` part, e.g. ``GB/T`` or ``Q/CR``) and a
    numeric part; the numbers are the same family only when the prefixes are
    equal and the numeric parts differ by less than *threshold*.

    Args:
        original_number: The original number.
        generated_number: The generated number.
        threshold: Numeric difference at or above which the two numbers are
            treated as different regulations. Defaults to 100.

    Returns:
        ``True`` if the two numbers are considered the same family.
    """
    original_core = _extract_core_number(original_number)
    generated_core = _extract_core_number(generated_number)

    if not original_core or not generated_core:
        return False

    # Identical cores are certainly the same regulation.
    if original_core == generated_core:
        return True

    def _split_core(core: str) -> tuple:
        """Split a core number into (prefix, numeric part); numeric part is "" if unsplittable."""
        # "/[A-Z]+" accepts multi-letter parts after the slash (e.g. Q/CR),
        # which a single-letter pattern could not split.
        match = re.match(r'^([A-Z]+(?:/[A-Z]+)?)(\d+(?:\.\d+)?)$', core)
        if match:
            return match.group(1), match.group(2)
        return core, ""

    orig_prefix, orig_num = _split_core(original_core)
    gen_prefix, gen_num = _split_core(generated_core)

    # Same series alone is not enough (e.g. JGJ65 vs JGJ300 are different
    # regulations); the numeric parts must also be close.
    if orig_prefix != gen_prefix or not orig_num or not gen_num:
        return False

    # The numeric parts come from the digit pattern above, so float() cannot fail.
    return abs(float(orig_num) - float(gen_num)) < threshold
-
-
-# ===== 9) 新流程:验证并生成正确编号 =====
async def validate_and_generate_number(
    review_item: str,
    reference_candidates: List[str]
) -> ValidationMatchResult:
    """Validate the number in *review_item* and generate one if validation fails.

    Flow:
    1. If the review item carries a number, compare it locally against the
       reference candidates (after ``_normalize_text``):
       - a candidate with the same name and number -> valid;
       - a candidate with the same number but a different name -> invalid,
         status "规范名称错误".
    2. If neither applies, call ``validate_reference_number``; a valid result
       is accepted.
    3. Otherwise (or when there is no number) call
       ``generate_reference_number`` and accept its number when the
       confidence is at least 0.5.

    Args:
        review_item: The reviewed regulation text.
        reference_candidates: Candidate reference file entries.

    Returns:
        A ``ValidationMatchResult`` with the verdict, the validated or
        generated number (``None`` when undetermined) and a status label.
    """
    regulation_name, existing_number = _extract_regulation_info(review_item)

    def _result(is_valid: bool, validated_number, status: str) -> ValidationMatchResult:
        """Build a result that echoes the inputs of this call."""
        return ValidationMatchResult(
            review_item=review_item,
            reference_candidates=reference_candidates,
            is_valid=is_valid,
            validated_number=validated_number,
            status=status
        )

    # Step 1: an existing number is checked before anything is generated.
    if existing_number:
        logger.info(f"[时效性验证] 验证编号: 《{regulation_name}》 {existing_number}")

        normalized_existing_number = _normalize_text(existing_number)
        normalized_regulation_name = _normalize_text(regulation_name)

        # Parse each candidate once; only candidates with both a name and a
        # number take part in the local comparison.
        parsed_candidates = [
            (candidate_name, _normalize_text(candidate_name), _normalize_text(candidate_number))
            for candidate_name, candidate_number in (
                _extract_regulation_info(candidate) for candidate in reference_candidates or []
            )
            if candidate_name and candidate_number
        ]

        # Pass 1: an exact name + number match anywhere in the list wins.
        for _, norm_name, norm_number in parsed_candidates:
            if norm_name == normalized_regulation_name and norm_number == normalized_existing_number:
                logger.info(f"[时效性验证] 本地验证通过(名称和编号都匹配): 《{regulation_name}》 {existing_number}")
                return _result(True, existing_number, "验证通过")

        # Pass 2: same number under a different name means the name is wrong.
        for candidate_name, norm_name, norm_number in parsed_candidates:
            if norm_number == normalized_existing_number and norm_name != normalized_regulation_name:
                logger.info(f"[时效性验证] 编号相同但名称不同: 《{regulation_name}》-> 应为《{candidate_name}》")
                return _result(False, existing_number, "规范名称错误")

        # No local verdict: ask the model-based validator.
        validation = await validate_reference_number(
            regulation_name=regulation_name,
            existing_number=existing_number
        )

        if validation and validation.is_valid:
            logger.info(f"[时效性验证] 验证通过: 《{regulation_name}》 {existing_number}")
            return _result(True, existing_number, "验证通过")

        logger.info("[时效性验证] 验证失败,需要生成新编号")

    # Step 2: validation failed or there was no number; generate one.
    logger.info(f"[时效性验证] 生成编号: 《{regulation_name}》")

    vote_result = await generate_reference_number(
        regulation_name=regulation_name,
        existing_info=existing_number or ""
    )

    if vote_result and vote_result.confidence >= 0.5:
        logger.info(f"[时效性验证] 生成成功: {vote_result.generated_number}")
        # is_valid stays False: the original number was not confirmed.
        return _result(False, vote_result.generated_number, "生成新编号")

    logger.warning("[时效性验证] 生成失败")
    return _result(False, None, "无法确定")
-
-
-# ===== 10) 核心方法:匹配参考文件 =====
-async def match_reference_files(reference_text: str, review_text: str) -> str:
-    """
-    从参考规范库中查找审查规范的匹配信息(带验证和生成新流程)
-
-    Args:
-        reference_text: 参考规范库内容
-        review_text: 审查规范内容
-
-    Returns:
-        匹配结果的JSON字符串
-    """
-    # 第一步:使用LLM进行初步匹配
-    format_instructions = parser.get_format_instructions()
-
-    messages = prompt.format_messages(
-        reference_text=reference_text,
-        review_text=review_text,
-        format_instructions=format_instructions
-    )
-
-    last_err = None
-    raw_result = None
-
-    for attempt in range(1, MAX_RETRIES + 1):
-        try:
-            # 使用 function_name 从 model_setting.yaml 加载模型配置(规范性审查)
-            logger.info(f"[规范匹配] 第 {attempt}/{MAX_RETRIES} 次尝试调用模型")
-
-            raw = await model_client.get_model_generate_invoke(
-                trace_id="reference_match",
-                messages=messages,
-                function_name="reference_review"
-            )
-
-            logger.debug(f"[规范匹配] 模型输出: {raw[:200]}...")
-            data = extract_first_json(raw)
-            findings = MatchResults.model_validate(data)
-            raw_result = [x.model_dump() for x in findings.items]
-
-            logger.info(f"[规范匹配] 初步匹配成功,返回 {len(raw_result)} 个结果")
-            break
-
-        except Exception as e:
-            last_err = e
-            error_type = type(e).__name__
-            logger.warning(f"[规范匹配] 第 {attempt} 次尝试失败: {error_type}: {str(e)[:100]}")
-
-            if not _is_retryable_error(e):
-                logger.error(f"[规范匹配] 遇到不可重试的错误: {error_type}")
-                raise RuntimeError(_get_user_friendly_error(e)) from e
-
-            if attempt < MAX_RETRIES:
-                delay = min(BASE_DELAY * (2 ** (attempt - 1)), MAX_DELAY)
-                import random
-                jitter = random.uniform(0, 1)
-                actual_delay = delay + jitter
-
-                logger.info(f"[规范匹配] 等待 {actual_delay:.1f} 秒后重试...")
-                await asyncio.sleep(actual_delay)
-            else:
-                logger.error(f"[规范匹配] 达到最大重试次数 {MAX_RETRIES},最终失败")
-                raise RuntimeError(_get_user_friendly_error(e)) from e
-
-    if raw_result is None:
-        raise RuntimeError(_get_user_friendly_error(last_err))
-
-    # 第二步:对初步匹配结果进行验证/生成
-    final_results = []
-    
-    # 解析 review_text
-    try:
-        review_items = json.loads(review_text) if isinstance(review_text, str) else review_text
-        if not isinstance(review_items, list):
-            review_items = [review_items]
-    except json.JSONDecodeError:
-        review_items = [review_text]
-
-    # 解析 reference_text
-    ref_candidates = reference_text if isinstance(reference_text, list) else [reference_text]
-
-    for i, raw_item in enumerate(raw_result):
-        review_item = raw_item.get("review_item", review_items[i] if i < len(review_items) else "")
-        has_related = raw_item.get("has_related_file", False)
-        has_exact = raw_item.get("has_exact_match", False)
-        exact_info = raw_item.get("exact_match_info", "")
-        same_name_current = raw_item.get("same_name_current", "")
-        
-        # 【校正逻辑】如果LLM判断has_exact_match=false,但本地比较发现名称和编号都相同(忽略括号差异),则校正为true
-        if not has_exact and exact_info:
-            review_name, review_number = _extract_regulation_info(review_item)
-            exact_name, exact_number = _extract_regulation_info(exact_info)
-            if (review_name and exact_name and
-                _normalize_text(review_name) == _normalize_text(exact_name) and
-                review_number and exact_number and
-                _normalize_text(review_number) == _normalize_text(exact_number)):
-                logger.info(f"[规范匹配校正] review_item='{review_item}' 名称和编号都相同,校正has_exact_match为true")
-                has_exact = True
-        
-        # 【第一步】检查向量搜索候选中的匹配情况
-        # ref_candidates 是 List[List[str]],需要获取当前项对应的候选列表
-        current_candidates = ref_candidates[i] if i < len(ref_candidates) else []
-        review_name, review_number = _extract_regulation_info(review_item)
-
-        if review_name and review_number and current_candidates:
-            normalized_review_name = _normalize_text(review_name)
-            normalized_review_number = _normalize_text(review_number)
-
-            # 先检查是否有完全匹配(名称和编号都相同)
-            for candidate in current_candidates:
-                if isinstance(candidate, str):
-                    candidate_name, candidate_number = _extract_regulation_info(candidate)
-                    if (candidate_name and candidate_number and
-                        _normalize_text(candidate_name) == normalized_review_name and
-                        _normalize_text(candidate_number) == normalized_review_number):
-                        # 向量库中找到精确匹配(名称和编号都相同)
-                        logger.info(f"[规范匹配] 向量库中找到精确匹配: '{review_item}' -> '{candidate}'")
-                        final_results.append({
-                            "review_item": review_item,
-                            "has_related_file": True,
-                            "has_exact_match": True,
-                            "exact_match_info": candidate,
-                            "same_name_current": candidate
-                        })
-                        has_exact = True
-                        break
-
-            if has_exact:
-                continue
-
-            # 【关键】检查是否有编号相同但名称不同的情况(规范名称错误)
-            for candidate in current_candidates:
-                if isinstance(candidate, str):
-                    candidate_name, candidate_number = _extract_regulation_info(candidate)
-                    if (candidate_name and candidate_number and
-                        _normalize_text(candidate_number) == normalized_review_number and
-                        _normalize_text(candidate_name) != normalized_review_name):
-                        # 编号相同但名称不同 - 判定为规范名称错误
-                        logger.info(f"[规范匹配] 编号相同但名称不同: '{review_item}' -> '{candidate}'")
-                        final_results.append({
-                            "review_item": review_item,
-                            "has_related_file": True,
-                            "has_exact_match": False,
-                            "exact_match_info": "",
-                            "same_name_current": candidate,
-                            "name_mismatch": True,  # 标记为名称不匹配
-                            "correct_name": candidate_name  # 正确的名称
-                        })
-                        has_exact = True  # 标记为已处理,跳过后续逻辑
-                        break
-
-            if has_exact:
-                continue
-        
-        # 如果有精确匹配(由LLM判断),直接接受
-        if has_exact and exact_info:
-            final_results.append({
-                "review_item": review_item,
-                "has_related_file": True,
-                "has_exact_match": True,
-                "exact_match_info": exact_info,
-                "same_name_current": same_name_current
-            })
-            continue
-        
-        # 【第二步】如果没有精确匹配,但有相关文件,进行验证/生成
-        # 使用当前项的候选列表(不是整个二维列表)
-        if has_related or current_candidates:
-            try:
-                validation_result = await validate_and_generate_number(
-                    review_item=review_item,
-                    reference_candidates=current_candidates
-                )
-                
-                if validation_result.validated_number:
-                    # 【关键逻辑】检查生成的编号与原始编号是否属于同一规范家族
-                    is_same_family = _is_same_regulation_family(
-                        review_number or "",
-                        validation_result.validated_number
-                    )
-
-                    # 【特殊处理】检查参考候选中是否有名称完全匹配的文件
-                    # 如果名称相同但编号不同(如 GB 51-2001 vs GB 50021-2001),应接受生成的编号
-                    has_same_name_in_candidates = False
-                    for candidate in current_candidates:
-                        if isinstance(candidate, str):
-                            candidate_name, _ = _extract_regulation_info(candidate)
-                            if (candidate_name and
-                                _normalize_text(candidate_name) == _normalize_text(review_name)):
-                                has_same_name_in_candidates = True
-                                break
-
-                    if not is_same_family and not has_same_name_in_candidates:
-                        # 生成的编号与原始编号完全不同,且参考库中没有名称匹配的文件
-                        # 说明参考库中找到的文件实际上不相关
-                        logger.info(f"[规范匹配] '{review_item}' 生成的编号({validation_result.validated_number})"
-                                  f"与原始编号({review_number})不属于同一规范家族,判定为无相关文件")
-                        final_results.append({
-                            "review_item": review_item,
-                            "has_related_file": False,  # 【关键】标记为无相关文件
-                            "has_exact_match": False,
-                            "exact_match_info": "",
-                            "same_name_current": ""
-                        })
-                        continue
-                    
-                    if validation_result.is_valid:
-                        # 验证通过,原始编号正确
-                        final_results.append({
-                            "review_item": review_item,
-                            "has_related_file": True,
-                            "has_exact_match": True,
-                            "exact_match_info": f"《{_extract_regulation_info(review_item)[0]}》({validation_result.validated_number})状态为现行",
-                            "same_name_current": f"《{_extract_regulation_info(review_item)[0]}》({validation_result.validated_number})状态为现行"
-                        })
-                    else:
-                        # 验证失败,生成了新编号
-                        final_results.append({
-                            "review_item": review_item,
-                            "has_related_file": True,
-                            "has_exact_match": False,
-                            "exact_match_info": "",
-                            "same_name_current": f"《{_extract_regulation_info(review_item)[0]}》({validation_result.validated_number})状态为现行"
-                        })
-                else:
-                    # 无法确定,保持原结果
-                    final_results.append({
-                        "review_item": review_item,
-                        "has_related_file": has_related,
-                        "has_exact_match": False,
-                        "exact_match_info": "",
-                        "same_name_current": same_name_current
-                    })
-            except Exception as e:
-                logger.error(f"[规范匹配] 验证/生成失败: {e}")
-                # 保持原结果
-                final_results.append(raw_item)
-        else:
-            # 无相关文件
-            final_results.append({
-                "review_item": review_item,
-                "has_related_file": False,
-                "has_exact_match": False,
-                "exact_match_info": "",
-                "same_name_current": ""
-            })
-
-    return json.dumps(final_results, ensure_ascii=False, indent=2)
-
-
-# ===== 11) 示例 =====
-if __name__ == "__main__":
-    import asyncio
-
-    reference_file = """
-    《混凝土结构设计规范》(GB 50010-2010)状态为现行
-    《混凝土结构设计规范》(GB 50010-2015)状态为废止
-    《建筑施工组织设计规范》(GB/T 50502-2015)状态为废止
-    《建筑施工组织设计规范》(GB/T 50502-2020)状态为现行
-    《建筑抗震设计规范》(GB 50011-2001)状态为废止
-    《建设工程安全生产管理条例》(国务院令第393号)状态为现行
-    """
-
-    review_file = """
-    [
-        "《混凝土结构设计规范》(GB 50010-2029)",
-        "《建筑施工组织设计规范》(GB/T 50502-2015)",
-        "《建筑抗震设计规范》(GB 50011-2001)",
-        "《城市道路工程设计规范》(CJJ 37-2012)",
-        "《建设工程安全生产管理条例》(国务院令第393号)"
-    ]
-    """
-
-    result = asyncio.run(match_reference_files(reference_file, review_file))
-    print("\n匹配结果(含验证和生成):")
-    print(result)

+ 0 - 545
core/construction_review/component/reviewers/utils/reference_number_generator.py

@@ -1,545 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-多模型投票生成标准编号
-当检测到编号缺失或需要验证时,调用多个大模型生成或验证编号
-"""
-
-import asyncio
-import json
-import re
-from typing import List, Dict, Optional, Tuple
-from dataclasses import dataclass
-from collections import Counter
-
-import httpx
-
-# 导入日志或使用标准logging
-try:
-    from foundation.observability.logger.loggering import review_logger as logger
-except ImportError:
-    import logging
-    # 配置默认logger
-    logging.basicConfig(level=logging.INFO, format='%(message)s')
-    logger = logging.getLogger(__name__)
-
-
-# DashScope API 配置
-DASHSCOPE_API_KEY = "sk-ae805c991b6a4a8da3a09351c34963a5"
-DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
-
-# 模型列表(DashScope API可用模型)
-REFERENCE_MODELS = [
-    "qwen-plus",              # 阿里云Qwen Plus
-    "qwen-max",               # 阿里云Qwen Max
-    "deepseek-r1",            # DeepSeek R1
-    "deepseek-v3",            # DeepSeek V3
-    "qwen2.5-72b-instruct",   # Qwen2.5 72B
-]
-
-# 并发控制
-MAX_CONCURRENT_MODELS = 3
-
-
-@dataclass
-class ModelVoteResult:
-    """模型投票结果"""
-    generated_number: str
-    confidence: float  # 投票比例
-    all_results: Dict[str, str]  # 每个模型的原始输出
-    vote_count: int
-    total_models: int
-
-
-@dataclass
-class ValidationResult:
-    """验证结果"""
-    is_valid: bool  # 编号是否有效
-    regulation_name: str  # 规范名称
-    existing_number: str  # 现有编号
-    reason: str  # 验证理由
-
-
-def _extract_reference_number(text: str) -> Optional[str]:
-    """
-    从模型输出中提取编号
-    支持标准编号(GB/T1234)和法规编号(令第393号)
-    """
-    if not text:
-        return None
-    
-    text_clean = text.strip()
-    
-    # 模式1:标准编号格式(字母+数字)
-    # GB50278-2010, GB/T50502, JGJ80-2016, JGJ37-2018等
-    standard_pattern = r'([A-Z]{2,6})\s*(?:/\s*([A-Z]))?\s*-?\s*([0-9]{1,6})\s*(?:\.\s*([0-9]))?\s*-?\s*([0-9]{4})?'
-    
-    matches = list(re.finditer(standard_pattern, text_clean.upper()))
-    for match in matches:
-        prefix = match.group(1) or ''
-        slash = match.group(2) or ''
-        number = match.group(3) or ''
-        dot = match.group(4) or ''
-        year = match.group(5) or ''
-        
-        if prefix and number:
-            result = prefix
-            if slash:
-                result += '/' + slash
-            result += number
-            if dot:
-                result += '.' + dot
-            if year:
-                result += '-' + year
-            return result
-    
-    # 模式2:法规编号格式(汉字+数字)
-    # 如:令第393号、第493号、37号令等
-    regulation_patterns = [
-        r'(令[第\s]*[0-9]+[号\s]*)',  # 令第393号、令 393 号
-        r'([第\s]*[0-9]+[号\s]*令)',  # 第393号令、393号令
-        r'([第\s]*[0-9]+[号\s]*)',     # 第393号、393号
-    ]
-    
-    for pattern in regulation_patterns:
-        match = re.search(pattern, text_clean)
-        if match:
-            # 标准化法规编号格式
-            num_match = re.search(r'[0-9]+', match.group(1))
-            if num_match:
-                return f"令第{num_match.group()}号"
-    
-    # 模式3:直接在文本中查找类似编号的格式(最宽松)
-    text_no_space = text_clean.replace(' ', '')
-    
-    # 查找标准编号
-    standard_match = re.search(r'([A-Z]{2,6}(?:/[A-Z])?[0-9]{1,6}(?:\.[0-9])?(?:-[0-9]{4})?)', text_no_space.upper())
-    if standard_match:
-        return standard_match.group(1)
-    
-    # 查找法规编号
-    regulation_match = re.search(r'(令[第]?[0-9]+号)', text_no_space)
-    if regulation_match:
-        num = re.search(r'[0-9]+', regulation_match.group(1)).group()
-        return f"令第{num}号"
-    
-    return None
-
-
-async def _call_validation_model(
-    model_name: str,
-    regulation_name: str,
-    existing_number: str
-) -> Tuple[str, Optional[ValidationResult]]:
-    """
-    调用单个模型验证编号是否正确
-    
-    Args:
-        model_name: 模型名称
-        regulation_name: 规范/法规名称
-        existing_number: 现有编号
-    
-    Returns:
-        (model_name, ValidationResult 或 None)
-    """
-    try:
-        system_prompt = """你是标准规范验证专家。请验证给定的规范/法规名称和编号是否匹配且真实存在。
-
-【验证要求】
-1. 判断该编号是否是该规范/法规的正确编号
-2. 支持各种编号类型:
-   - 国家标准:GB、GB/T等
-   - 行业标准:JGJ、JTG、JTJ、DL/T、SL等  
-   - 部门规章:令第X号、第X号令等
-   - 行政法规:国务院令第X号等
-
-【重要判定规则】
-- 确定匹配且真实存在 → is_valid: true
-- 确定不匹配或不存在 → is_valid: false  
-- **不确定、不认识、无法验证时 → is_valid: true(宽松通过,避免误判)**
-
-【输出格式】
-仅输出JSON格式:{"is_valid": true/false, "reason": "验证理由"}
-
-【示例】
-输入:《建设工程安全生产管理条例》,国务院令第393号
-输出:{"is_valid": true, "reason": "该条例确实是国务院2003年颁布的第393号令"}
-
-输入:《混凝土结构设计规范》,GB50010-2010
-输出:{"is_valid": true, "reason": "GB50010-2010是混凝土结构设计规范的正确编号"}
-
-输入:《某冷门行业规范》,XYZ123-2020(你不认识的编号)
-输出:{"is_valid": true, "reason": "无法确认,宽松通过避免误判"}"""
-
-        user_prompt = f"请验证以下规范/法规的编号是否正确:\n\n名称:《{regulation_name}》\n编号:{existing_number}\n\n请输出JSON格式结果。不确定时请返回is_valid: true,避免误判真实存在的标准。"
-        
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                f"{DASHSCOPE_BASE_URL}/chat/completions",
-                headers={
-                    "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
-                    "Content-Type": "application/json"
-                },
-                json={
-                    "model": model_name,
-                    "messages": [
-                        {"role": "system", "content": system_prompt},
-                        {"role": "user", "content": user_prompt}
-                    ],
-                    "temperature": 0.1,
-                    "max_tokens": 200
-                }
-            )
-            
-            if response.status_code != 200:
-                logger.error(f"[{model_name}] API调用失败: {response.status_code}")
-                return model_name, None
-            
-            data = response.json()
-            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
-            
-            # 提取JSON
-            try:
-                # 查找JSON内容
-                json_match = re.search(r'\{[^}]*"is_valid"[^}]*\}', content, re.DOTALL)
-                if json_match:
-                    result = json.loads(json_match.group())
-                    is_valid = result.get("is_valid", False)
-                    reason = result.get("reason", "")
-                    return model_name, ValidationResult(
-                        is_valid=is_valid,
-                        regulation_name=regulation_name,
-                        existing_number=existing_number,
-                        reason=reason
-                    )
-            except (json.JSONDecodeError, Exception) as e:
-                logger.debug(f"[{model_name}] JSON解析失败: {e}")
-                
-            return model_name, None
-            
-    except Exception as e:
-        logger.error(f"[{model_name}] 调用异常: {str(e)[:50]}")
-        return model_name, None
-
-
-async def validate_reference_number(
-    regulation_name: str,
-    existing_number: str,
-    models: List[str] = None
-) -> Optional[ValidationResult]:
-    """
-    多模型投票验证现有编号是否正确
-    
-    Args:
-        regulation_name: 规范/法规名称
-        existing_number: 现有编号
-        models: 要使用的模型列表
-    
-    Returns:
-        ValidationResult 或 None(验证失败)
-    """
-    models = models or REFERENCE_MODELS[:3]  # 验证用3个模型即可
-    
-    logger.info(f"开始验证编号: 名称='{regulation_name}', 编号='{existing_number}'")
-    
-    # 创建信号量限制并发
-    semaphore = asyncio.Semaphore(MAX_CONCURRENT_MODELS)
-    
-    async def call_with_semaphore(model: str) -> Tuple[str, Optional[ValidationResult]]:
-        async with semaphore:
-            return await _call_validation_model(model, regulation_name, existing_number)
-    
-    # 并发调用所有模型
-    tasks = [call_with_semaphore(model) for model in models]
-    results = await asyncio.gather(*tasks, return_exceptions=True)
-    
-    # 处理结果
-    valid_count = 0
-    invalid_count = 0
-    reasons = []
-    
-    for result in results:
-        if isinstance(result, Exception):
-            continue
-        _, validation = result
-        if validation:
-            if validation.is_valid:
-                valid_count += 1
-            else:
-                invalid_count += 1
-            reasons.append(validation.reason)
-    
-    total = valid_count + invalid_count
-    if total == 0:
-        return None
-    
-    # 超过半数认为有效
-    if valid_count > invalid_count:
-        return ValidationResult(
-            is_valid=True,
-            regulation_name=regulation_name,
-            existing_number=existing_number,
-            reason=f"{valid_count}/{total}个模型验证通过"
-        )
-    else:
-        return ValidationResult(
-            is_valid=False,
-            regulation_name=regulation_name,
-            existing_number=existing_number,
-            reason=f"验证未通过({valid_count}/{total})"
-        )
-
-
-async def _call_generation_model(
-    model_name: str,
-    regulation_name: str,
-    existing_info: str = ""
-) -> Tuple[str, Optional[str]]:
-    """
-    调用单个模型生成编号
-    
-    Args:
-        model_name: 模型名称
-        regulation_name: 规范/法规名称
-        existing_info: 现有信息(如"住建部37号令")
-    
-    Returns:
-        (model_name, generated_number 或 None)
-    """
-    try:
-        system_prompt = """你是标准规范编号专家。请根据给定的规范/法规名称,生成对应的国家标准编号。
-
-【编号格式规则】
-- 只能包含大写字母、数字、斜杠(/)和横杠(-)
-- 常见格式:GB50010、GB/T50502、JGJ80-2016、DL/T1147-2009、JGJ37-2018等
-- 年份用4位数字表示,如-2010
-- 不要输出任何解释,只输出编号
-
-【输出示例】
-输入:《混凝土结构设计规范》
-输出:GB50010-2010
-
-输入:《建筑施工高处作业安全技术规范》
-输出:JGJ80-2016
-
-输入:《危险性较大的分部分项工程安全管理规定》(住建部37号令)
-输出:JGJ37-2018"""
-
-        user_prompt = f"请为标准/法规生成编号:\n\n名称:《{regulation_name}》"
-        if existing_info:
-            user_prompt += f"\n现有信息:{existing_info}"
-        user_prompt += "\n\n请只输出标准编号(如GB50010-2010),不要其他内容。"
-        
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                f"{DASHSCOPE_BASE_URL}/chat/completions",
-                headers={
-                    "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
-                    "Content-Type": "application/json"
-                },
-                json={
-                    "model": model_name,
-                    "messages": [
-                        {"role": "system", "content": system_prompt},
-                        {"role": "user", "content": user_prompt}
-                    ],
-                    "temperature": 0.1,
-                    "max_tokens": 100
-                }
-            )
-            
-            if response.status_code != 200:
-                logger.error(f"[{model_name}] API调用失败: {response.status_code}")
-                return model_name, None
-            
-            data = response.json()
-            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
-            
-            # 清理输出
-            content = content.strip().strip('`').strip()
-            
-            # 提取编号
-            extracted = _extract_reference_number(content)
-            if not extracted and content and content != "未知":
-                # 如果没有匹配到模式,但内容不为空,尝试直接使用
-                # 去除常见前缀和多余内容
-                cleaned = re.sub(r'^(编号|标准号|文号)[::\s]*', '', content)
-                cleaned = cleaned.split('\n')[0].strip()
-                if cleaned:
-                    extracted = cleaned
-            
-            logger.debug(f"[{model_name}] 生成: '{content}' -> '{extracted}'")
-            return model_name, extracted
-            
-    except Exception as e:
-        logger.error(f"[{model_name}] 调用异常: {str(e)[:50]}")
-        return model_name, None
-
-
-async def generate_reference_number(
-    regulation_name: str,
-    existing_info: str = "",
-    models: List[str] = None
-) -> Optional[ModelVoteResult]:
-    """
-    多模型投票生成规范编号
-    
-    Args:
-        regulation_name: 规范/法规名称
-        existing_info: 现有信息(如"住建部37号令")
-        models: 要使用的模型列表
-    
-    Returns:
-        ModelVoteResult 或 None(生成失败)
-    """
-    models = models or REFERENCE_MODELS
-    
-    logger.info(f"开始生成编号: 名称='{regulation_name}'")
-    
-    # 创建信号量限制并发
-    semaphore = asyncio.Semaphore(MAX_CONCURRENT_MODELS)
-    
-    async def call_with_semaphore(model: str) -> Tuple[str, Optional[str]]:
-        async with semaphore:
-            return await _call_generation_model(model, regulation_name, existing_info)
-    
-    # 并发调用所有模型
-    tasks = [call_with_semaphore(model) for model in models]
-    results = await asyncio.gather(*tasks, return_exceptions=True)
-    
-    # 处理结果
-    model_results = {}
-    for result in results:
-        if isinstance(result, Exception):
-            continue
-        model_name, number = result
-        model_results[model_name] = number
-    
-    logger.debug(f"模型调用完成: {len(model_results)}/{len(models)}")
-    
-    # 投票
-    vote_result = _vote_for_number(model_results)
-    return vote_result
-
-
-def _vote_for_number(results: Dict[str, Optional[str]]) -> Optional[ModelVoteResult]:
-    """
-    对多个模型的生成结果进行投票
-    """
-    # 过滤有效结果
-    valid_numbers = [(model, num) for model, num in results.items() if num and num != "未知"]
-    
-    if not valid_numbers:
-        logger.warning("所有模型都未生成有效编号")
-        return None
-    
-    total_valid = len(valid_numbers)
-    
-    # 标准化编号(用于投票统计)
-    def normalize(num: str) -> str:
-        # 统一为大写,移除空格,但保留斜杠和横杠
-        return num.upper().replace(' ', '')
-    
-    # 统计投票
-    normalized_votes = {}
-    for model, num in valid_numbers:
-        normalized = normalize(num)
-        if normalized not in normalized_votes:
-            normalized_votes[normalized] = []
-        normalized_votes[normalized].append((model, num))
-    
-    # 找到得票最多的编号
-    winner_normalized = None
-    winner_votes = []
-    max_count = 0
-    
-    for norm, votes in normalized_votes.items():
-        if len(votes) > max_count:
-            max_count = len(votes)
-            winner_normalized = norm
-            winner_votes = votes
-    
-    vote_count = len(winner_votes)
-    confidence = vote_count / total_valid
-    
-    # 检查是否超过半数
-    if confidence <= 0.5:
-        logger.info(f"投票未过半数: '{winner_normalized}' 得票 {vote_count}/{total_valid}")
-        return None
-    
-    # 使用第一个原始格式的获胜编号
-    original_winner = winner_votes[0][1]
-    
-    logger.info(f"投票结果: '{original_winner}' 得票 {vote_count}/{total_valid} ({confidence:.0%})")
-    
-    return ModelVoteResult(
-        generated_number=original_winner,
-        confidence=confidence,
-        all_results=results,
-        vote_count=vote_count,
-        total_models=len(results)
-    )
-
-
-# 便捷函数
-def validate_reference_number_sync(
-    regulation_name: str,
-    existing_number: str
-) -> Optional[ValidationResult]:
-    """同步版本的验证函数"""
-    return asyncio.run(validate_reference_number(regulation_name, existing_number))
-
-
-def generate_reference_number_sync(
-    regulation_name: str,
-    existing_info: str = ""
-) -> Optional[ModelVoteResult]:
-    """同步版本的生成函数"""
-    return asyncio.run(generate_reference_number(regulation_name, existing_info))
-
-
-if __name__ == "__main__":
-    # 测试
-    async def test():
-        # 测试验证
-        print("=" * 60)
-        print("测试编号验证")
-        print("=" * 60)
-        
-        test_cases = [
-            ("建设工程安全生产管理条例", "国务院令第393号"),
-            ("混凝土结构设计规范", "GB50010-2010"),
-            ("危险性较大的分部分项工程安全管理规定", "住建部令第37号"),
-        ]
-        
-        for name, number in test_cases:
-            print(f"\n验证: 《{name}》 {number}")
-            result = await validate_reference_number(name, number)
-            if result:
-                print(f"  结果: {'✓ 有效' if result.is_valid else '✗ 无效'}")
-                print(f"  理由: {result.reason}")
-            else:
-                print("  验证失败")
-        
-        # 测试生成
-        print("\n" + "=" * 60)
-        print("测试编号生成")
-        print("=" * 60)
-        
-        generate_cases = [
-            "建设工程安全生产管理条例",
-            "起重机械安全规程",
-            "中华人民共和国环境保护法",
-        ]
-        
-        for name in generate_cases:
-            print(f"\n生成: 《{name}》")
-            result = await generate_reference_number(name)
-            if result:
-                print(f"  结果: {result.generated_number}")
-                print(f"  置信度: {result.confidence:.0%}")
-            else:
-                print("  生成失败")
-    
-    asyncio.run(test())