Просмотр исходного кода

v0.0.5-功能优化-_ai_review_node_check_item 基础逻辑调试
- _ai_review_node_check_item 基础逻辑调试

WangXuMing 2 дней назад
Родитель
Сommit
dd8aef03b7

+ 35 - 11
core/construction_review/workflows/ai_review_workflow.py

@@ -300,6 +300,7 @@ class AIReviewWorkflow:
                 'parameter_compliance_check': 'check_parameter_compliance'
             }
 
+            
             review_item_config_raw = self.task_info.get_review_item_config_list()
             review_item_config = self.core_fun._replace_review_suffix(review_item_config_raw, review_func_mapping)
 
@@ -307,19 +308,37 @@ class AIReviewWorkflow:
             for item in review_item_config:
                 key, value = item.split("_", 1)
                 review_item_dict.setdefault(key, []).append(value)
-
-            logger.info(f"审查项配置解析完成: {review_item_dict}")
+            
+            # 依据方案标准章节顺序进行排序
+            sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
+    "management", "acceptance", "other"]
+            
+            all_keys = review_item_dict.keys()
+            sorted_keys = sorted(
+                all_keys,
+                key=lambda x :sgfa_chapter_index_order.index(x)
+            )
+            review_item_dict_sorted = {}
+            for key in sorted_keys:
+                review_item_dict_sorted[key] = review_item_dict[key]
+            logger.info(f"审查项配置解析完成: {review_item_dict_sorted}")
 
             # 3️ 获取结构化内容
             structured_content = state.get("structured_content", {})
-            chunks = structured_content.get("chunks", [])
-            total_chapters = len(review_item_dict)
-            total_chunks = len(chunks)
 
-            logger.info(f"准备执行动态审查任务,总章节数: {total_chapters}, 总块数: {total_chunks}")
+            # 预处理:根据 review_item_dict_sorted 中的 key 对 structured_content 进行筛选
+            original_chunks = structured_content.get("chunks", [])
+            filtered_chunks = [
+                chunk for chunk in original_chunks
+                if chunk.get("chapter_classification") in review_item_dict_sorted.keys()
+            ]
+
+            # 更新 chunks 和 structured_content
+            chunks = filtered_chunks
+            structured_content["chunks"] = chunks
 
-            # 发送开始审查进度
-            await self.core_fun._send_start_review_progress(state, total_chunks, 'check_item_review')
+            total_chapters = len(review_item_dict_sorted)
+            total_chunks = len(chunks)
 
             # 初始化issues列表
             all_issues = []
@@ -333,7 +352,9 @@ class AIReviewWorkflow:
             )
 
             if has_outline_check and outline_data:
