Forráskód Böngészése

v0.0.4-规范性功能优化

ZengChao 2 hónapja
szülő
commit
30ff8ed6c0

+ 125 - 0
core/construction_review/component/ai_review_engine.py

@@ -721,6 +721,131 @@ class AIReviewEngine(BaseReviewer):
                     "error_message": error_msg
                 }
             }
+    async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
+                                state: dict = None, stage_name: str = None) -> Dict[str, Any]:
+        """
+        执行编制依据审查:调用prep_basis_reviewer中的异步审查功能
+
+        Args:
+            review_data: 待审查的编制依据数据,包含编制依据文本内容
+            trace_id: 追踪ID
+            state: 状态字典
+            stage_name: 阶段名称
+
+        Returns:
+            审查结果字典,包含编制依据审查结果
+        """
+        start_time = time.time()
+        try:
+            logger.info(f"开始编制依据审查,trace_id: {trace_id}")
+
+            # 提取关键数据
+            review_content = review_data.get('content', '')
+            max_concurrent = review_data.get('max_concurrent', 4)
+
+            # 添加调试信息
+            logger.info(f"提取的编制依据内容长度: {len(review_content)}")
+            if review_content:
+                logger.info(f"编制依据内容预览: {review_content[:50]}...")
+            else:
+                logger.warning("编制依据内容为空,将跳过审查")
+
+            # 检查是否有有效的编制依据内容
+            if not review_content or not review_content.strip():
+                logger.warning("没有可执行的编制依据审查任务")
+                return {
+                    "prep_basis_review_results": {
+                        "review_results": [],
+                        "review_content": review_content,
+                        "total_basis_items": 0,
+                        "valid_items": 0,
+                        "standard_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": "编制依据内容为空,无法进行审查"
+                    }
+                }
+
+            # 调用prep_basis_reviewer中的异步审查方法
+            logger.info("开始调用编制依据异步审查...")
+
+            try:
+                # 使用信号量控制并发
+                async with self.semaphore:
+                    # 从state中获取progress_manager和callback_task_id
+                    progress_manager = state.get('progress_manager') if state else None
+                    callback_task_id = state.get('callback_task_id') if state else None
+
+                    # 调用带有SSE推送功能的review_all方法
+                    from core.construction_review.component.reviewers.reference_basis_reviewer import BasisReviewService
+                    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+                        prep_basis_review_results = await service.review_all(
+                            review_content,
+                            collection_name="first_bfp_collection_status",
+                            progress_manager=progress_manager,
+                            callback_task_id=callback_task_id
+                        )
+
+                    logger.info(f"编制依据审查完成,批次数量: {len(prep_basis_review_results)}")
+
+                    # 统计审查结果
+                    total_items = 0
+                    valid_items = 0
+                    standard_items = 0
+
+                    for batch in prep_basis_review_results:
+                        if isinstance(batch, list):
+                            total_items += len(batch)
+                            for item in batch:
+                                if isinstance(item, dict):
+                                    valid_items += 1
+                                    if item.get('is_standard', False):
+                                        standard_items += 1
+
+                    logger.info(f"审查统计 - 总编制依据: {total_items}, 有效项: {valid_items}, 标准项: {standard_items}")
+
+            except Exception as e:
+                logger.error(f"编制依据异步审查失败: {str(e)}")
+                return {
+                    "prep_basis_review_results": {
+                        "review_results": [],
+                        "review_content": review_content,
+                        "total_basis_items": 0,
+                        "valid_items": 0,
+                        "standard_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": f"编制依据审查失败: {str(e)}"
+                    }
+                }
+
+            # 返回完整结果
+            return {
+                "prep_basis_review_results": {
+                    "review_results": prep_basis_review_results,
+                    "review_content": review_content,
+                    "total_basis_items": total_items,
+                    "valid_items": valid_items,
+                    "standard_items": standard_items,
+                    "execution_time": time.time() - start_time,
+                    "error_message": None
+                }
+            }
+
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = f"编制依据审查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+
+            return {
+                "prep_basis_review_results": {
+                    "review_results": [],
+                    "review_content": review_data.get('content', ''),
+                    "total_basis_items": 0,
+                    "valid_items": 0,
+                    "standard_items": 0,
+                    "execution_time": execution_time,
+                    "error_message": error_msg
+                }
+            }
         
     async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:

+ 2 - 2
core/construction_review/component/document_processor.py

@@ -542,8 +542,8 @@ class DocumentProcessor:
                 # 处理原始大纲,按章节层级结构化 - 复用doc_worker的逻辑
                 result['outline'] = self._create_outline_from_toc(raw_content.get('toc_info', {}))
 
-            with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
-                json.dump(result, f, ensure_ascii=False, indent=4)
+            # with open(rf"temp\document_temp\文档切分预处理结果.json", 'w', encoding='utf-8') as f:
+            #     json.dump(result, f, ensure_ascii=False, indent=4)
             return result
 
         except Exception as e:

+ 55 - 0
core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml

@@ -0,0 +1,55 @@
+reference_basis_reviewer:
+  system_prompt: |
+    /no_think
+    【角色】
+    你是一个编制依据规范性风险点评审助手。
+
+    【任务】
+    你的任务是对编制依据的格式进行评估,并提出改进建议。
+    '《》'书名号中的是文件名'()'括号中的是编号。  
+
+    【字段说明】
+    - **issue_point**:格式判定
+    - **location**:审查内容,保持与原文一样
+    - **suggestion**:建议(可执行动作)
+    - **reason**:问题的原因分析和依据说明,基于标准规范要求的详细说明
+    - **risk_level**:风险水平,只能是 "无风险" / "高风险"
+
+    【格式判定类型(仅限以下四类)】
+    编制依据格式正确:同时具有'《》'书名号包裹的文件名和'()'包裹的编号,对应无风险。  
+    编制依据格式错误:缺乏'《》'书名号包裹的文件名或缺乏'()'包裹的编号,对应高风险。
+    文件名错误:文件名中存在多余的空格或乱码,对应高风险。  
+  
+
+    【输出格式规范】
+    - 你只能输出一个数组对象
+    - 数组中每个元素为一个 JSON 对象
+    - 为每一个审查文件输出一个结果对象
+    - 每个对象只能包含五个字段: issue_point, location, suggestion, reason, risk_level
+
+    【输出示例】
+    ```json
+    [
+      {{
+        "issue_point": "编制依据格式正确",
+        "location": "《建筑工程施工质量验收统一标准》(GB 50204-2005)",
+        "suggestion": "无需修改",
+        "reason": "", "文件《建筑工程施工质量验收统一标准》(GB 50204-2005)具有完整的文件名和编号"
+        "risk_level": "无风险"
+      }},
+      {{
+        "issue_point": "文件名错误",
+        "location": "《建筑施 工组织 设计编 制规范》(GB/T 50502-2015)",
+        "suggestion": "去掉文件名中的空格或乱码",
+        "reason": "《建筑施 工组织 设计编 制规范》(GB/T 50502-2015)文件名中存在空格或乱码",
+        "risk_level": "高风险"
+      }}
+    ]
+    ```
+    
+  user_prompt_template: |
+    请审查以下编制依据规范性: 
+    审查内容:
+    {check_content}
+
+

