3 Revize 60f4d2d632 ... efa8f3a6a9

Autor SHA1 Zpráva Datum
  WangXuMing efa8f3a6a9 v0.0.5-功能优化-完整性审查 před 1 měsícem
  ChenJiSheng 708f5332b5 dev:大纲审查模块的接入整体框架,完成字段对齐; před 1 měsícem
  ChenJiSheng 639abf5476 dev:添加了大纲审查模块的优化; před 1 měsícem

+ 138 - 56
core/construction_review/component/ai_review_engine.py

@@ -92,13 +92,7 @@ from .reviewers.check_completeness.utils.file_utils import write_json
 from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 from .reviewers.outline_check import outline_review_results_df
 from .reviewers.check_completeness.utils.redis_csv_utils import (
-    df_store_to_redis,
     get_redis_connection,
-    main,
-    display_redis_data,
-    store_header_to_redis,
-    read_from_redis_and_save_csv,
-    store_row_to_redis,
 )
 @dataclass
 class ReviewResult:
@@ -768,20 +762,13 @@ class AIReviewEngine(BaseReviewer):
             review_results_df = pd.DataFrame(review_results)
             chapter_labels = review_results_df['section_label'].str.split('->').str[0]
             review_results_df['title'] = chapter_labels
-            # review_results_df.to_csv(Path('temp') / 'document_temp' / '2_spec_review_results.csv', encoding='utf-8-sig', index=False)
-            # csv_file = rf'temp\document_temp\2_spec_review_results.csv'
-            # path2 = rf'temp\document_temp\outlines_review_results.csv'
-            # data_df = pd.read_csv(csv_file, encoding='utf-8-sig')
-            # data_df = review_results_df
-            outline_review_results = outline_review_results_df(data=review_results_df)
-            
-            logger.info(f"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}")
-            logger.info(f"[完整性检查] 大纲审查结果行数: {len(outline_review_results) if outline_review_results is not None else 'None'}")
 
-            df_store_to_redis(self.redis_client, data=outline_review_results, bind_id=trace_id_idx)
-            
+            # 将审查结果存储到Redis,供 outline_check 使用
+            logger.info(f"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}")
+            from .reviewers.check_completeness.utils.redis_csv_utils import df_store_to_redis
+            df_store_to_redis(self.redis_client, data=review_results_df, bind_id=trace_id_idx)
             logger.info(f"[完整性检查] 数据已成功存储到Redis,bind_id: {trace_id_idx}")
-            
+
             df_filtered = review_results_df.drop_duplicates(subset='title', keep='first').reset_index(drop=True)
             unique_chapter_labels = chapter_labels.unique().tolist()
             chapter_classifications = df_filtered['chapter_classification']
@@ -799,7 +786,7 @@ class AIReviewEngine(BaseReviewer):
             analyzer = ResultAnalyzer(str(csv_path))
             processed_results = analyzer.process_results(review_results)
             spec_summary_csv_path = Path('temp') / 'document_temp' / '3_spec_review_summary.csv'
-            summary_rows = analyzer.build_spec_summary(processed_results)
+            summary_rows = analyzer.build_spec_summary(processed_results, spec_summary_csv_path)
             # logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
             summary_rows = pd.DataFrame(summary_rows)
             summary_rows = summary_rows[summary_rows['标签'].isin(review_results_flag)]
@@ -808,10 +795,11 @@ class AIReviewEngine(BaseReviewer):
             # 生成缺失要点 JSON 列表,便于前端消费
 
             issues = analyzer.build_missing_issue_list(summary_rows)
-
+            # issues["response"] += outline_review_result
+            # issues["response"].extend(outline_review_result)
             # 包装成外层格式化期望的结构
             execution_time = time.time() - start_time
-            return {
+            check_result = {
                 "details": {
                     "name": "completeness_check",
                     "response": issues.get("response", []),
@@ -822,6 +810,8 @@ class AIReviewEngine(BaseReviewer):
                 "success": True,
                 "execution_time": execution_time
             } 
+            
+            return check_result,trace_id_idx
         except Exception as e:
             execution_time = time.time() - start_time
             error_msg = f"{name} 审查失败: {str(e)}"
@@ -982,8 +972,8 @@ class AIReviewEngine(BaseReviewer):
         return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
                                reference_source, state, stage_name, timeout=45, model_name="qwen3_30b")
 
-    async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
-                                   state:dict,stage_name:str) -> Dict[str, Any]:
+    async def outline_check(self,  outline_content: pd.DataFrame,trace_id_idx: str,
+                                   state:dict =None,stage_name:str =None) -> Dict[str, Any]:
         """
         大纲审查
 
@@ -993,6 +983,7 @@ class AIReviewEngine(BaseReviewer):
             state: 状态
             stage_name: 阶段名称
         """
+        start_time = time.time()
         logger.info(f"开始大纲审查,trace_id: {trace_id_idx}")
 
         # CSV文件路径
@@ -1000,12 +991,20 @@ class AIReviewEngine(BaseReviewer):
         
         # 存储所有缺失项
         missing_items = []
-        
+        metadata = {}
         try:
             # 从Redis读取并保存为新的CSV文件
-            rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
-            df = rows_df
-            
+            # 如果传入的 outline_content 为 None,尝试从 Redis 获取数据
+            if outline_content is None:
+                logger.info(f"[大纲审查] outline_content 为 None,尝试从 Redis 获取数据 (bind_id: {trace_id_idx})")
+                from .reviewers.check_completeness.utils.redis_csv_utils import read_from_redis_and_save_csv
+                df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
+            else:
+                df = outline_content
+
+            # df = merge_results_by_classification(rows_df)
+            if df is not None:
+                df.to_csv(csv_path, encoding='utf-8-sig', index=False)
             # 检查 df 是否为 None
             if df is None:
                 logger.error(f"[大纲审查] Redis中不存在ID '{trace_id_idx}' 的数据,无法进行大纲审查")
@@ -1017,16 +1016,32 @@ class AIReviewEngine(BaseReviewer):
                 }
             
             logger.info(f"[大纲审查] 成功从Redis读取数据,共 {len(df)} 行")
