Просмотр исходного кода

Merge branch 'dev' of http://47.109.151.80:15030/CRBC-MaaS-Platform-Project/LQAgentPlatform into dev

WangXuMing 2 месяцев назад
Родитель
Сommit
4b723b7ca4

+ 126 - 1
core/construction_review/component/ai_review_engine.py

@@ -58,7 +58,6 @@ from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger as logger
 from foundation.observability.logger.loggering import server_logger as logger
 from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
 from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
 from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
 from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
-from core.construction_review.component.reviewers.prep_basis_reviewer import review_all_basis_async
 @dataclass
 @dataclass
 class ReviewResult:
 class ReviewResult:
     """审查结果"""
     """审查结果"""
@@ -720,5 +719,131 @@ class AIReviewEngine(BaseReviewer):
                     "error_message": error_msg
                     "error_message": error_msg
                 }
                 }
             }
             }
+        
+    async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
+                                state: dict = None, stage_name: str = None) -> Dict[str, Any]:
+        """
+        执行编制依据审查:调用prep_basis_reviewer中的异步审查功能
+
+        Args:
+            review_data: 待审查的编制依据数据,包含编制依据文本内容
+            trace_id: 追踪ID
+            state: 状态字典
+            stage_name: 阶段名称
+
+        Returns:
+            审查结果字典,包含编制依据审查结果
+        """
+        start_time = time.time()
+        try:
+            logger.info(f"开始编制依据审查,trace_id: {trace_id}")
+
+            # 提取关键数据
+            review_content = review_data.get('content', '')
+            max_concurrent = review_data.get('max_concurrent', 4)
+
+            # 添加调试信息
+            logger.info(f"提取的编制依据内容长度: {len(review_content)}")
+            if review_content:
+                logger.info(f"编制依据内容预览: {review_content[:50]}...")
+            else:
+                logger.warning("编制依据内容为空,将跳过审查")
+
+            # 检查是否有有效的编制依据内容
+            if not review_content or not review_content.strip():
+                logger.warning("没有可执行的编制依据审查任务")
+                return {
+                    "prep_basis_review_results": {
+                        "review_results": [],
+                        "review_content": review_content,
+                        "total_basis_items": 0,
+                        "valid_items": 0,
+                        "standard_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": "编制依据内容为空,无法进行审查"
+                    }
+                }
+
+            # 调用prep_basis_reviewer中的异步审查方法
+            logger.info("开始调用编制依据异步审查...")
+
+            try:
+                # 使用信号量控制并发
+                async with self.semaphore:
+                    # 从state中获取progress_manager和callback_task_id
+                    progress_manager = state.get('progress_manager') if state else None
+                    callback_task_id = state.get('callback_task_id') if state else None
+
+                    # 调用带有SSE推送功能的review_all方法
+                    from core.construction_review.component.reviewers.timeliness_basis_reviewer import BasisReviewService
+                    async with BasisReviewService(max_concurrent=max_concurrent) as service:
+                        prep_basis_review_results = await service.review_all(
+                            review_content,
+                            collection_name="first_bfp_collection_status",
+                            progress_manager=progress_manager,
+                            callback_task_id=callback_task_id
+                        )
+
+                    logger.info(f"编制依据审查完成,批次数量: {len(prep_basis_review_results)}")
+
+                    # 统计审查结果
+                    total_items = 0
+                    valid_items = 0
+                    standard_items = 0
+
+                    for batch in prep_basis_review_results:
+                        if isinstance(batch, list):
+                            total_items += len(batch)
+                            for item in batch:
+                                if isinstance(item, dict):
+                                    valid_items += 1
+                                    if item.get('is_standard', False):
+                                        standard_items += 1
+
+                    logger.info(f"审查统计 - 总编制依据: {total_items}, 有效项: {valid_items}, 标准项: {standard_items}")
+
+            except Exception as e:
+                logger.error(f"编制依据异步审查失败: {str(e)}")
+                return {
+                    "prep_basis_review_results": {
+                        "review_results": [],
+                        "review_content": review_content,
+                        "total_basis_items": 0,
+                        "valid_items": 0,
+                        "standard_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": f"编制依据审查失败: {str(e)}"
+                    }
+                }
+
+            # 返回完整结果
+            return {
+                "prep_basis_review_results": {
+                    "review_results": prep_basis_review_results,
+                    "review_content": review_content,
+                    "total_basis_items": total_items,
+                    "valid_items": valid_items,
+                    "standard_items": standard_items,
+                    "execution_time": time.time() - start_time,
+                    "error_message": None
+                }
+            }
+
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = f"编制依据审查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+
+            return {
+                "prep_basis_review_results": {
+                    "review_results": [],
+                    "review_content": review_data.get('content', ''),
+                    "total_basis_items": 0,
+                    "valid_items": 0,
+                    "standard_items": 0,
+                    "execution_time": execution_time,
+                    "error_message": error_msg
+                }
+            }
 
 
 
 