+ 402 - 0
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -0,0 +1,402 @@
+import os
+import sys
+import json
+import re
+import time
+from typing import Any, Dict, List, Optional
+import asyncio
+
+project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../../'))
+# 将根目录添加到sys.path
+sys.path.append(project_root)
+
+# 导入必要的依赖
+try:
+    from pymilvus import connections, Collection
+    from foundation.infrastructure.config.config import config_handler
+    from foundation.ai.models.model_handler import model_handler as mh
+    from foundation.ai.agent.generate.model_generate import generate_model_client
+    from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
+    from core.construction_review.component.reviewers.utils.inter_tool import InterTool
+    from foundation.observability.logger.loggering import server_logger as logger
+    from langchain_core.prompts import ChatPromptTemplate
+    from langchain_milvus import Milvus, BM25BuiltInFunction
+    from functools import partial
+    
+except ImportError as e:
+    logger.warning(f"Warning: 无法导入依赖: {e}")
+    # 设置默认值,避免程序崩溃
+    mh = None
+    generate_model_client = None
+    prompt_loader = None
+    logger = None
+    InterTool = None
+
+
+class TextProcessor:
+    """文本处理工具类"""
+
+    @staticmethod
+    def extract_basis(text: str) -> List[str]:
+        """从文本中提取编制依据,支持:《》《》(…)《》【…】"""
+        pattern = re.compile(
+            r'《[^》]+》'                 # 《标题》
+            r'(?:([^)]+)|【[^】]+】)?'  # 可选的(…)或【…】
+        )
+        return pattern.findall(text)
+
+
+
+
+class StandardizedResponseProcessor:
+    """标准化响应处理器 - 统一为outline_reviewer.py格式"""
+
+    def __init__(self):
+        if InterTool:
+            self.inter_tool = InterTool()
+        else:
+            self.inter_tool = None
+
+    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查") -> List[Dict[str, Any]]:
+        """
+        处理LLM响应,返回标准格式
+
+        Args:
+            response_text: LLM原始响应文本
+            check_name: 检查项名称
+
+        Returns:
+            List[Dict]: 标准格式的审查结果列表
+        """
+        if not self.inter_tool:
+            logger.warning("InterTool未初始化,返回空结果")
+            return []
+
+        try:
+            # 使用inter_tool提取JSON数据
+            json_data = self.inter_tool._extract_json_data(response_text)
+            parsed_result = []
+
+            if json_data and isinstance(json_data, list):
+                for item in json_data:
+                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name))
+            elif json_data and isinstance(json_data, dict):
+                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name))
+
+            return parsed_result
+
+        except Exception as e:
+            logger.error(f"处理LLM响应失败: {str(e)}")
+            # 返回一个错误条目
+            return [{
+                "check_item": check_name,
+                "check_result": {"error": str(e)},
+                "exist_issue": True,
+                "risk_info": {"risk_level": "medium"}
+            }]
+
+
+class MessageBuilder:
+    """消息构建工具类"""
+
+    def __init__(self, prompt_loader_instance=None):
+        self.prompt_loader = prompt_loader_instance
+
+    
+
+    def get_prompt_template(self):
+        import yaml
+        with open("core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml", "r", encoding="utf-8") as f:
+            data = yaml.safe_load(f)
+        return ChatPromptTemplate.from_messages([
+                ("system", data["reference_basis_reviewer"]["system_prompt"]),
+                ("user", data["reference_basis_reviewer"]["user_prompt_template"])
+            ])
+    
+class LLMReviewClient:
+    """LLM审查客户端"""
+
+    async def review_basis(self, Message: str, trace_id: str = None) -> str:
+        try:
+        
+            task_prompt_info = {
+                "task_prompt": Message,
+                "task_name": "规范性引用文件识别与状态判断"
+            }
+            logger.info(f" 模型调用准备阶段: {task_prompt_info}")
+
+            # 调用统一模型客户端
+            response = await generate_model_client.get_model_generate_invoke(
+                trace_id=trace_id,
+                task_prompt_info=task_prompt_info
+            )
+            return response
+
+        except Exception as e:
+            logger.error(f" 模型调用准备阶段失败: {e}")
+            # 返回空JSON数组字符串以防解析崩溃
+            return "[]"
+        # ==================== 修复结束 ====================
+
+
+class BasisReviewService:
+    """编制依据审查服务核心类"""
+
+    def __init__(self, max_concurrent: int = 4):
+        self.llm_client = LLMReviewClient()
+        self.text_processor = TextProcessor()
+        self.response_processor = StandardizedResponseProcessor()  # 标准化处理器
+        # 确保使用最新的prompt_loader实例
+        from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
+        fresh_prompt_loader = PromptLoader()
+        self.message_builder = MessageBuilder(fresh_prompt_loader)
+        self.max_concurrent = max_concurrent
+        self._semaphore = None
+
+    async def __aenter__(self):
+        """异步上下文管理器入口"""
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrent)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """异步上下文管理器出口"""
+        return False
+
+    async def review_batch(
+        self,
+        basis_items: List[str],
+        collection_name: str = "first_bfp_collection_status",
+        filters: Optional[Dict[str, Any]] = None,
+        min_score: float = 0.3,
+        top_k_each: int = 3,
+    ) -> List[Dict[str, Any]]:
+        """异步批次审查(通常3条)"""
+        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
+        if not basis_items:
+            return []
+
+        async with self._semaphore:
+            try:
+                # 构建提示词模板和用户内容
+                prompt_template = self.message_builder.get_prompt_template()
+                message = prompt_template.partial(check_content=basis_items)
+                trace_id = f"prep_basis_batch_{int(time.time())}"
+                llm_out = await self.llm_client.review_basis(message, trace_id)
+                print("LLM输出:\n")
+                print(llm_out)
+
+                # 使用标准化处理器处理响应
+                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check")
+
+                # 统计问题数量
+                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
+                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
+
+                return standardized_result
+
+            except Exception as e:
+                logger.error(f" 批次处理失败: {e}")
+                return [{
+                    "check_item": "reference_check",
+                    "check_result": {"error": str(e), "basis_items": basis_items},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "high"}
+                }]
+
+    
+    
+    async def _async_search_basis(
+        self,
+        basis: str,
+        collection_name: str,
+        top_k_each: int
+    ) -> List[dict]:
+        """异步搜索单个编制依据(Hybrid Search)"""
+        try:
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=basis,
+                top_k=top_k_each,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
+            )
+            retrieved = await loop.run_in_executor(None, func)
+            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
+            return retrieved or []
+        except Exception as e:
+            logger.error(f" 搜索失败 '{basis}': {e}")
+            return []
+
+    
+    async def review_all(self, text: str, collection_name: str = "first_bfp_collection_status",
+                        progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
+        """异步批量审查所有编制依据"""
+        items = self.text_processor.extract_basis(text)
+        if not items:
+            return []
+
+        start_time = time.time()
+        total_batches = (len(items) + 2) // 3  # 计算总批次数
+        
+        # 发送开始审查的SSE推送
+        if progress_manager and callback_task_id:
+            try:
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    stage_name="AI审查",
+                    current=0,
+                    status="processing",
+                    message=f"开始编制依据审查,共{len(items)}项编制依据",
+                    overall_task_status="processing",
+                    event_type="processing"
+                )
+            except Exception as e:
+                logger.error(f"SSE推送开始消息失败: {e}")
+
+        # 分批处理
+        batches = []
+        for i in range(0, len(items), 3):
+            batch = items[i:i + 3]
+            batches.append(batch)
+
+        # 异步并发执行所有批次,使用回调处理SSE推送
+        async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
+            """处理单个批次并执行SSE回调"""
+            try:
+                # 执行单个批次审查
+                result = await self.review_batch(batch, collection_name)
+
+                # 统计当前批次结果
+                batch_standard_count = 0
+                for item in result:
+                    if isinstance(item, dict) and item.get('is_standard', False):
+                        batch_standard_count += 1
+
+                # 立即推送当前批次完成的SSE消息
+                logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
+                if progress_manager and callback_task_id:
+                    try:
+                        progress_percent = int((batch_index + 1) / total_batches * 100)
+                        await progress_manager.update_stage_progress(
+                            callback_task_id=callback_task_id,
+                            stage_name=f"编制依据审查-批次{batch_index + 1}",
+                            current=progress_percent,
+                            status="processing",
+                            message=f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
+                            overall_task_status="processing",
+                            event_type="processing",
+                            issues=result  # 推送该批次的审查结果
+                        )
+                        logger.info(f"批次{batch_index + 1} SSE推送成功")
+                    except Exception as e:
+                        logger.error(f"SSE推送批次{batch_index + 1}结果失败: {e}")
+
+                return result
+
+            except Exception as e:
+                logger.error(f" 批次 {batch_index} 处理失败: {e}")
+                error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
+                                for name in batch]
+
+                # 即使失败也要推送结果
+                if progress_manager and callback_task_id:
+                    try:
+                        progress_percent = int((batch_index + 1) / total_batches * 100)
+                        await progress_manager.update_stage_progress(
+                            callback_task_id=callback_task_id,
+                            stage_name=f"编制依据审查-批次{batch_index + 1}",
+                            current=progress_percent,
+                            status="processing",
+                            message=f"第{batch_index + 1}/{total_batches}批次处理失败",
+                            overall_task_status="processing",
+                            event_type="processing",
+                            issues=error_result
+                        )
+                    except Exception as push_e:
+                        logger.error(f"SSE推送失败批次{batch_index + 1}结果失败: {push_e}")
+
+                return error_result
+
+        # 创建所有批次的异步任务
+        batch_tasks = []
+        for i, batch in enumerate(batches):
+            task = process_batch_with_callback(i, batch)
+            batch_tasks.append(task)
+
+        # 并发执行所有批次
+        logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
+        processed_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
+
+        # 处理异常结果并统计
+        total_items = 0
+        issue_items = 0
+        successful_batches = 0
+
+        # 重新构建结果列表,过滤异常
+        final_results = []
+        for i, result in enumerate(processed_results):
+            if isinstance(result, Exception):
+                logger.error(f" 批次 {i} 返回异常: {result}")
+                error_batch = batches[i] if i < len(batches) else []
+                error_result = [{
+                    "check_item": "reference_check",
+                    "check_result": {"error": str(result), "basis_items": error_batch},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "high"}
+                }]
+                final_results.append(error_result)
+            else:
+                final_results.append(result)
+                successful_batches += 1
+
+        # 统计总结果
+        for result in final_results:
+            for item in result:
+                total_items += 1
+                if isinstance(item, dict) and item.get('exist_issue', False):
+                    issue_items += 1
+
+        logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
+
+
+        # 发送完成审查的SSE推送
+        elapsed_time = time.time() - start_time
+        if progress_manager and callback_task_id:
+            try:
+                await progress_manager.update_stage_progress(
+                    callback_task_id=callback_task_id,
+                    stage_name="编制依据审查",
+                    current=15,
+                    status="processing",
+                    message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
+                    overall_task_status="processing",
+                    event_type="processing"
+                )
+            except Exception as e:
+                logger.error(f"SSE推送完成消息失败: {e}")
+
+        logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
+        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
+        return final_results
+
+
+# 便捷函数
+async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
+    """异步批次审查便捷函数"""
+    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+        return await service.review_batch(basis_items)
+
+
+async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
+    """异步全部审查便捷函数"""
+    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+        return await service.review_all(text)
+
+if __name__ == "__main__":
+    # 简单测试
+    test_text = "《中华人民共和国特  种设备安全法》(2023)"
+    result = asyncio.run(review_all_basis_async(test_text))

