Jelajahi Sumber

新增时效性检查

ZengChao 2 bulan lalu
induk
melakukan
97ec414b53

+ 55 - 0
core/construction_review/component/reviewers/prompt/timeliness_basis_reviewer.yaml

@@ -0,0 +1,55 @@
# Prompts for the normative-reference timeliness reviewer.
# Both strings are rendered through LangChain's ChatPromptTemplate (f-string
# syntax). {reference_content} and {check_content} are the only placeholders,
# so every literal brace in the prompt text must be doubled ({{ / }}).
# The risk_level values used in the examples must stay within the set the
# prompt itself allows: "LOW" / "MEDIUM" / "HIGH".
timeliness_basis_reviewer:
  system_prompt: |
    /no_think
    【角色】
    你是一个【规范性引用文件时效性风险点评审助手】。

    【任务】
    你的任务是基于参考文件对审查文件的时效性进行评估,并提出改进建议。

    【字段说明】
    - **issue_point**:时效性判断
    - **location**:依据的参考文件
    - **suggestion**:建议(可执行动作)
    - **reason**:问题的原因分析和依据说明,基于标准规范要求的详细说明
    - **risk_level**:风险水平,只能是 "LOW" / "MEDIUM" / "HIGH"

    【时效性判定类型(仅限以下四类)】
    个别修订未体现(存在修订/修正单,但 location 未反映)
    规范版本号错误(location 版本号与候选不一致)
    引用已被替代的标准(存在明确替代标准)
    引用已废止的规范(明确标注为废止/停止使用)

    【输出格式规范】
    - 你只能输出一个数组对象
    - 数组中每个元素为一个 JSON 对象
    - 为每一个审查文件输出一个结果对象
    - 每个对象只能包含五个字段:
      {{"issue_point":"", "location":"", "suggestion":"", "reason":"", "risk_level":"LOW|MEDIUM|HIGH"}}

    【输出示例】
    ```json
    [
      {{
        "issue_point": "GB 50204-2015 引用规范 ✔ 符合要求",
        "location": "《建筑工程施工质量验收统一标准》(GB 50204-2015)",
        "suggestion": "标准引用格式规范,包含完整的标准编号和标准名称,符合要求",
        "reason": "依据国家标准编写规范,标准引用应包含标准编号和标准名称,该引用格式正确",
        "risk_level": "LOW"
      }},
      {{
        "issue_point": "缺少安全技术规范 ✘ 不符合要求",
        "location": "编制依据中缺少相关安全技术规范标准",
        "suggestion": "建议补充《建筑施工安全检查标准》(JGJ 59-2011)等安全技术规范",
        "reason": "依据施工组织设计编制规范,应包含相关的安全技术标准",
        "risk_level": "HIGH"
      }}
    ]
    ```

  user_prompt_template: |
    请审查以下内容的编制依据时效性和相关性: 
    参考内容:
    {reference_content}
    审查内容:
    {check_content}

+ 569 - 0
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -0,0 +1,569 @@
+import os
+import sys
+import json
+import re
+import time
+from typing import Any, Dict, List, Optional
+import asyncio
+
+
+
+# 导入必要的依赖
try:
    from pymilvus import connections, Collection
    from foundation.infrastructure.config.config import config_handler
    from foundation.ai.models.model_handler import model_handler as mh
    from foundation.ai.agent.generate.model_generate import generate_model_client
    from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
    from core.construction_review.component.reviewers.utils.inter_tool import InterTool
    from foundation.observability.logger.loggering import server_logger as logger
    from langchain_core.prompts import ChatPromptTemplate
except ImportError as e:
    # The project logger may be the very import that failed, so it cannot be
    # used to report the failure. Fall back to a stdlib logger; the rest of
    # this module calls logger.* unconditionally and would crash on None.
    import logging

    logger = logging.getLogger(__name__)
    logger.warning(f"Warning: 无法导入依赖: {e}")

    # Define every optional dependency so later references fail with a clear
    # "is None" condition instead of a NameError.
    connections = None
    Collection = None
    config_handler = None
    mh = None
    generate_model_client = None
    prompt_loader = None
    InterTool = None
    ChatPromptTemplate = None
+
+
class TextProcessor:
    """Text-processing helpers for the basis reviewer."""

    # A title wrapped in 《…》, optionally followed by a bracketed standard
    # number. Both full-width （…） and ASCII (...) brackets are accepted.
    # The pattern has no capturing group, and the full match is read via
    # group(0), so callers always get the whole citation.
    _BASIS_PATTERN = re.compile(r'《[^》]+》(?:[（(][^）)]+[）)])?')

    @staticmethod
    def extract_basis(text: str) -> List[str]:
        """Extract compilation-basis citations from *text*.

        Args:
            text: Free text that may contain citations such as
                ``《建筑设计防火规范》(GB50016-2014)``.

        Returns:
            The citations in order of appearance, each including its
            bracketed standard number when one directly follows the title.
            Empty or ``None`` input yields an empty list.
        """
        if not text:
            return []
        return [match.group(0) for match in TextProcessor._BASIS_PATTERN.finditer(text)]
+
+
+
+
class StandardizedResponseProcessor:
    """Converts raw LLM output into the issue-item format shared with outline_reviewer.py."""

    def __init__(self):
        # InterTool is falsy when the optional project imports failed at load time.
        self.inter_tool = InterTool() if InterTool else None

    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查") -> List[Dict[str, Any]]:
        """Parse an LLM reply into standard review items.

        Args:
            response_text: Raw text returned by the LLM.
            check_name: Name of the check attached to every produced item.

        Returns:
            A list of issue items built by InterTool: one per element when the
            extracted JSON is a list, a single one when it is a dict, and an
            empty list when nothing usable was extracted or InterTool is
            unavailable. If extraction or item creation raises, a single
            error item is returned instead.
        """
        tool = self.inter_tool
        if not tool:
            logger.warning("InterTool未初始化,返回空结果")
            return []

        try:
            payload = tool._extract_json_data(response_text)
            if not payload:
                return []
            if isinstance(payload, list):
                return [tool._create_issue_item(entry, check_name) for entry in payload]
            if isinstance(payload, dict):
                return [tool._create_issue_item(payload, check_name)]
            return []
        except Exception as e:
            logger.error(f"处理LLM响应失败: {str(e)}")
            # Surface the failure as one error entry rather than raising.
            error_entry = {
                "check_item": check_name,
                "check_result": {"error": str(e)},
                "exist_issue": True,
                "risk_info": {"risk_level": "medium"}
            }
            return [error_entry]
+
+
class MessageBuilder:
    """Builds the prompt template and the JSON user payload for the reviewer."""

    def __init__(self, prompt_loader_instance=None):
        # Stored for callers; not used by the methods of this class.
        self.prompt_loader = prompt_loader_instance

    def get_prompt_template(self):
        """Load the reviewer prompts from YAML and wrap them in a ChatPromptTemplate.

        The YAML file is resolved relative to this module (``prompt/`` next to
        it) rather than the current working directory, so loading works no
        matter where the process was started.

        Returns:
            A ``ChatPromptTemplate`` with one system and one user message.

        Raises:
            OSError: If the YAML file cannot be opened.
            KeyError: If the expected keys are missing from the YAML.
        """
        import yaml

        prompt_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "prompt",
            "timeliness_basis_reviewer.yaml",
        )
        with open(prompt_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)

        section = data["timeliness_basis_reviewer"]
        return ChatPromptTemplate.from_messages([
            ("system", section["system_prompt"]),
            ("user", section["user_prompt_template"]),
        ])

    def build_user_content(
        self,
        basis_items: List[str],
        grouped_candidates: List[List[Dict[str, Any]]],
    ) -> str:
        """Serialise basis items and their search candidates to a JSON string.

        Args:
            basis_items: Raw citation texts.
            grouped_candidates: Candidate hits per citation, positionally
                aligned with ``basis_items``; a ``None`` group is treated as
                empty. Pairing stops at the shorter of the two sequences.

        Returns:
            JSON text (non-ASCII preserved) with an ``items`` list and a
            ``required_output_example`` list holding one placeholder per item.
        """
        items = [
            {
                "raw_text": raw,
                "candidates": [
                    {
                        "id": c.get("id"),
                        "similarity": c.get("similarity"),
                        # Hits may carry the text under either key.
                        "text": c.get("text") or c.get("text_content") or "",
                    }
                    for c in (cands or [])
                ],
            }
            for raw, cands in zip(basis_items, grouped_candidates)
        ]

        user_content = {
            "items": items,
            "required_output_example": [
                {"is_standard": False, "status": "", "meg": ""} for _ in items
            ],
        }

        return json.dumps(user_content, ensure_ascii=False)