-                logger.info(" 开始执行大纲审查")
+                total_chunks = total_chunks + 1  # 包含大纲审查块
+                await self.core_fun._send_start_review_progress(state, total_chunks, 'check_item_review')
+                logger.info(f"准备执行动态审查任务(含大纲审查),总章节数: {total_chapters}, 总块数: {total_chunks}")
                 try:
                     outline_result = await self.ai_review_engine.outline_check(
                         state["callback_task_id"],
@@ -346,6 +367,9 @@ class AIReviewWorkflow:
                         logger.info(f"大纲审查完成")
                 except Exception as e:
                     logger.error(f"大纲审查失败: {str(e)}", exc_info=True)
+            else:
+                await self.core_fun._send_start_review_progress(state, total_chunks, 'check_item_review')
+                logger.info(f"准备执行动态审查任务,总章节数: {total_chapters}, 总块数: {total_chunks}")  
 
             # 5️ 按章节分组
             chapter_chunks_map = self.core_fun._group_chunks_by_chapter(chunks)
@@ -369,14 +393,14 @@ class AIReviewWorkflow:
                 # 判断章节类型并分支处理
                 if chapter_code == "basis":
                     # === 编制依据章节:拼接所有chunk后一次性审查 ===
-                    await self.core_fun._process_basis_chapter(
+                    all_issues = await self.core_fun._process_basis_chapter(
                         chapter_code, chapter_content, func_names, state, all_issues, completed_chunks, total_chunks
                     )
                     # 更新已完成块数
                     completed_chunks += len(chapter_content)
                 else:
                     # === 普通章节:逐块审查 ===
-                    chunks_completed = await self.core_fun._process_normal_chapter(
+                    chunks_completed, all_issues = await self.core_fun._process_normal_chapter(
                         chapter_code, chapter_content, func_names, state, all_issues
                     )
                     # 更新已完成块数

+ 10 - 7
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -35,7 +35,7 @@ AI审查核心功能类 - 负责具体的审查逻辑和数据处理
 
 import asyncio
 import random
-from typing import Dict, Union, List, Any, Optional
+from typing import Dict, Union, List, Any, Optional, Tuple
 from dataclasses import dataclass
 from langchain_core.messages import AIMessage
 
@@ -103,7 +103,7 @@ class AIReviewCoreFun:
         all_issues: List[Dict],
         completed_chunks: int,
         total_chunks: int
-    ) -> None:
+    ) -> List[Dict]:
         """
         处理编制依据章节(basis)
 
@@ -120,6 +120,9 @@ class AIReviewCoreFun:
             all_issues: 累积的issues列表
             completed_chunks: 已完成的块数
             total_chunks: 总块数
+
+        Returns:
+            List[Dict]: 更新后的issues列表(包含本次章节的结果)
         """
         logger.info(f"🔍 处理编制依据章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
 
@@ -207,7 +210,7 @@ class AIReviewCoreFun:
                 message=f"编制依据章节审查完成,共 {len(chapter_content)} 个块",
                 event_type="processing"
             )
-
+        return all_issues
     async def _process_normal_chapter(
         self,
         chapter_code: str,
@@ -215,7 +218,7 @@ class AIReviewCoreFun:
         func_names: List[str],
         state: AIReviewState,
         all_issues: List[Dict]
-    ) -> int:
+    ) -> Tuple[int, List[Dict]]:
         """
         处理普通章节(非basis)
 
@@ -231,7 +234,7 @@ class AIReviewCoreFun:
             all_issues: 累积的issues列表
 
         Returns:
-            int: 处理的块数量
+            Tuple[int, List[Dict]]: (处理的块数量, 更新后的issues列表)
         """
         logger.info(f"📝 处理普通章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
         total_chunks = len(chapter_content)
@@ -244,7 +247,7 @@ class AIReviewCoreFun:
             # 终止信号检查(块级别)
             if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
                 logger.warning("块审查检测到终止信号")
-                return chunk_index  # 返回已处理的块数
+                return chunk_index, all_issues  # 返回已处理的块数和issues
 
             # 并发执行当前块的所有审查方法
             chunk_results = await self._execute_chunk_methods(
@@ -275,7 +278,7 @@ class AIReviewCoreFun:
             if issues:
                 all_issues.extend(issues)
 
-        return total_chunks
+        return total_chunks,all_issues
 
     def _extract_issues_from_result(self, result: Any) -> List[Dict]:
         """

+ 103 - 0
utils_test/Check_Item/_ai_review_node_check_item_test.py

@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import asyncio
+import sys
+from pathlib import Path
+
+current_dir = Path(__file__).parent.absolute()
+project_root = current_dir.parent.parent
+sys.path.insert(0, str(project_root))
+
+
+# 导入必要的类
+from core.base.task_models import TaskFileInfo
+from core.construction_review.workflows.ai_review_workflow import AIReviewWorkflow
+
+
+def create_test_file_info(review_item_config):
+    """创建测试用 TaskFileInfo"""
+    file_info = {
+        'file_id': 'test_file_001',
+        'user_id': 'test_user_001',
+        'callback_task_id': 'test_task_001',
+        'file_name': 'test_file.pdf',
+        'file_type': 'pdf',
+        'review_item_config': review_item_config,
+        'review_config': [],
+        'project_plan_type': '施工组织设计'
+    }
+    return TaskFileInfo(file_info)
+
+
+def create_test_state():
+    """创建测试用 AIReviewState"""
+    return {
+        'file_id': 'test_file_001',
+        'callback_task_id': 'test_task_001',
+        'user_id': 'test_user_001',
+        'file_name': 'test_file.pdf',
+        'structured_content': {'chunks': []},
+        'review_results': None,
+        'current_stage': 'ai_review_check_item',
+        'status': 'processing',
+        'error_message': None,
+        'progress_manager': None,
+        'messages': []
+    }
+
+
+
+async def test_case_03_multi_items():
+    """测试用例03: 多个审查项"""
+    print("\n=== 测试用例03: 多个审查项 ===")
+
+    review_item_config = [
+        "basis_timeliness_check",
+        "basis_sensitive_word_check",
+        "catalogue_sensitive_word_check",
+        "catalogue_semantic_logic_check",
+        "basis_timeliness_check",
+        "overview_completeness_check"
+    ]
+    task_info = create_test_file_info(review_item_config)
+    structured_content = {'chunks': []}
+
+    print(f"输入: review_item_config={review_item_config}")
+
+    try:
+        workflow = AIReviewWorkflow(task_info, structured_content)
+        state = create_test_state()
+
+        result = await workflow._ai_review_node_check_item(state)
+
+        print(f"输出: {result}")
+        print(f"期望: 返回处理后的状态")
+        print(f"PASS 方法调用成功")
+
+    except Exception as e:
+        print(f"ERROR: {str(e)}")
+        print(f"FAIL 方法调用失败")
+
+
+async def run_all_tests():
+    """运行所有测试用例"""
+    print("=" * 70)
+    print("开始 _ai_review_node_check_item 方法测试")
+    print("直接调用 AIReviewWorkflow._ai_review_node_check_item 方法")
+    print("=" * 70)
+    await test_case_03_multi_items()
+
+    print("\n" + "=" * 70)
+    print("测试完成")
+    print("=" * 70)
+
+
+def main():
+    """主函数"""
+    asyncio.run(run_all_tests())
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 69 - 0
utils_test/Milvus_Test/connections_test.py

@@ -0,0 +1,69 @@
+# connections_test_final_last.py(终极适配版,无任何导入错误)
+from pymilvus import connections, CollectionSchema, FieldSchema, DataType, Collection
+from pymilvus.orm import use_database, list_collections
+
+MILVUS_HOST = "192.168.92.61"
+MILVUS_PORT = 19530
+TARGET_DB = "lq_db"  # 旧数据可能存在的数据库
+VECTOR_DIM = 128  # 替换为你的向量维度(比如 768)
+
+try:
+    # 1. 建立连接
+    connections.connect(
+        alias="milvus_last_conn",
+        host=MILVUS_HOST,
+        port=MILVUS_PORT,
+        timeout=30
+    )
+    print("✅ Milvus 连接成功!")
+
+    # 2. 尝试切换到 lq_db 数据库
+    print(f"🔄 尝试切换到数据库:{TARGET_DB}")
+    use_database(TARGET_DB, using="milvus_last_conn")
+    print(f"✅ 成功切换到数据库:{TARGET_DB}")
+
+    # 3. 查询 lq_db 中所有 Collection(自动找旧数据)
+    print(f"\n📋 {TARGET_DB} 数据库中所有 Collection:")
+    all_collections = list_collections(using="milvus_last_conn")
+    
+    if not all_collections:
+        print(f"❌ {TARGET_DB} 中无任何 Collection(旧数据元数据已丢失)")
+    else:
+        print(f"✅ 找到 {len(all_collections)} 个 Collection:{all_collections}")
+        
+        # 4. 遍历每个 Collection 查数据量
+        for coll_name in all_collections:
+            print(f"\n=== 🔍 Collection:{coll_name} ===")
+            # 定义通用结构(适配大多数场景)
+            fields = [
+                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
+                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM)
+            ]
+            schema = CollectionSchema(fields=fields)
+            coll = Collection(
+                name=coll_name,
+                schema=schema,
+                using="milvus_last_conn"
+            )
+            
+            row_count = coll.num_entities
+            if row_count > 0:
+                print(f"🎉 旧数据找到!数据量:{row_count} 条")
+                print(f"👉 Attu 切换到 {TARGET_DB} → 选择 {coll_name} 即可查看!")
+            else:
+                print(f"⚠️ 该 Collection 无数据")
+
+except Exception as e:
+    error_msg = str(e).lower()
+    if "database not found" in error_msg:
+        print(f"❌ 未找到 {TARGET_DB} 数据库(旧数据库已丢失)")
+    else:
+        print(f"❌ 错误:{e}")
+finally:
+    # 断开连接
+    if connections.has_connection("milvus_last_conn"):
+        connections.disconnect("milvus_last_conn")
+        print("\n🔌 已断开连接")
+
+if __name__ == "__main__":
+    pass

+ 76 - 0
utils_test/Other_Test/yinshepaix.py

@@ -0,0 +1,76 @@
+# 2️ 解析审查项配置
+from typing import List, Dict, Union
+def _replace_review_suffix(items: List[str], mapping: Dict[str, Union[str, List[str]]]) -> List[str]:
+
+    """
+    将列表中字符串的下划线后内容替换为映射表中的对应值
+    处理多值映射(如completeness_check对应两个值时生成两条结果)
+    """
+    result = []
+    for item in items:
+        # 拆分下划线前后部分(只拆分第一个下划线,因为后缀可能包含下划线)
+        if '_' in item:
+            prefix, suffix = item.split('_', 1)  # split('_', 1) 确保只拆分一次
+        else:
+            # 无下划线时直接保留原字符串(根据实际需求可调整)
+            result.append(item)
+            continue
+        
+        # 获取映射值,无匹配时保留原后缀
+        mapped_value = mapping.get(suffix, suffix)
+        
+        # 处理单值映射
+        if isinstance(mapped_value, str):
+            result.append(f"{prefix}_{mapped_value}")
+        # 处理多值映射(列表类型)
+        elif isinstance(mapped_value, list):
+            for val in mapped_value:
+                result.append(f"{prefix}_{val}")
+    
+    return result
+
+review_func_mapping: Dict[str, Union[str, List[str]]] = {
+    'sensitive_word_check': 'sensitive_word_check',
+    'semantic_logic_check': 'check_semantic_logic',
+    'completeness_check': ['check_completeness', 'outline_check'],
+    'timeliness_check': 'timeliness_basis_reviewer',
+    'reference_check': 'reference_basis_reviewer',
+    'sensitive_check': 'check_sensitive',
+    'non_parameter_compliance_check': 'check_non_parameter_compliance',
+    'parameter_compliance_check': 'check_parameter_compliance'
+}
+
+
+review_item_config = [
+        "safety_sensitive_check",
+    "overview_completeness_check",
+    "basis_timeliness_check",
+    "basis_sensitive_word_check",
+    "catalogue_sensitive_word_check",
+    "catalogue_semantic_logic_check",
+    "basis_timeliness_check",
+
+]
+review_item_config = _replace_review_suffix(review_item_config, review_func_mapping)
+
+review_item_dict = {}
+for item in review_item_config:
+    key, value = item.split("_", 1)
+    review_item_dict.setdefault(key, []).append(value)
+
+# 依据方案标准章节顺序进行排序
+sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
+"management", "acceptance", "other"]
+
+all_keys = review_item_dict.keys()
+sorted_keys = sorted(
+    all_keys,
+    key=lambda x :sgfa_chapter_index_order.index(x)
+)
+review_item_dict_sorted = {}
+for key in sorted_keys:
+    review_item_dict_sorted[key] = review_item_dict[key]
+print(f"审查项配置解析完成: {review_item_dict_sorted}")
+
+# 3️ 获取结构化内容
+