+ 16 - 14
core/construction_review/component/reviewers/prompt/timeliness_basis_reviewer.yaml

@@ -12,36 +12,38 @@ timeliness_basis_reviewer:
     - **location**:依据的参考文件
     - **location**:依据的参考文件
     - **suggestion**:建议(可执行动作)
     - **suggestion**:建议(可执行动作)
     - **reason**:问题的原因分析和依据说明,基于标准规范要求的详细说明
     - **reason**:问题的原因分析和依据说明,基于标准规范要求的详细说明
-    - **risk_level**:风险水平,只能是 "LOW" / "MEDIUM" / "HIGH"
+    - **risk_level**:风险水平,只能是 "无风险" / "中风险" / "高风险"
 
 
     【时效性判定类型(仅限以下四类)】
     【时效性判定类型(仅限以下四类)】
-    个别修订未体现(存在修订/修正单,但 location 未反映)
-    规范版本号错误(location 版本号与候选不一致)
-    引用已被替代的标准(存在明确替代标准)
-    引用已废止的规范(明确标注为废止/停止使用)
+    无参考文件:审查文件与参考中的文件名与编号均不一样,对应 "无风险"
+    规范版本号正确:审查文件与参考中的文件名与编号均一致状态为现行,对应 "无风险"
+    规范版本号错误:审查文件与参考中存在文件名相同但找不到对应编号,对应 "高风险"
+    引用已废止的规范:参考文件中对应文件已明确标注为废止/停止使用,对应 "高风险"
+    引用已被替代的标准:审查文件与参考中的文件名与编号均一致,但状态为废止并且对应状态为现行的新文件,对应 "高风险"
+  
 
 
     【输出格式规范】
     【输出格式规范】
     - 你只能输出一个数组对象
     - 你只能输出一个数组对象
     - 数组中每个元素为一个 JSON 对象
     - 数组中每个元素为一个 JSON 对象
     - 为每一个审查文件输出一个结果对象
     - 为每一个审查文件输出一个结果对象