-            
+
+            # 检查 df 是否为空
+            if df.empty or len(df) == 0:
+                logger.warning(f"[大纲审查] DataFrame为空,无法进行大纲审查")
+                return {
+                    'outline_review_result': {
+                        "response": [],
+                    },
+                    'success': False,
+                    'execution_time': time.time() - start_time
+                }
+
             # 兼容新旧字段名
-            chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'
-            review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else 'merged_review_results'
-            
+            chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else ('section_label_first' if 'section_label_first' in df.columns else 'section_label')
+            review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else ('merged_review_results' if 'merged_review_results' in df.columns else 'review_result')
+
+            logger.info(f"[大纲审查] 使用 chapter_label_col={chapter_label_col}, review_results_col={review_results_col}")
+            logger.info(f"[大纲审查] DataFrame 列: {list(df.columns)}")
+
             # 遍历每一行
             for index, row in df.iterrows():
                 chapter_label = row.get(chapter_label_col, '')
                 merged_results_str = row.get(review_results_col, '')
-                
+
+                logger.info(f"第 {index} 行: chapter_label={chapter_label}, {review_results_col}={merged_results_str}")
+
                 # 解析review_results_summary字典字符串
                 try:
                     if pd.isna(merged_results_str) or merged_results_str == '':
@@ -1044,47 +1059,114 @@ class AIReviewEngine(BaseReviewer):
                     except (json.JSONDecodeError, TypeError):
                         logger.warning(f"第 {index} 行无法解析review_results_summary: {merged_results_str}")
                         merged_results = {}
-                
+
                 # 检查字典中的每个字段
                 if isinstance(merged_results, dict):
+                    # 检查是否有错误信息
+                    if 'error' in merged_results:
+                        logger.warning(f"第 {index} 行审查结果包含错误: {merged_results.get('error')}")
+                        continue  # 跳过错误行,不进行大纲审查
+
+                    logger.info(f"第 {index} 行 merged_results 键: {list(merged_results.keys())}")
+
+                    # 获取chapter_label列表
+                    chapter_labels_list = row.get(chapter_label_col, [])
+                    if not isinstance(chapter_labels_list, list):
+                        chapter_labels_list = [str(chapter_labels_list)]
+
+                    # 获取review_results_summary字典的所有键,用于reason字段
+                    merged_results_keys = list(merged_results.keys())
+                    # 排除error等非审查字段
+                    merged_results_keys = [k for k in merged_results_keys if k not in ['error', 'chunk_id', 'page', 'section_label', 'chapter_classification', 'chapter_code', 'title']]
+
+                    logger.info(f"第 {index} 行过滤后的有效字段: {merged_results_keys}")
+
+                    if not merged_results_keys:
+                        logger.info(f"第 {index} 行没有有效的审查字段,跳过大纲审查")
+                        continue
+
+                    merged_results_keys_str = "、".join(merged_results_keys)
+
+                    # 只处理有效的审查字段
                     for field_name, field_value in merged_results.items():
+                        # 跳过非审查字段
+                        if field_name in ['error', 'chunk_id', 'page']:
+                            continue
                         # 检查列表是否为空
                         if isinstance(field_value, list) and len(field_value) == 0:
-                            # 生成缺失项
-                            missing_item = {
-                                "check_item_code": "catalogue_completeness_check",
-                                "check_result": {
+                            # 为chapter_label列表中的每个值创建单独的缺失项
+                            for chapter_label in chapter_labels_list:
+                                missing_item = {
+                                    # "check_item_code": "catalogue_completeness_check",
                                     "issue_point": f"{field_name}缺失",
-                                    "location": "",
-                                    "suggestion": "",
-                                    "reason": "",
-                                    "risk_level": ""
+                                    "location": chapter_label,
+                                    "suggestion": f"在待审查目录中未找到与'{field_name}'对应的章节;当前章节仅涉及'{chapter_label}',未涵盖'{field_name}'相关内容;整改建议:建议在本章或前序章节中增设'{field_name}'相关内容,确保与审查规范要求一致。",
+                                    "reason": f"本章应包含{merged_results_keys_str}等{len(merged_results_keys)}个方面。",
+                                    "risk_level": "高风险",
+                                    # "review_references": '',
+                                    "reference_source": '《桥梁公司危险性较大工程管理实施细则(2025版)》',
+
                                 }
-                            }
-                            missing_items.append(missing_item)
-                            logger.info(f"发现缺失项: 章节[{chapter_label}] 字段[{field_name}]")
-            
+                                missing_items.append(missing_item)
+
+            # if not metadata:
+            #     metadata = {
+            #         "review_location_label": df['chapter_label'].to_list()[-1],
+            #         "chapter_code": "catalogue",
+            #         "original_content": '',
+            #     }
             logger.info(f"大纲审查完成,共发现 {len(missing_items)} 个缺失项")
-            
+            execution_time = time.time() - start_time
         except FileNotFoundError:
             logger.error(f"CSV文件不存在: {csv_path}")
+            execution_time = time.time() - start_time
             return {
-                'outline_review_result': [],
-                'error': f'CSV文件不存在: {csv_path}'
+                "details": {
+                    "name": "completeness_check",
+                    "response": [],
+                    "review_location_label": "",
+                    "chapter_code": "catalogue",
+                    "original_content": ""
+                },
+                "success": False,
+                "execution_time": execution_time
             }
         except Exception as e:
             logger.error(f"大纲审查失败: {str(e)}", exc_info=True)
+            execution_time = time.time() - start_time
             return {
-                'outline_review_result': [],
-                'error': f'大纲审查失败: {str(e)}'
+                "details": {
+                    "name": "completeness_check",
+                    "response": [],
+                    "review_location_label": "",
+                    "chapter_code": "catalogue",
+                    "original_content": ""
+                },
+                "success": False,
+                "execution_time": execution_time
             }
-        
-        return {
-            'outline_review_result':
-                {
+        logger.info(f"大纲审查完成,耗时 {execution_time:.2f} 秒")
+
+        # 获取 review_location_label,使用兼容的字段名
+        review_location_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'
+        if review_location_col not in df.columns or len(df) == 0:
+            review_location_label = "未知位置"
+        else:
+            review_location_label = df[review_location_col].to_list()[-1]
+
+        outcheck_result = {
+                "details": {
+                    "name": "completeness_check",
                     "response": missing_items,
-                }
-        }
+                    "review_location_label": review_location_label,
+                    "chapter_code": 'catalogue',
+                    "original_content": ""
+                },
+                "success": True,
+                "execution_time": execution_time
+            }
+        logger.info(f"大纲审查结果: {outcheck_result}")
+        return outcheck_result
     
     async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:

+ 6 - 2
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -1,6 +1,7 @@
 """
 结果汇总与规范覆盖分析组件
 """
+import json
 from typing import Dict, List, Any, Set
 import ast
 import sys
@@ -242,7 +243,7 @@ class ResultAnalyzer(IResultAnalyzer):
         for row in summary_rows:
             level2 = (row.get("二级目录") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
-            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
+            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
             reason= f"参照:{reference_source} 中的内容要求,{row.get('section_label', '')}内容属于,专项施工方案内容要求中的 【{suorces_eum[row.get("标签", "")]}】 板块,应包含{requirement}"
             review_references = (row.get("依据") or "").strip()
             
@@ -274,7 +275,8 @@ class ResultAnalyzer(IResultAnalyzer):
                 "reference_source": reference_source
             }
             all_issues.append(issue_item)
-
+            with open("temp/document_temp/missing_points.json", "w", encoding="utf-8") as f:
+                json.dump(all_issues, f, ensure_ascii=False, indent=4)
             # 收集元数据(从第一行获取)
             if not metadata:
                 metadata = {
@@ -282,6 +284,8 @@ class ResultAnalyzer(IResultAnalyzer):
                     "chapter_code": row.get("标签", ""),
                     "original_content": row.get("content", "")
                 }
+            with open("temp/document_temp/missing_points_metadata.json", "w", encoding="utf-8") as f:
+                json.dump(metadata, f, ensure_ascii=False, indent=4)
         logger.debug(f"build_missing_issue_list_all_issues:{len(all_issues)}")
         # 返回包含问题和元数据的字典,由外层统一格式化
         return {

+ 30 - 82
core/construction_review/component/reviewers/check_completeness/utils/redis_csv_utils.py

@@ -45,10 +45,10 @@ def get_redis_connection():
         )
         # 测试连接
         r.ping()
-        logger.info(f"[OK] Redis连接成功 (host={REDIS_HOST}, port={REDIS_PORT})")
+        logger.info(f"Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
         return r
     except Exception as e:
-        logger.info(f"[ERROR] Redis连接失败: {e}")
+        logger.error(f"Redis连接失败: {e}")
         raise
 
 
@@ -75,32 +75,20 @@ def df_store_to_redis(redis_client, data=None, bind_id=None):
     Returns:
         list: 存储的行数据列表
     """
-    logger.info(f"\n[Redis存储] 开始存储数据到Redis,bind_id: {bind_id}")
-    
-    # 调试信息:记录Redis连接信息
-    try:
-        logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
-                   f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
-                   f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
-    except Exception as e:
-        logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
+    logger.info(f"开始存储数据到Redis,bind_id: {bind_id}")
     
     if data is None:
         # 使用pandas读取CSV文件
-        logger.info(f"[Redis存储] 从CSV文件读取数据: {INPUT_CSV}")
+        logger.info(f"从CSV文件读取数据: {INPUT_CSV}")
         df = pd.read_csv(INPUT_CSV, encoding='utf-8-sig')
         # header = df.columns.tolist()
         rows = df.to_dict('records')
-        
-        logger.info(f"[OK] 读取到 {len(rows)} 行数据")
-        # logger.info(f"[OK] CSV表头: {header}")
     else:
-        logger.info(f"[Redis存储] 使用传入的DataFrame数据,共 {len(data)} 行")
+        logger.info(f"使用传入的DataFrame数据,共 {len(data)} 行")
         rows = data.to_dict('records')
     
     # 清空Redis中该ID的数据
     redis_client.delete(bind_id)
-    logger.info(f"[OK] 清空Redis中ID '{bind_id}' 的旧数据")
     
     # 按行循环存入Redis(使用Hash结构)
     for idx, row in enumerate(rows, start=1):
@@ -112,107 +100,72 @@ def df_store_to_redis(redis_client, data=None, bind_id=None):
     # 存储行数
     redis_client.hset(bind_id, "row_count", len(rows))
     
-    logger.info(f"[OK] 成功将 {len(rows)} 行数据存入Redis")
-    logger.info(f"[OK] Redis Key: {bind_id}")
-    logger.info(f"[Redis存储] 数据存储完成")
+    logger.info(f"数据存储完成,共 {len(rows)} 行")
     
     return rows
 
 
-def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
-    """从Redis一次性读取所有数据并保存为CSV文件
-    
+async def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
+    """从Redis一次性读取所有数据并保存为CSV文件(异步版本)
+
     Args:
         redis_client: Redis客户端
         bind_id: 绑定ID
         csv_save_path: CSV文件保存路径(可选)
-    
+
     Returns:
         pandas.DataFrame: 包含数据的DataFrame,如果Redis中不存在数据则返回空DataFrame
     """
-    logger.info(f"\n从Redis读取数据 (ID: {bind_id})")
-    
-    # 调试信息:记录Redis连接信息
-    try:
-        logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
-                   f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
-                   f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
-    except Exception as e:
-        logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
-    
-    # 调试信息:检查键是否存在
-    key_exists = redis_client.exists(bind_id)
-    logger.info(f"[DEBUG] Redis键 '{bind_id}' 存在状态: {key_exists}")
-    
-    # 调试信息:列出所有匹配的键
-    try:
-        all_keys = redis_client.keys(f"*{bind_id}*")
-        logger.info(f"[DEBUG] Redis中匹配的键: {all_keys}")
-    except Exception as e:
-        logger.warning(f"[DEBUG] 无法列出Redis键: {e}")
-    
+    logger.info(f"从Redis读取数据,bind_id: {bind_id}")
+
     # 检查数据是否存在
+    key_exists = await redis_client.exists(bind_id)
     if not key_exists:
-        logger.warning(f"[WARN] Redis中不存在ID '{bind_id}' 的数据,返回空DataFrame")
+        logger.warning(f"Redis中不存在ID '{bind_id}' 的数据,返回空DataFrame")
         return pd.DataFrame()  # 返回空DataFrame而不是None
-    
+
     # # 获取表头
-    # header_json = redis_client.hget(bind_id, "header")
+    # header_json = await redis_client.hget(bind_id, "header")
     # if not header_json:
     #     logger.info("[ERROR] 未找到表头数据")
     #     return
-    
+
     # header = json.loads(header_json)
     # logger.info(f"[OK] 读取到表头: {header}")
-    
+
     # 获取行数
-    row_count = int(redis_client.hget(bind_id, "row_count") or 0)
-    logger.info(f"[OK] 总行数: {row_count}")
-    
+    row_count = int(await redis_client.hget(bind_id, "row_count") or 0)
+
     # 读取所有行数据
     rows = []
     for idx in range(1, row_count + 1):
         row_key = f"row_{idx}"
-        row_json = redis_client.hget(bind_id, row_key)
+        row_json = await redis_client.hget(bind_id, row_key)
         if row_json:
             row = json.loads(row_json)
             rows.append(row)
-    
-    logger.info(f"[OK] 成功从Redis读取 {len(rows)} 行数据")
-    
+
     # 使用pandas保存为新的CSV文件
     df_output = pd.DataFrame(rows)
     if csv_save_path:
         df_output.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
-    
-    logger.info(f"[OK] 数据已保存到: {OUTPUT_CSV}")
-    
+        logger.info(f"数据已保存到: {OUTPUT_CSV}")
+
     # 读取完成后删除Redis中的bind_id数据
-    redis_client.delete(bind_id)
-    logger.info(f"[OK] 已删除Redis中ID '{bind_id}' 的数据")
-    
+    await redis_client.delete(bind_id)
+
     return df_output
 
 
 def display_redis_data(redis_client, bind_id=None):
     """显示Redis中存储的数据摘要"""
-    logger.info(f"\nRedis数据摘要 (ID: {bind_id})")
-    logger.info("-" * 50)
-    
-    # 获取所有字段
+    logger.info(f"Redis数据摘要 (ID: {bind_id})")
     all_fields = redis_client.hkeys(bind_id)
     logger.info(f"字段数量: {len(all_fields)}")
-    logger.info(f"字段列表: {all_fields}")
-    
-    logger.info("-" * 50)
 
 
 def main():
     """主函数"""
-    logger.info("=" * 60)
-    logger.info("Redis CSV 处理器")
-    logger.info("=" * 60)
-    
     try:
         # 获取Redis连接
         redis_client = get_redis_connection()
@@ -220,23 +173,18 @@ def main():
         # 读取CSV并存入Redis
         df_store_to_redis(redis_client)
         
-        # 显示Redis数据摘要
-        display_redis_data(redis_client)
-        
         # 从Redis读取并保存为新的CSV文件
         rows = read_from_redis_and_save_csv(redis_client)
         # 将字典列表转换为JSON字符串列表
         # rows_str = [json.dumps(row, ensure_ascii=False) for row in rows]
         # logger.info(f"[OK] 保存到CSV文件成功 (\n{header}\n{'\n'.join(rows_str)})")
         logger.info(f"{pd.DataFrame(rows)}")
-        logger.info("\n" + "=" * 60)
-        logger.info("[OK] 处理完成!")
-        logger.info("=" * 60)
+        logger.info("处理完成")
         
     except Exception as e:
-        logger.info(f"\n[ERROR] 处理过程中发生错误: {e}")
+        logger.error(f"处理过程中发生错误: {e}")
         import traceback
-        traceback.logger.info_exc()
+        logger.error(traceback.format_exc())
 
 
 if __name__ == "__main__":

+ 83 - 28
core/construction_review/component/reviewers/outline_check.py

@@ -8,6 +8,8 @@ import pandas as pd
 import json
 import ast  # 用于安全解析字符串为Python对象
 
+from foundation.observability.logger.loggering import server_logger as logger
+
 
 def parse_review_result(review_result_str):
     """
@@ -27,28 +29,6 @@ def parse_review_result(review_result_str):
         except (ValueError, SyntaxError):
             return {}
 
-def merge_dict_fields_and_deduplicate(group):
-    """
-    合并字典字段的值列表并去重
-    """
-    merged = {}
-    for item in group:
-        if isinstance(item, dict):
-            for key, value in item.items():
-                if key not in merged:
-                    merged[key] = []
-                # 如果值是列表,则合并;否则添加为单个元素
-                if isinstance(value, list):
-                    merged[key].extend(value)
-                else:
-                    merged[key].append(value)
-    # 对每个字段的值去重
-    for key in merged:
-        # 使用dict.fromkeys去重同时保持顺序
-        merged[key] = list(dict.fromkeys(merged[key]))
-    return merged
-
-
 def outline_review_results_df(data, path=None):
     """
     处理大纲审查结果DataFrame,生成合并后的审查结果
@@ -63,6 +43,7 @@ def outline_review_results_df(data, path=None):
         - review_results_summary: 合并后的审查结果字典
         - chapter_classification: 章节分类(如"basis", "overview"等)
     """
+    logger.info(f"开始处理大纲审查结果,数据行数: {len(data)}")
     try:
         df = data
         # 提取章节标签的第一部分
@@ -87,7 +68,7 @@ def outline_review_results_df(data, path=None):
             
             # 使用字典推导式合并相同title的字典的字段的值列表并去重
             merged_dict = {
-                title: merge_dict_fields_and_deduplicate(group)
+                title: merge_dict_lists_and_deduplicate(group.tolist())
                 for title, group in grouped_data
             }
             
@@ -103,9 +84,6 @@ def outline_review_results_df(data, path=None):
         # 存储章节分类
         new_df['chapter_classification'] = chapter_classifications.values
         
-        print(f"章节分类: {chapter_classifications.values}")
-        print(f"处理后的DataFrame:\n{new_df}")
-        
         if path:
             # 判断文件是否存在
             if os.path.exists(path):
@@ -115,12 +93,89 @@ def outline_review_results_df(data, path=None):
                 # 文件不存在,首次写入,包含表头
                 new_df.to_csv(path, mode='w', index=False, encoding='utf-8-sig')
         
+        logger.info(f"大纲审查结果处理完成,输出 {len(new_df)} 条记录")
         return new_df
         
     except FileNotFoundError:
-        print(f"错误: 文件不存在!")
+        logger.error(f"文件不存在: {path}")
+    except Exception as e:
+        logger.error(f"处理大纲审查结果时发生错误: {e}")
+
+
+def merge_dict_lists_and_deduplicate(dict_list):
+    """
+    合并多个字典,其中每个字典的值都是列表,进行顺序去重合并
+    
+    Args:
+        dict_list: 字典列表
+        
+    Returns:
+        合并后的字典,每个键对应的值是去重后的列表
+    """
+    merged = {}
+    for d in dict_list:
+        if isinstance(d, dict):
+            for key, value in d.items():
+                if key not in merged:
+                    merged[key] = []
+                if isinstance(value, list):
+                    merged[key].extend(value)
+                else:
+                    merged[key].append(value)
+    # 对每个字段的值去重(保持顺序)
+    for key in merged:
+        merged[key] = list(dict.fromkeys(merged[key]))
+    return merged
+
+
+def merge_results_by_classification(df):
+    """
+    根据chapter_classification列的标签将review_results_summary列的字典合并,
+    chapter_label列的值合并为列表
+    
+    Args:
+        df: outline_review_results_df返回的DataFrame,包含以下列:
+            - chapter_label: 章节标签
+            - review_results_summary: 审查结果字典(字符串或字典)
+            - chapter_classification: 章节分类
+            
+    Returns:
+        合并后的DataFrame,chapter_classification列值唯一,其他列对应合并
+    """
+    logger.info(f"开始按chapter_classification合并结果,数据行数: {len(df)}")
+    try:
+        # 解析review_results_summary列(如果是字符串)
+        df['parsed_summary'] = df['review_results_summary'].apply(parse_review_result)
+        
+        # 按chapter_classification分组
+        grouped = df.groupby('chapter_classification')
+        
+        # 合并数据
+        result_data = []
+        for classification, group in grouped:
+            # 合并chapter_label为列表
+            chapter_labels = group['chapter_label'].tolist()
+            
+            # 合并review_results_summary字典
+            dict_list = group['parsed_summary'].tolist()
+            merged_dict = merge_dict_lists_and_deduplicate(dict_list)
+            
+            result_data.append({
+                'chapter_classification': classification,
+                'chapter_label': chapter_labels,
+                'review_results_summary': merged_dict
+            })
+        
+        # 创建新的DataFrame
+        result_df = pd.DataFrame(result_data)
+        
+        logger.info(f"合并完成,输出 {len(result_df)} 条记录")
+        return result_df
+        
     except Exception as e:
-        print(f"读取CSV文件时发生错误: {e}")
+        logger.error(f"按chapter_classification合并结果时发生错误: {e}")
+        raise
+
 
 if __name__ == '__main__':
     csv_file = rf'temp\document_temp\2_spec_review_results.csv'

+ 27 - 2
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -333,7 +333,7 @@ class InterTool:
 
             # 检查 check_result 是否为 None 或不包含 details
             if is_dict and (not check_result or "details" not in check_result):
-                logger.warning(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
+                logger.debug(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
                 continue
 
             if is_review_result and not check_result.details:
@@ -360,7 +360,32 @@ class InterTool:
 
                 if check_name  not in TRCH_CHECK_ITEMS:
                     logger.debug(f"检查项 {check_name} 无参考来源,直接解析响应")
-                    check_issues = self.parse_ai_review_response(response, check_name,chapter_code)
+
+                    # 🔧 检查 response 是否已经是列表格式(如 outline_check 返回的格式)
+                    if isinstance(response, list):
+                        check_issues = []
+                        for issue_item in response:
+                            if isinstance(issue_item, dict):
+                                # 直接使用已格式化的项,确保必要字段存在
+                                formatted_issue = {
+                                    "check_item": check_name,
+                                    "chapter_code": details.get("chapter_code", chapter_code),  # 优先使用 details 中的 chapter_code
+                                    "check_item_code": f"{details.get('chapter_code', chapter_code)}_{check_name}",
+                                    "check_result": issue_item.get("issue_point", ""),
+                                    "exist_issue": True,
+                                    "risk_info": {"risk_level": issue_item.get("risk_level", "medium")},
+                                    # 保留原始字段
+                                    "location": issue_item.get("location", ""),
+                                    "suggestion": issue_item.get("suggestion", ""),
+                                    "reason": issue_item.get("reason", ""),
+                                    "review_references": issue_item.get("review_references", ""),
+                                    "reference_source": issue_item.get("reference_source", "")
+                                }
+                                check_issues.append(formatted_issue)
+                        logger.debug(f"检查项 {check_name} 的 response 已是列表格式,直接处理 {len(check_issues)} 个问题项")
+                    else:
+                        # response 是字符串,使用原有的解析逻辑
+                        check_issues = self.parse_ai_review_response(response, check_name, chapter_code)
 
                     # 将解析后的结果添加到review_lists中
                     for check_issue in check_issues:

+ 6 - 6
core/construction_review/workflows/ai_review_workflow.py

@@ -506,12 +506,12 @@ class AIReviewWorkflow:
                     state = state,
                     stage_name = state.get("stage_name", "完整性审查")
                 )
-                # outline_review_result = {} 
-                outline_review_result = await self.ai_review_engine.outline_check(
-                    trace_id_idx = state["callback_task_id"],
-                    outline_content = state["structured_content"],
-                    state = state,
-                    stage_name = state.get("stage_name", "大纲审查"))
+                outline_review_result = {} 
+                # outline_review_result = await self.ai_review_engine.outline_check(
+                #     trace_id_idx = state["callback_task_id"],
+                #     outline_content = state["structured_content"],
+                #     state = state,
+                #     stage_name = state.get("stage_name", "大纲审查"))
                 # with open(r"temp\document_temp\4_check_completeness_result.json", "w", encoding="utf-8") as f:
                 #     json.dump(check_completeness_result, f, ensure_ascii=False, indent=4)
 

+ 45 - 11
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -43,6 +43,8 @@ from foundation.observability.logger.loggering import server_logger as logger
 from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from core.base.task_models import TaskFileInfo
 from ...component.reviewers.utils.inter_tool import InterTool
+# from ...component.reviewers.outline_check import outline_review_results_df, merge_results_by_classification
+from ...component.reviewers.check_completeness.utils.redis_csv_utils import   read_from_redis_and_save_csv
 from ..types import AIReviewState
 
 # 常量定义
@@ -59,6 +61,7 @@ class UnitReviewResult():
     rag_enhanced: Dict[str, Any]
     overall_risk: str
     is_sse_push: bool = True  # 是否成功执行并推送SSE,默认为True
+    trace_id_idx:Optional[str] = None
 
 
 class AIReviewCoreFun:
@@ -122,10 +125,11 @@ class AIReviewCoreFun:
         """
         logger.info(f"📝 处理章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
         # chapter_total_chunks = len(chapter_content)
-
         # 按块串行遍历(所有章节统一流程)
+        trace_id_idx = None  # 初始化 trace_id_idx
         for chunk_index, chunk in enumerate(chapter_content):
             # 计算全局块索引,避免不同章节间索引重复导致Redis计数错误
+            
             global_chunk_index = completed_chunks + chunk_index
             chunk_label = chunk.get("section_label", f"chunk_{global_chunk_index}")
             logger.info(f"  📄 处理块 {global_chunk_index+1}/{total_chunks}: {chunk_label}")
@@ -135,13 +139,15 @@ class AIReviewCoreFun:
                 logger.warning("块审查检测到终止信号")
                 return chunk_index, all_issues  # 返回已处理的块数和issues
 
+                # continue  # 跳过未成功执行的块
             # 并发执行当前块的所有审查方法
             chunk_results = await self._execute_chunk_methods(
                 chapter_code, chunk, global_chunk_index, func_names, state
             )
             if not chunk_results.get('is_sse_push', False):
-                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")                
+                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")
                 continue  # 跳过未成功执行的块
+
             # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
@@ -245,6 +251,7 @@ class AIReviewCoreFun:
         semaphore = asyncio.Semaphore(5)  # 单个块内限制并发数为5
         rag_enhanced_content = None  # 初始化变量,避免作用域错误
         basis_content = None  # 初始化变量,避免作用域错误
+        rows_df = None
         is_complete_field = chunk.get('is_complete_field', False)
         logger.info(f"检查is_complete_field值是否正常: {is_complete_field}")
         # 只有非完整性审查的chunk才执行RAG检索(注意括号位置,确保运算符优先级正确)
@@ -261,11 +268,15 @@ class AIReviewCoreFun:
                 chunk.get('content', ''),
                 chapter_code
             )
+
         async def execute_with_semaphore(func_name):
             async with semaphore:
                 try:
                     # 创建并执行单个审查任务
-                    result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content, basis_content)
+                    result = await self._execute_single_review(
+                        chapter_code, chunk, chunk_index, func_name, state,
+                        rag_enhanced_content, basis_content
+                    )
                     return func_name, result
                 except Exception as e:
                     logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
@@ -277,13 +288,14 @@ class AIReviewCoreFun:
                         rag_enhanced={},
                         overall_risk="error"
                     )
+                
 
         # 创建并发任务
         async_tasks = [execute_with_semaphore(func_name) for func_name in func_names]
-
-        # 等待当前块所有方法完成
         completed_results = await asyncio.gather(*async_tasks, return_exceptions=True)
 
+
+
         # 合并所有 UnitReviewResult 对象的 basic_compliance 和 technical_compliance
         merged_basic = {}
         merged_technical = {}
@@ -307,6 +319,7 @@ class AIReviewCoreFun:
                     merged_technical.update(review_result.technical_compliance)
                     # 合并 rag_enhanced
                     merged_rag.update(review_result.rag_enhanced)
+                    # trace_id_idx = review_result.trace_id_idx    
 
         return {
             'basic_compliance': merged_basic,
@@ -315,7 +328,7 @@ class AIReviewCoreFun:
             'is_sse_push': has_success  # 添加 is_sse_push 字段
         }
 
-    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None) -> UnitReviewResult:
+    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState, rag_enhanced_content: dict = None, basis_content: dict = None) -> UnitReviewResult:
         """
         执行单个块的审查任务
 
@@ -397,18 +410,39 @@ class AIReviewCoreFun:
 
         elif func_name == "check_completeness" and is_complete_field:
             # check_completeness 需要列表类型,将单个 chunk 包装成列表
-            raw_result = await method(trace_id, [chunk], state, stage_name)
-            # 基础审查方法,放入 basic_compliance
+            completeness_result, trace_id_idx = await method(trace_id, [chunk], state, stage_name)
+
+            # 在 check_completeness 完成后,立即调用 outline_check
+            # 使用 check_completeness 返回的 trace_id_idx
+            logger.info(f"[check_completeness完成] 开始调用 outline_check (trace_id_idx: {trace_id_idx})")
+
+            # 准备 outline_check 所需的数据(从 Redis 读取)
+            from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
+            from core.construction_review.component.reviewers.check_completeness.utils.redis_csv_utils import read_from_redis_and_save_csv
+            redis_client = await RedisConnectionFactory.get_connection()
+            outline_data_df = await read_from_redis_and_save_csv(redis_client, bind_id=trace_id_idx)
+            logger.info(f"[outline_check] 获取大纲数据成功:{outline_data_df}")
+            outline_result = await self.ai_review_engine.outline_check(
+                outline_content=outline_data_df,
+                trace_id_idx=trace_id_idx,
+                state=state,
+                stage_name=f"{stage_name}_大纲审查"
+            )
+            logger.info(f"[outline_check完成] 共发现 {len(outline_result.get('details', {}).get('response', []))} 个缺失项")
+
+            # 将两个结果都放入 basic_compliance
             return UnitReviewResult(
                 unit_index=chunk_index,
                 unit_content=chunk,
-                basic_compliance={func_name: raw_result},
+                basic_compliance={
+                    "check_completeness": completeness_result,
+                    "outline_check": outline_result
+                },
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result),
+                overall_risk=self._calculate_single_result_risk(completeness_result),
                 is_sse_push=True
             )
-
         elif func_name == "check_non_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(

+ 401 - 0
utils_test/RAG_Test/test_hybrid_search_debug.py

@@ -0,0 +1,401 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+混合检索问题诊断脚本
+用于排查 hybrid_search 返回0结果的问题
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+
+from pymilvus import connections, Collection, utility
+from foundation.ai.models.model_handler import model_handler
+from foundation.observability.logger.loggering import server_logger as logger
+
+
+def check_milvus_connection():
+    """检查 Milvus 连接"""
+    print("\n" + "="*60)
+    print("1. 检查 Milvus 连接")
+    print("="*60)
+    try:
+        from foundation.infrastructure.config.config import config_handler
+        host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+        port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        
+        connections.connect(
+            alias="debug",
+            host=host,
+            port=port,
+            db_name="lq_db"
+        )
+        print(f"✅ Milvus 连接成功: {host}:{port}")
+        return True
+    except Exception as e:
+        print(f"❌ Milvus 连接失败: {e}")
+        return False
+
+
+def check_collection_exists(collection_name: str):
+    """检查 Collection 是否存在"""
+    print(f"\n2. 检查 Collection 是否存在: {collection_name}")
+    print("-"*60)
+    
+    exists = utility.has_collection(collection_name, using="debug")
+    if exists:
+        print(f"✅ Collection '{collection_name}' 存在")
+    else:
+        print(f"❌ Collection '{collection_name}' 不存在!")
+    return exists
+
+
+def check_collection_schema(collection_name: str):
+    """检查 Collection Schema 结构"""
+    print(f"\n3. 检查 Collection Schema 结构")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        schema = col.schema
+        
+        print(f"Collection: {collection_name}")
+        print(f"Description: {schema.description}")
+        print(f"\n字段列表:")
+        
+        has_dense = False
+        has_sparse = False
+        field_names = []
+        
+        for field in schema.fields:
+            field_names.append(field.name)
+            print(f"  - {field.name}: {field.dtype.name}", end="")
+            if hasattr(field, 'dim') and field.dim:
+                print(f" (dim={field.dim})", end="")
+            if field.is_primary:
+                print(" [PRIMARY]", end="")
+            print()
+            
+            # 检查关键字段
+            if field.name == "dense":
+                has_dense = True
+            if field.name == "sparse":
+                has_sparse = True
+        
+        print(f"\n混合搜索所需字段检查:")
+        print(f"  - dense 字段: {'✅ 存在' if has_dense else '❌ 不存在'}")
+        print(f"  - sparse 字段: {'✅ 存在' if has_sparse else '❌ 不存在'}")
+        
+        if not has_dense or not has_sparse:
+            print(f"\n⚠️  警告: Collection 缺少混合搜索所需的字段!")
+            print(f"   混合搜索需要 'dense' 和 'sparse' 两个字段")
+            print(f"   当前字段: {field_names}")
+        
+        return has_dense and has_sparse
+        
+    except Exception as e:
+        print(f"❌ 获取 Schema 失败: {e}")
+        return False
+
+
+def check_collection_data(collection_name: str):
+    """检查 Collection 数据量"""
+    print(f"\n4. 检查 Collection 数据量")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        col.load()
+        num_entities = col.num_entities
+        
+        print(f"数据量: {num_entities} 条")
+        
+        if num_entities == 0:
+            print("❌ Collection 为空,没有数据!")
+            return False
+        else:
+            print("✅ Collection 有数据")
+            return True
+            
+    except Exception as e:
+        print(f"❌ 获取数据量失败: {e}")
+        return False
+
+
+def check_collection_index(collection_name: str):
+    """检查 Collection 索引"""
+    print(f"\n5. 检查 Collection 索引")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        indexes = col.indexes
+        
+        if not indexes:
+            print("❌ 没有索引!")
+            return False
+        
+        for idx in indexes:
+            print(f"  - 字段: {idx.field_name}")
+            print(f"    索引参数: {idx.params}")
+        
+        print("✅ 索引存在")
+        return True
+        
+    except Exception as e:
+        print(f"❌ 获取索引失败: {e}")
+        return False
+
+
+def test_traditional_search(collection_name: str, query_text: str):
+    """测试传统向量搜索(不使用混合搜索)"""
+    print(f"\n6. 测试传统向量搜索")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        col.load()
+        
+        # 获取 embedding
+        emdmodel = model_handler.get_embedding_model()
+        query_vector = emdmodel.embed_query(query_text)
+        
+        print(f"查询文本: {query_text}")
+        print(f"向量维度: {len(query_vector)}")
+        
+        # 确定向量字段名
+        vector_field = None
+        for field in col.schema.fields:
+            if "FLOAT_VECTOR" in str(field.dtype):
+                vector_field = field.name
+                break
+        
+        if not vector_field:
+            print("❌ 未找到向量字段")
+            return False
+        
+        print(f"向量字段: {vector_field}")
+        
+        # 执行搜索
+        search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
+        results = col.search(
+            data=[query_vector],
+            anns_field=vector_field,
+            param=search_params,
+            limit=5,
+            output_fields=["text"]
+        )
+        
+        print(f"\n搜索结果: {len(results[0])} 条")
+        for i, hit in enumerate(results[0]):
+            print(f"  {i+1}. ID={hit.id}, 距离={hit.distance:.4f}")
+        
+        if len(results[0]) > 0:
+            print("✅ 传统向量搜索正常")
+            return True
+        else:
+            print("❌ 传统向量搜索也返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ 传统搜索失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def test_langchain_hybrid_search(collection_name: str, query_text: str):
+    """测试 LangChain Milvus 混合搜索"""
+    print(f"\n7. 测试 LangChain Milvus 混合搜索")
+    print("-"*60)
+    
+    try:
+        from langchain_milvus import Milvus, BM25BuiltInFunction
+        from foundation.infrastructure.config.config import config_handler
+        
+        host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+        port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        
+        connection_args = {
+            "uri": f"http://{host}:{port}",
+            "db_name": "lq_db"
+        }
+        
+        emdmodel = model_handler.get_embedding_model()
+        
+        print(f"尝试连接 Collection: {collection_name}")
+        print(f"连接参数: {connection_args}")
+        
+        # 尝试创建 vectorstore
+        vectorstore = Milvus(
+            embedding_function=emdmodel,
+            collection_name=collection_name,
+            connection_args=connection_args,
+            consistency_level="Strong",
+            builtin_function=BM25BuiltInFunction(),
+            vector_field=["dense", "sparse"]
+        )
+        
+        print("✅ Vectorstore 创建成功")
+        
+        # 执行混合搜索
+        print(f"\n执行混合搜索,查询: {query_text}")
+        results = vectorstore.similarity_search_with_score(
+            query=query_text,
+            k=5,
+            ranker_type="weighted",
+            ranker_params={"weights": [0.7, 0.3]}
+        )
+        
+        print(f"搜索结果: {len(results)} 条")
+        for i, (doc, score) in enumerate(results):
+            content = doc.page_content[:50] if doc.page_content else "N/A"
+            print(f"  {i+1}. score={score:.4f}, content={content}...")
+        
+        if len(results) > 0:
+            print("✅ 混合搜索正常")
+            return True
+        else:
+            print("❌ 混合搜索返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ 混合搜索失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def test_retrieval_manager(collection_name: str, query_text: str):
+    """测试 RetrievalManager 的混合搜索"""
+    print(f"\n8. 测试 RetrievalManager 混合搜索")
+    print("-"*60)
+    
+    try:
+        from foundation.ai.rag.retrieval.retrieval import retrieval_manager
+        
+        results = retrieval_manager.hybrid_search_recall(
+            collection_name=collection_name,
+            query_text=query_text,
+            top_k=5,
+            ranker_type="weighted",
+            dense_weight=0.7,
+            sparse_weight=0.3
+        )
+        
+        print(f"搜索结果: {len(results)} 条")
+        for i, result in enumerate(results):
+            content = result.get('text_content', '')[:50]
+            print(f"  {i+1}. {content}...")
+        
+        if len(results) > 0:
+            print("✅ RetrievalManager 混合搜索正常")
+            return True
+        else:
+            print("❌ RetrievalManager 混合搜索返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ RetrievalManager 测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def main():
+    """主诊断函数"""
+    print("\n" + "="*60)
+    print("混合检索问题诊断")
+    print("="*60)
+    
+    # 配置
+    collection_name = "first_bfp_collection_entity"
+    query_text = "高空作业"
+    
+    print(f"\n诊断目标:")
+    print(f"  - Collection: {collection_name}")
+    print(f"  - 查询文本: {query_text}")
+    
+    # 执行诊断
+    results = {}
+    
+    # 1. 检查连接
+    results['connection'] = check_milvus_connection()
+    if not results['connection']:
+        print("\n❌ Milvus 连接失败,无法继续诊断")
+        return
+    
+    # 2. 检查 Collection 存在
+    results['exists'] = check_collection_exists(collection_name)
+    if not results['exists']:
+        print(f"\n❌ Collection '{collection_name}' 不存在,无法继续诊断")
+        return
+    
+    # 3. 检查 Schema
+    results['schema'] = check_collection_schema(collection_name)
+    
+    # 4. 检查数据量
+    results['data'] = check_collection_data(collection_name)
+    
+    # 5. 检查索引
+    results['index'] = check_collection_index(collection_name)
+    
+    # 6. 测试传统搜索
+    results['traditional'] = test_traditional_search(collection_name, query_text)
+    
+    # 7. 测试 LangChain 混合搜索
+    results['langchain'] = test_langchain_hybrid_search(collection_name, query_text)
+    
+    # 8. 测试 RetrievalManager
+    results['retrieval'] = test_retrieval_manager(collection_name, query_text)
+    
+    # 总结
+    print("\n" + "="*60)
+    print("诊断总结")
+    print("="*60)
+    
+    for key, value in results.items():
+        status = "✅" if value else "❌"
+        print(f"  {status} {key}")
+    
+    # 给出建议
+    print("\n" + "="*60)
+    print("问题分析与建议")
+    print("="*60)
+    
+    if not results.get('schema'):
+        print("""
+⚠️  主要问题: Collection Schema 不支持混合搜索
+
+原因: Collection 缺少 'dense' 和 'sparse' 字段
+      混合搜索需要在创建 Collection 时使用 BM25BuiltInFunction
+
+解决方案:
+1. 使用 create_hybrid_collection 方法重新创建 Collection
+2. 或者修改代码,对不支持混合搜索的 Collection 使用传统向量搜索
+""")
+    
+    if results.get('traditional') and not results.get('langchain'):
+        print("""
+⚠️  问题: 传统搜索正常,但混合搜索失败
+
+可能原因:
+1. Collection 创建时未启用 BM25 功能
+2. LangChain Milvus 版本兼容性问题
+3. vector_field 配置与实际字段名不匹配
+
+建议:
+1. 检查 Collection 创建方式
+2. 确认 langchain-milvus 版本
+""")
+    
+    if not results.get('data'):
+        print("""
+⚠️  问题: Collection 为空
+
+解决方案: 先向 Collection 中导入数据
+""")
+
+
+if __name__ == "__main__":
+    main()