+
+
class BasisSearchEngine:
    """Milvus-backed vector search used to look up compilation-basis candidates."""

    def __init__(self):
        self.emdmodel = None
        self._initialize()

    def _initialize(self):
        """Connect to Milvus and obtain the embedding model.

        Any failure is logged and leaves ``self.emdmodel`` as ``None``, so
        construction never raises and later searches return empty results.
        """
        try:
            # Connection settings
            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
            self.user = config_handler.get('milvus', 'MILVUS_USER')
            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')

            # The password must be sent together with the user name;
            # otherwise the configured credentials are never used.
            connections.connect(
                alias="default",
                host=self.host,
                port=self.port,
                user=self.user or "",
                password=self.password or "",
                db_name="lq_db"
            )
            logger.info(f" 成功连接到 Milvus {self.host}:{self.port}")

            # Embedding model
            if mh:
                self.emdmodel = mh._get_lq_qwen3_8b_emd()
                logger.info(" 嵌入模型初始化成功")
            else:
                raise ImportError("无法获取嵌入模型")

        except Exception as e:
            logger.error(f" BasisSearchEngine 初始化失败: {e}")
            self.emdmodel = None

    def text_to_vector(self, text: str) -> List[float]:
        """Embed *text* and return the vector as a plain list.

        Raises:
            ValueError: If the embedding model was not initialised.
            Exception: Whatever the embedding model raises (logged, then re-raised).
        """
        if not self.emdmodel:
            raise ValueError("嵌入模型未初始化")

        try:
            embedding = self.emdmodel.embed_query(text)
            return embedding.tolist() if hasattr(embedding, 'tolist') else list(embedding)
        except Exception as e:
            logger.error(f"文本向量化失败: {e}")
            raise

    def similarity_search(self, collection_name: str, query_text: str,
                         min_score: float = 0.3, top_k: int = 3,
                         filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Run a cosine-similarity search and return hits scoring at least *min_score*.

        Args:
            collection_name: Milvus collection to query.
            query_text: Text to embed and search for.
            min_score: Minimum similarity a hit must reach to be returned.
            top_k: Maximum number of hits requested from Milvus.
            filters: Optional ``metadata`` equality filters (key -> value).

        Returns:
            A list of dicts with ``id``, ``text``, ``text_content``,
            ``metadata``, ``distance`` and ``similarity``. Any error is logged
            and results in an empty list.
        """
        try:
            if not self.emdmodel:
                raise ValueError("搜索器未正确初始化")

            collection = Collection(collection_name)
            collection.load()

            query_embedding = self.text_to_vector(query_text)

            search_params = {
                "metric_type": "COSINE",
                "params": {"nprobe": 10}
            }

            filter_expr = self._create_filter(filters)

            results = collection.search(
                data=[query_embedding],
                anns_field="embedding",
                param=search_params,
                limit=top_k,
                expr=filter_expr,
                output_fields=["text", "metadata"]
            )

            formatted_results = []
            for hits in results:
                for hit in hits:
                    formatted_results.append({
                        'id': hit.id,
                        'text': hit.entity.get('text', ''),
                        'text_content': hit.entity.get('text', ''),
                        'metadata': hit.entity.get('metadata', {}),
                        'distance': hit.distance,
                        # With metric_type COSINE Milvus reports the cosine
                        # similarity itself (larger = closer), so it is used
                        # directly. "1 - distance" would invert the ranking
                        # and make the min_score filter drop the best hits.
                        'similarity': hit.distance
                    })

            return [
                result for result in formatted_results
                if result['similarity'] >= min_score
            ]

        except Exception as e:
            logger.error(f" 相似度搜索失败: {e}")
            return []

    @staticmethod
    def _escape(value: Any) -> str:
        """Escape backslashes and double quotes for use inside a quoted filter literal."""
        return str(value).replace('\\', '\\\\').replace('"', '\\"')

    def _create_filter(self, filters: Optional[Dict[str, Any]]) -> str:
        """Build a Milvus boolean expression from metadata equality filters.

        Numbers are emitted bare; every other value is emitted as a quoted
        string. Keys and string values are escaped so a quote inside a value
        cannot terminate the literal and alter the expression.

        Returns:
            Conditions joined with ``and``, or ``""`` when there are no filters.
        """
        if not filters:
            return ""

        conditions = []
        for key, value in filters.items():
            field = f'metadata["{self._escape(key)}"]'
            if isinstance(value, (int, float)) and not isinstance(value, str):
                conditions.append(f'{field} == {value}')
            else:
                conditions.append(f'{field} == "{self._escape(value)}"')

        return " and ".join(conditions)
+
+
class LLMReviewClient:
    """Async wrapper around the shared model-generation client."""

    async def review_basis(self, Message: str, trace_id: str = None) -> str:
        """Send the review prompt to the model and return its response.

        Args:
            Message: The task prompt forwarded as ``task_prompt``.
            trace_id: Identifier forwarded to the model client.

        Returns:
            Whatever the model client returns, or the string ``"[]"`` when the
            call raises, so downstream JSON parsing sees an empty array.
        """
        try:
            return await generate_model_client.get_model_generate_invoke(
                trace_id=trace_id,
                task_prompt_info={
                    "task_prompt": Message,
                    "task_name": "规范性引用文件识别与状态判断",
                },
            )
        except Exception as e:
            logger.error(f" 模型调用准备阶段失败: {e}")
            return "[]"
+
+
class BasisReviewService:
    """Core service that reviews compilation-basis citations in batches.

    For each citation it searches the vector store for candidates, asks the
    LLM to judge the citation against them, and converts the reply into
    standard issue items. It can be used with ``async with`` or directly.

    NOTE(review): ``review_batch`` defaults to the collection
    "first_bfp_collection_status" while ``review_all`` defaults to
    "already_basis" - confirm which collection is intended.
    """

    # Number of citations sent to the LLM in one request.
    _BATCH_SIZE = 3

    def __init__(self, max_concurrent: int = 4):
        self.search_engine = BasisSearchEngine()
        self.llm_client = LLMReviewClient()
        self.text_processor = TextProcessor()
        self.response_processor = StandardizedResponseProcessor()
        # Build a fresh PromptLoader instead of reusing the module-level one.
        from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
        fresh_prompt_loader = PromptLoader()
        self.message_builder = MessageBuilder(fresh_prompt_loader)
        self.max_concurrent = max_concurrent
        self._semaphore = None

    def _get_semaphore(self) -> asyncio.Semaphore:
        """Return the concurrency limiter, creating it on first use.

        Creating it lazily lets ``review_batch`` work even when the service
        is not entered via ``async with``.
        """
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.max_concurrent)
        return self._semaphore

    @staticmethod
    def _error_items(error: Any, basis_items: List[str]) -> List[Dict[str, Any]]:
        """Build the single-entry error result used for every failed batch."""
        return [{
            "check_item": "reference_check",
            "check_result": {"error": str(error), "basis_items": basis_items},
            "exist_issue": True,
            "risk_info": {"risk_level": "high"}
        }]

    @staticmethod
    async def _push_progress(progress_manager, callback_task_id, failure_prefix: str, **payload) -> bool:
        """Send one progress update; return True only if it was delivered.

        Does nothing when no manager or task id is given. Push failures are
        logged with *failure_prefix* and never propagate to the caller.
        """
        if not (progress_manager and callback_task_id):
            return False
        try:
            await progress_manager.update_stage_progress(
                callback_task_id=callback_task_id,
                **payload
            )
            return True
        except Exception as e:
            logger.error(f"{failure_prefix}: {e}")
            return False

    async def __aenter__(self):
        """Enter the async context and make sure the semaphore exists."""
        self._get_semaphore()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context; exceptions are not suppressed."""
        return False

    async def review_batch(
        self,
        basis_items: List[str],
        collection_name: str = "first_bfp_collection_status",
        filters: Optional[Dict[str, Any]] = None,
        min_score: float = 0.3,
        top_k_each: int = 3,
    ) -> List[Dict[str, Any]]:
        """Review one batch of citations (normally three).

        Args:
            basis_items: Citation texts; blank and non-string entries are dropped.
            collection_name: Milvus collection searched for candidates.
            filters: Optional metadata filters forwarded to the search.
            min_score: Minimum similarity for a candidate to be kept.
            top_k_each: Maximum number of candidates per citation.

        Returns:
            Standardised issue items parsed from the LLM reply, an empty list
            when there is nothing to review, or one error item if the batch
            fails.
        """
        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
        if not basis_items:
            return []

        async with self._get_semaphore():
            try:
                # Search all citations of the batch concurrently.
                search_results = await asyncio.gather(
                    *(
                        self._async_search_basis(basis, collection_name, min_score, top_k_each, filters)
                        for basis in basis_items
                    ),
                    return_exceptions=True
                )

                grouped_candidates = []
                for i, result in enumerate(search_results):
                    if isinstance(result, Exception):
                        logger.error(f" 搜索失败 '{basis_items[i]}': {result}")
                        grouped_candidates.append([])
                    else:
                        grouped_candidates.append(result)

                # Bind candidates and citations into the prompt template.
                prompt_template = self.message_builder.get_prompt_template()
                message = prompt_template.partial(reference_content=grouped_candidates, check_content=basis_items)
                trace_id = f"prep_basis_batch_{int(time.time())}"
                llm_out = await self.llm_client.review_basis(message, trace_id)
                logger.debug(f"LLM输出:\n{llm_out}")

                # Convert the raw reply to standard items and return them;
                # review_all iterates over this result.
                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check")

                issue_count = sum(
                    1 for item in standardized_result
                    if isinstance(item, dict) and item.get('exist_issue', False)
                )
                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")

                return standardized_result

            except Exception as e:
                logger.error(f" 批次处理失败: {e}")
                return self._error_items(e, basis_items)

    async def _async_search_basis(
        self,
        basis: str,
        collection_name: str,
        min_score: float,
        top_k_each: int,
        filters: Optional[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Search candidates for one citation without blocking the event loop.

        Returns the hits, or an empty list if the search raises.
        """
        try:
            # The search is synchronous, so run it in the default executor.
            loop = asyncio.get_running_loop()
            retrieved = await loop.run_in_executor(
                None,
                self.search_engine.similarity_search,
                collection_name,
                basis,
                min_score,
                top_k_each,
                filters
            )
            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
            return retrieved or []
        except Exception as e:
            logger.error(f" 搜索失败 '{basis}': {e}")
            return []

    async def review_all(self, text: str, collection_name: str = "already_basis",
                        progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
        """Extract every citation from *text* and review them in concurrent batches.

        Args:
            text: Document text to scan for citations.
            collection_name: Milvus collection searched for candidates.
            progress_manager: Optional object whose ``update_stage_progress``
                coroutine receives progress updates.
            callback_task_id: Task id forwarded with each update; updates are
                sent only when both this and ``progress_manager`` are given.

        Returns:
            One result list per batch, in batch order. An empty list when no
            citation is found.
        """
        items = self.text_processor.extract_basis(text)
        if not items:
            return []

        start_time = time.time()
        batches = [items[i:i + self._BATCH_SIZE] for i in range(0, len(items), self._BATCH_SIZE)]
        total_batches = len(batches)

        # Announce the start of the review.
        await self._push_progress(
            progress_manager, callback_task_id, "SSE推送开始消息失败",
            stage_name="AI审查",
            current=0,
            status="processing",
            message=f"开始编制依据审查,共{len(items)}项编制依据",
            overall_task_status="processing",
            event_type="processing"
        )

        async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
            """Review one batch and push its result (or its failure) as progress."""
            batch_no = batch_index + 1
            progress_percent = int(batch_no / total_batches * 100)
            try:
                result = await self.review_batch(batch, collection_name)

                # NOTE(review): standard issue items may not carry
                # 'is_standard'; this count then stays 0 - confirm the
                # intended statistic for the progress message.
                batch_standard_count = sum(
                    1 for item in result
                    if isinstance(item, dict) and item.get('is_standard', False)
                )

                logger.info(f"批次{batch_no}完成,准备推送SSE")
                pushed = await self._push_progress(
                    progress_manager, callback_task_id, f"SSE推送批次{batch_no}结果失败",
                    stage_name=f"编制依据审查-批次{batch_no}",
                    current=progress_percent,
                    status="processing",
                    message=f"完成第{batch_no}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
                    overall_task_status="processing",
                    event_type="processing",
                    issues=result
                )
                if pushed:
                    logger.info(f"批次{batch_no} SSE推送成功")

                return result

            except Exception as e:
                logger.error(f" 批次 {batch_index} 处理失败: {e}")
                # Same error shape as every other failure path in this class,
                # so the final statistics count it as an issue.
                error_result = self._error_items(f"批次处理失败: {str(e)}", batch)

                # Push the failure as well so the client still sees progress.
                await self._push_progress(
                    progress_manager, callback_task_id, f"SSE推送失败批次{batch_no}结果失败",
                    stage_name=f"编制依据审查-批次{batch_no}",
                    current=progress_percent,
                    status="processing",
                    message=f"第{batch_no}/{total_batches}批次处理失败",
                    overall_task_status="processing",
                    event_type="processing",
                    issues=error_result
                )

                return error_result

        logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
        processed_results = await asyncio.gather(
            *(process_batch_with_callback(i, batch) for i, batch in enumerate(batches)),
            return_exceptions=True
        )

        # Replace any exception that escaped a batch with an error result.
        successful_batches = 0
        final_results = []
        for i, result in enumerate(processed_results):
            if isinstance(result, Exception):
                logger.error(f" 批次 {i} 返回异常: {result}")
                final_results.append(self._error_items(result, batches[i]))
            else:
                final_results.append(result)
                successful_batches += 1

        total_items = sum(len(result) for result in final_results)
        issue_items = sum(
            1
            for result in final_results
            for item in result
            if isinstance(item, dict) and item.get('exist_issue', False)
        )

        logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")

        # Announce completion. current=15 is kept from the original code -
        # presumably this stage's share of the overall task; TODO confirm.
        elapsed_time = time.time() - start_time
        await self._push_progress(
            progress_manager, callback_task_id, "SSE推送完成消息失败",
            stage_name="编制依据审查",
            current=15,
            status="processing",
            message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
            overall_task_status="processing",
            event_type="processing"
        )

        logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
        return final_results
+
+
+# 便捷函数
async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
    """Review a single batch of citations with a short-lived service.

    Args:
        basis_items: Citation texts to review.
        max_concurrent: Concurrency limit handed to the service.

    Returns:
        Whatever ``BasisReviewService.review_batch`` returns for the batch.
    """
    service = BasisReviewService(max_concurrent=max_concurrent)
    async with service as active_service:
        batch_result = await active_service.review_batch(basis_items)
        return batch_result
+
+
async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
    """Review every citation found in *text* with a short-lived service.

    Args:
        text: Document text to scan for citations.
        max_concurrent: Concurrency limit handed to the service.

    Returns:
        Whatever ``BasisReviewService.review_all`` returns for the text.
    """
    service = BasisReviewService(max_concurrent=max_concurrent)
    async with service as active_service:
        all_results = await active_service.review_all(text)
        return all_results
+
if __name__ == "__main__":
    # Manual smoke test: review two citations and print the outcome, so the
    # run produces something to inspect instead of discarding the result.
    test_text = "根据《建筑设计防火规范》(GB50016-2014)和《高层民用建筑设计防火规范》(GB50045-95)进行设计。"
    result = asyncio.run(review_all_basis_async(test_text))
    # default=str keeps the dump from failing on values json cannot serialise.
    print(json.dumps(result, ensure_ascii=False, indent=2, default=str))