-    - 每个对象只能包含五个字段:
+    - 每个对象只能包含五个字段: issue_point, location, suggestion, reason, risk_level
 
 
     【输出示例】
     【输出示例】
     ```json
     ```json
     [
     [
       {{
       {{
-        "issue_point": "GB 50204-2015 引用规范 ✔ 符合要求",
+        "issue_point": "规范版本号正确",
         "location": "《建筑工程施工质量验收统一标准》(GB 50204-2015)",
         "location": "《建筑工程施工质量验收统一标准》(GB 50204-2015)",
-        "suggestion": "标准引用格式规范,包含完整的标准编号和标准名称,符合要求",
-        "reason": "依据国家标准编写规范,标准引用应包含标准编号和标准名称,该引用格式正确",
+        "suggestion": "该引用符合现行标准要求,无需修改",
+        "reason": "《建筑工程施工质量验收统一标准》(GB 50204-2015)符合现行国家标准",
         "risk_level": "无风险"
         "risk_level": "无风险"
       }},
       }},
       {{
       {{
-        "issue_point": "缺少安全技术规范 ✘ 不符合要求",
-        "location": "编制依据中缺少相关安全技术规范标准",
-        "suggestion": "建议补充《建筑施工安全检查标准》(JGJ 59-2011)等安全技术规范",
-        "reason": "依据施工组织设计编制规范,应包含相关的安全技术标准",
-        "risk_level": "风险"
+        "issue_point": "引用已被替代的标准",
+        "location": "《建筑施工组织设计编制规范》(GB/T 50502-2020)",
+        "suggestion": "建议将引用更新为现行标准《建筑施工组织设计编制规范》(GB/T 50502-2020)",
+        "reason": "《建筑施工组织设计编制规范》(GB/T 50502-2015)已被《建筑施工组织设计编制规范》(GB/T 50502-2020)替代",
+        "risk_level": "风险"
       }}
       }}
     ]
     ]
     ```
     ```

+ 73 - 139
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -20,6 +20,9 @@ try:
     from core.construction_review.component.reviewers.utils.inter_tool import InterTool
     from core.construction_review.component.reviewers.utils.inter_tool import InterTool
     from foundation.observability.logger.loggering import server_logger as logger
     from foundation.observability.logger.loggering import server_logger as logger
     from langchain_core.prompts import ChatPromptTemplate
     from langchain_core.prompts import ChatPromptTemplate
+    from langchain_milvus import Milvus, BM25BuiltInFunction
+    from functools import partial
+    
 except ImportError as e:
 except ImportError as e:
     logger.warning(f"Warning: 无法导入依赖: {e}")
     logger.warning(f"Warning: 无法导入依赖: {e}")
     # 设置默认值,避免程序崩溃
     # 设置默认值,避免程序崩溃
@@ -106,37 +109,7 @@ class MessageBuilder:
                 ("system", data["timeliness_basis_reviewer"]["system_prompt"]),
                 ("system", data["timeliness_basis_reviewer"]["system_prompt"]),
                 ("user", data["timeliness_basis_reviewer"]["user_prompt_template"])
                 ("user", data["timeliness_basis_reviewer"]["user_prompt_template"])
             ])
             ])
-
-    def build_user_content(
-        self,
-        basis_items: List[str],
-        grouped_candidates: List[List[Dict[str, Any]]],
-    ) -> str:
-        """构建用户内容"""
-        items = []
-        for raw, cands in zip(basis_items, grouped_candidates):
-            items.append({
-                "raw_text": raw,
-                "candidates": [
-                    {
-                        "id": c.get("id"),
-                        "similarity": c.get("similarity"),
-                        "text": c.get("text") or c.get("text_content") or "",
-                    }
-                    for c in (cands or [])
-                ],
-            })
-
-        user_content = {
-            "items": items,
-            "required_output_example": [
-                {"is_standard": False, "status": "", "meg": ""} for _ in items
-            ],
-        }
-
-        return json.dumps(user_content, ensure_ascii=False)
-
-
+    
 class BasisSearchEngine:
 class BasisSearchEngine:
     """编制依据向量搜索引擎"""
     """编制依据向量搜索引擎"""
 
 
@@ -153,16 +126,6 @@ class BasisSearchEngine:
             self.user = config_handler.get('milvus', 'MILVUS_USER')
             self.user = config_handler.get('milvus', 'MILVUS_USER')
             self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
             self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
 
 
-            # 连接到 Milvus
-            connections.connect(
-                alias="default",
-                host=self.host,
-                port=self.port,
-                user=self.user,
-                db_name="lq_db"
-            )
-            logger.info(f" 成功连接到 Milvus {self.host}:{self.port}")
-
             # 初始化嵌入模型
             # 初始化嵌入模型
             if mh:
             if mh:
                 self.emdmodel = mh._get_lq_qwen3_8b_emd()
                 self.emdmodel = mh._get_lq_qwen3_8b_emd()
@@ -172,94 +135,63 @@ class BasisSearchEngine:
 
 
         except Exception as e:
         except Exception as e:
             logger.error(f" BasisSearchEngine 初始化失败: {e}")
             logger.error(f" BasisSearchEngine 初始化失败: {e}")
-            self.emdmodel = None
-
-    def text_to_vector(self, text: str) -> List[float]:
-        """将文本转换为向量"""
-        if not self.emdmodel:
-            raise ValueError("嵌入模型未初始化")
-
-        try:
-            embedding = self.emdmodel.embed_query(text)
-            return embedding.tolist() if hasattr(embedding, 'tolist') else list(embedding)
-        except Exception as e:
-            logger.error(f"文本向量化失败: {e}")
-            raise
 
 
-    def similarity_search(self, collection_name: str, query_text: str,
-                         min_score: float = 0.3, top_k: int = 3,
-                         filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
-        """执行相似度搜索"""
+    def hybrid_search(self, collection_name: str, query_text: str,
+                     top_k: int = 3, ranker_type: str = "weighted",
+                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
         try:
         try:
-            if not self.emdmodel:
-                raise ValueError("搜索器未正确初始化")
 
 
-            # 获取集合
-            collection = Collection(collection_name)
-            collection.load()
-
-            # 转换查询文本为向量
-            query_embedding = self.text_to_vector(query_text)
-
-            # 搜索参数
-            search_params = {
-                "metric_type": "COSINE",
-                "params": {"nprobe": 10}
+            # 连接到现有集合
+            connection_args = {
+                "uri": f"http://{self.host}:{self.port}",
+                "user": self.user,
+                "db_name": "lq_db"
             }
             }
 
 
-            # 构建过滤表达式
-            filter_expr = self._create_filter(filters)
-
-            # 执行搜索
-            results = collection.search(
-                data=[query_embedding],
-                anns_field="embedding",
-                param=search_params,
-                limit=top_k,
-                expr=filter_expr,
-                output_fields=["text", "metadata"]
+            if self.password:
+                connection_args["password"] = self.password
+
+            vectorstore = Milvus(
+                embedding_function=self.emdmodel,
+                collection_name=collection_name,
+                connection_args=connection_args,
+                consistency_level="Strong",
+                builtin_function=BM25BuiltInFunction(),
+                vector_field=["dense", "sparse"]
             )
             )
 
 
-            # 格式化结果
-            formatted_results = []
-            for hits in results:
-                for hit in hits:
-                    formatted_results.append({
-                        'id': hit.id,
-                        'text': hit.entity.get('text', ''),
-                        'text_content': hit.entity.get('text', ''),
-                        'metadata': hit.entity.get('metadata', {}),
-                        'distance': hit.distance,
-                        'similarity': 1 - hit.distance
-                    })
-
-            # 过滤低相似度结果
-            filtered_results = [
-                result for result in formatted_results
-                if result['similarity'] >= min_score
-            ]
-
-            return filtered_results
+            # 执行混合搜索
+            if ranker_type == "weighted":
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="weighted",
+                    ranker_params={"weights": [dense_weight, sparse_weight]}
+                )
+            else:  # rrf
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="rrf",
+                    ranker_params={"k": 60}
+                )
 
 
-        except Exception as e:
-            logger.error(f" 相似度搜索失败: {e}")
-            return []
+            # 格式化结果,保持与其他搜索方法一致
+            formatted_results = []
+            for doc in results:
+                formatted_results.append({
+                    'id': doc.metadata.get('pk', 0),
+                    'text_content': doc.page_content,
+                    'metadata': doc.metadata,
+                    'distance': 0.0,
+                    'similarity': 1.0
+                })
 
 
-    def _create_filter(self, filters: Dict[str, Any]) -> str:
-        """创建过滤条件"""
-        if not filters:
-            return ""
-
-        conditions = []
-        for key, value in filters.items():
-            if isinstance(value, str):
-                conditions.append(f'metadata["{key}"] == "{value}"')
-            elif isinstance(value, (int, float)):
-                conditions.append(f'metadata["{key}"] == {value}')
-            else:
-                conditions.append(f'metadata["{key}"] == "{value}"')
+            return formatted_results
 
 
-        return " and ".join(conditions)
+        except Exception as e:
+            # 回退到传统的向量搜索
+            logger.error(f" 搜索失败: {e}")
 
 
 
 
 class LLMReviewClient:
 class LLMReviewClient:
@@ -332,7 +264,7 @@ class BasisReviewService:
                 search_tasks = []
                 search_tasks = []
                 for basis in basis_items:
                 for basis in basis_items:
                     task = asyncio.create_task(
                     task = asyncio.create_task(
-                        self._async_search_basis(basis, collection_name, min_score, top_k_each, filters)
+                        self._async_search_basis(basis,collection_name, top_k_each)
                     )
                     )
                     search_tasks.append(task)
                     search_tasks.append(task)
 
 
@@ -342,10 +274,13 @@ class BasisReviewService:
                 grouped_candidates = []
                 grouped_candidates = []
                 for i, result in enumerate(search_results):
                 for i, result in enumerate(search_results):
                     if isinstance(result, Exception):
                     if isinstance(result, Exception):
-                        logger.error(f" 搜索失败 '{basis_items[i]}': {result}")
+                        logger.error(f"搜索失败 '{basis_items[i]}': {result}")
                         grouped_candidates.append([])
                         grouped_candidates.append([])
                     else:
                     else:
-                        grouped_candidates.append(result)
+                        # result 是 List[dict],需要遍历
+                        texts = [item["text_content"] for item in result if "text_content" in item]
+                        grouped_candidates.append(texts)
+                print("搜索结果:\n"+str(grouped_candidates))
 
 
                 # 构建提示词模板和用户内容
                 # 构建提示词模板和用户内容
                 prompt_template = self.message_builder.get_prompt_template()
                 prompt_template = self.message_builder.get_prompt_template()
@@ -374,27 +309,26 @@ class BasisReviewService:
                 }]
                 }]
 
 
     
     
+    
     async def _async_search_basis(
     async def _async_search_basis(
         self,
         self,
         basis: str,
         basis: str,
         collection_name: str,
         collection_name: str,
-        min_score: float,
-        top_k_each: int,
-        filters: Optional[Dict[str, Any]]
-    ) -> List[Dict[str, Any]]:
-        """异步搜索单个编制依据"""
+        top_k_each: int
+    ) -> List[dict]:
+        """异步搜索单个编制依据(Hybrid Search)"""
         try:
         try:
-            # 在线程池中执行同步搜索操作
-            loop = asyncio.get_event_loop()
-            retrieved = await loop.run_in_executor(
-                None,
-                self.search_engine.similarity_search,
-                collection_name,
-                basis,
-                min_score,
-                top_k_each,
-                filters
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=basis,
+                top_k=top_k_each,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
             )
             )
+            retrieved = await loop.run_in_executor(None, func)
             logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
             logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
             return retrieved or []
             return retrieved or []
         except Exception as e:
         except Exception as e:
@@ -402,7 +336,7 @@ class BasisReviewService:
             return []
             return []
 
 
     
     
-    async def review_all(self, text: str, collection_name: str = "already_basis",
+    async def review_all(self, text: str, collection_name: str = "first_bfp_collection_status",
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
         """异步批量审查所有编制依据"""
         """异步批量审查所有编制依据"""
         items = self.text_processor.extract_basis(text)
         items = self.text_processor.extract_basis(text)
@@ -568,5 +502,5 @@ async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[Lis
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     # 简单测试
     # 简单测试
-    test_text = "根据《建筑设计防火规范》(GB50016-2014)和《高层民用建筑设计防火规范》(GB50045-95)进行设计。"
+    test_text = "《起重机械安全规程》(GB6067-2010)"
     result = asyncio.run(review_all_basis_async(test_text))
     result = asyncio.run(review_all_basis_async(test_text))

+ 35 - 3
core/construction_review/workflows/ai_review_workflow.py

@@ -302,9 +302,9 @@ class AIReviewWorkflow:
                 outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
                 outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
                                                     state, state.get("stage_name", "大纲审查"))
                                                     state, state.get("stage_name", "大纲审查"))
                 
                 
-            # 4. 执行编制依据审查
-            #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
-            prep_basis_check = "prep_basis_check" in self.task_info.get_review_config_list() or "timeliness_check" in self.task_info.get_review_config_list() or "reference_check" in self.task_info.get_review_config_list()
+            # # 4. 执行编制依据审查
+            # #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
+            prep_basis_check = "reference_check" in self.task_info.get_review_config_list()
             prep_basis_review_result = None
             prep_basis_review_result = None
             logger.info(f"执行编制依据审查")
             logger.info(f"执行编制依据审查")
             if not prep_basis_check:
             if not prep_basis_check:
@@ -332,6 +332,36 @@ class AIReviewWorkflow:
                     # SSE推送已在prep_basis_reviewer.py中的review_all方法中处理
                     # SSE推送已在prep_basis_reviewer.py中的review_all方法中处理
                 else:
                 else:
                     logger.warning(f"未找到编制依据内容,跳过编制依据审查")
                     logger.warning(f"未找到编制依据内容,跳过编制依据审查")
+            # 4. 执行编制依据审查
+            #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
+            timeliness_check = "timeliness_check" in self.task_info.get_review_config_list()
+            timeliness_check_result = None
+            logger.info(f"执行编制依据审查")
+            if not timeliness_check:
+                logger.info(f"跳过执行编制依据审查")
+            else:
+                # 从结构化内容中提取编制依据文本
+                prep_basis_content = self._extract_prep_basis_content(state["structured_content"])
+                if prep_basis_content:
+                    logger.info(f"开始执行编制依据审查")
+
+                    # 准备编制依据审查数据
+                    timeliness_check_data = {
+                        'content': prep_basis_content,
+                        'max_concurrent': 4
+                    }
+
+                    # 执行编制依据审查
+                    timeliness_check_result = await self.ai_review_engine.timeliness_basis_reviewer(
+                        review_data=timeliness_check_data,
+                        trace_id=state["callback_task_id"],
+                        state=state,
+                        stage_name="编制依据审查"
+                    )
+
+                    # SSE推送已在prep_basis_reviewer.py中的review_all方法中处理
+                else:
+                    logger.warning(f"未找到编制依据内容,跳过编制依据审查")
 
 
 
 
             # 6. 汇总结果
             # 6. 汇总结果
@@ -343,6 +373,8 @@ class AIReviewWorkflow:
                 all_issues.append(outline_review_result)
                 all_issues.append(outline_review_result)
             if prep_basis_check and prep_basis_review_result:
             if prep_basis_check and prep_basis_review_result:
                 all_issues.append(prep_basis_review_result)
                 all_issues.append(prep_basis_review_result)
+            if timeliness_check and timeliness_check_result:
+                all_issues.append(timeliness_check_result)
             for unit_issues in successful_results:
             for unit_issues in successful_results:
                 if unit_issues and isinstance(unit_issues, list):
                 if unit_issues and isinstance(unit_issues, list):
                     all_issues.extend(unit_issues)
                     all_issues.extend(unit_issues)