+ 6 - 6
core/construction_review/workflows/ai_review_workflow.py

@@ -304,10 +304,10 @@ class AIReviewWorkflow:
                 
             # # 4. 执行编制依据审查
             # #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
-            prep_basis_check = "reference_check" in self.task_info.get_review_config_list()
-            prep_basis_review_result = None
+            reference_check = "reference_check" in self.task_info.get_review_config_list()
+            reference_check_result = None
             logger.info(f"执行编制依据审查")
-            if not prep_basis_check:
+            if not reference_check:
                 logger.info(f"跳过执行编制依据审查")
             else:
                 # 从结构化内容中提取编制依据文本
@@ -322,7 +322,7 @@ class AIReviewWorkflow:
                     }
 
                     # 执行编制依据审查
-                    prep_basis_review_result = await self.ai_review_engine.prep_basis_review(
+                    reference_check_result = await self.ai_review_engine.reference_basis_reviewer(
                         review_data=prep_basis_review_data,
                         trace_id=state["callback_task_id"],
                         state=state,
@@ -371,8 +371,8 @@ class AIReviewWorkflow:
             all_issues = []
             if completeness_check:
                 all_issues.append(outline_review_result)
-            if prep_basis_check and prep_basis_review_result:
-                all_issues.append(prep_basis_review_result)
+            if reference_check and reference_check_result:
+                all_issues.append(reference_check_result)
             if timeliness_check and timeliness_check_result:
                 all_issues.append(timeliness_check_result)
             for unit_issues in successful_results: