Forráskód Böngészése

v0.0.5-功能优化-完整性审查
- 完整性审查-大纲完整性审查初版

WangXuMing 1 hónapja
szülő
commit
efa8f3a6a9

+ 72 - 35
core/construction_review/component/ai_review_engine.py

@@ -762,25 +762,13 @@ class AIReviewEngine(BaseReviewer):
             review_results_df = pd.DataFrame(review_results)
             chapter_labels = review_results_df['section_label'].str.split('->').str[0]
             review_results_df['title'] = chapter_labels
-            # review_results_df.to_csv(Path('temp') / 'document_temp' / '2_spec_review_results.csv', encoding='utf-8-sig', index=False)
-            # csv_file = rf'temp\document_temp\2_spec_review_results.csv'
-            # path2 = rf'temp\document_temp\outlines_review_results.csv'
-            # data_df = pd.read_csv(csv_file, encoding='utf-8-sig')
-            # data_df = review_results_df
-            outline_review_results = outline_review_results_df(data=review_results_df)
-  
+
+            # 将审查结果存储到Redis,供 outline_check 使用
             logger.info(f"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}")
-            logger.info(f"[完整性检查] 大纲审查结果行数: {len(outline_review_results) if outline_review_results is not None else 'None'}")
-            outline_review_result = await self.outline_check(
-                trace_id_idx = state["callback_task_id"],
-                outline_content = outline_review_results,
-                state = state,
-                stage_name = state.get("stage_name", "大纲审查")
-            )
-            # df_store_to_redis(self.redis_client, data=outline_review_results, bind_id=trace_id_idx)
-            
+            from .reviewers.check_completeness.utils.redis_csv_utils import df_store_to_redis
+            df_store_to_redis(self.redis_client, data=review_results_df, bind_id=trace_id_idx)
             logger.info(f"[完整性检查] 数据已成功存储到Redis,bind_id: {trace_id_idx}")
-            
+
             df_filtered = review_results_df.drop_duplicates(subset='title', keep='first').reset_index(drop=True)
             unique_chapter_labels = chapter_labels.unique().tolist()
             chapter_classifications = df_filtered['chapter_classification']
@@ -1006,10 +994,17 @@ class AIReviewEngine(BaseReviewer):
         metadata = {}
         try:
             # 从Redis读取并保存为新的CSV文件
-            # rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
-            df = outline_content
+            # 如果传入的 outline_content 为 None,尝试从 Redis 获取数据
+            if outline_content is None:
+                logger.info(f"[大纲审查] outline_content 为 None,尝试从 Redis 获取数据 (bind_id: {trace_id_idx})")
+                from .reviewers.check_completeness.utils.redis_csv_utils import read_from_redis_and_save_csv
+                df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
+            else:
+                df = outline_content
+
             # df = merge_results_by_classification(rows_df)
-            df.to_csv(csv_path, encoding='utf-8-sig', index=False)
+            if df is not None:
+                df.to_csv(csv_path, encoding='utf-8-sig', index=False)
             # 检查 df 是否为 None
             if df is None:
                 logger.error(f"[大纲审查] Redis中不存在ID '{trace_id_idx}' 的数据,无法进行大纲审查")
@@ -1021,16 +1016,32 @@ class AIReviewEngine(BaseReviewer):
                 }
             
             logger.info(f"[大纲审查] 成功从Redis读取数据,共 {len(df)} 行")
-            
+
+            # 检查 df 是否为空
+            if df.empty or len(df) == 0:
+                logger.warning(f"[大纲审查] DataFrame为空,无法进行大纲审查")
+                return {
+                    'outline_review_result': {
+                        "response": [],
+                    },
+                    'success': False,
+                    'execution_time': time.time() - start_time
+                }
+
             # 兼容新旧字段名
-            chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'
-            review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else 'merged_review_results'
-            
+            chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else ('section_label_first' if 'section_label_first' in df.columns else 'section_label')
+            review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else ('merged_review_results' if 'merged_review_results' in df.columns else 'review_result')
+
+            logger.info(f"[大纲审查] 使用 chapter_label_col={chapter_label_col}, review_results_col={review_results_col}")
+            logger.info(f"[大纲审查] DataFrame 列: {list(df.columns)}")
+
             # 遍历每一行
             for index, row in df.iterrows():
                 chapter_label = row.get(chapter_label_col, '')
                 merged_results_str = row.get(review_results_col, '')
-                
+
+                logger.info(f"第 {index} 行: chapter_label={chapter_label}, {review_results_col}={merged_results_str}")
+
                 # 解析review_results_summary字典字符串
                 try:
                     if pd.isna(merged_results_str) or merged_results_str == '':
@@ -1048,21 +1059,39 @@ class AIReviewEngine(BaseReviewer):
                     except (json.JSONDecodeError, TypeError):
                         logger.warning(f"第 {index} 行无法解析review_results_summary: {merged_results_str}")
                         merged_results = {}
-                
+
                 # 检查字典中的每个字段
                 if isinstance(merged_results, dict):
-                    logger.info(f"开始大纲审查")
-                    
+                    # 检查是否有错误信息
+                    if 'error' in merged_results:
+                        logger.warning(f"第 {index} 行审查结果包含错误: {merged_results.get('error')}")
+                        continue  # 跳过错误行,不进行大纲审查
+
+                    logger.info(f"第 {index} 行 merged_results 键: {list(merged_results.keys())}")
+
                     # 获取chapter_label列表
                     chapter_labels_list = row.get(chapter_label_col, [])
                     if not isinstance(chapter_labels_list, list):
                         chapter_labels_list = [str(chapter_labels_list)]
-                    
+
                     # 获取review_results_summary字典的所有键,用于reason字段
                     merged_results_keys = list(merged_results.keys())
+                    # 排除error等非审查字段
+                    merged_results_keys = [k for k in merged_results_keys if k not in ['error', 'chunk_id', 'page', 'section_label', 'chapter_classification', 'chapter_code', 'title']]
+
+                    logger.info(f"第 {index} 行过滤后的有效字段: {merged_results_keys}")
+
+                    if not merged_results_keys:
+                        logger.info(f"第 {index} 行没有有效的审查字段,跳过大纲审查")
+                        continue
+
                     merged_results_keys_str = "、".join(merged_results_keys)
-                    
+
+                    # 只处理有效的审查字段
                     for field_name, field_value in merged_results.items():
+                        # 跳过非审查字段
+                        if field_name in ['error', 'chunk_id', 'page']:
+                            continue
                         # 检查列表是否为空
                         if isinstance(field_value, list) and len(field_value) == 0:
                             # 为chapter_label列表中的每个值创建单独的缺失项
@@ -1074,8 +1103,8 @@ class AIReviewEngine(BaseReviewer):
                                     "suggestion": f"在待审查目录中未找到与'{field_name}'对应的章节;当前章节仅涉及'{chapter_label}',未涵盖'{field_name}'相关内容;整改建议:建议在本章或前序章节中增设'{field_name}'相关内容,确保与审查规范要求一致。",
                                     "reason": f"本章应包含{merged_results_keys_str}等{len(merged_results_keys)}个方面。",
                                     "risk_level": "高风险",
-                                    "review_references": '',
-                                    "reference_source": '',
+                                    # "review_references": '',
+                                    "reference_source": '《桥梁公司危险性较大工程管理实施细则(2025版)》',
 
                                 }
                                 missing_items.append(missing_item)
@@ -1093,7 +1122,7 @@ class AIReviewEngine(BaseReviewer):
             execution_time = time.time() - start_time
             return {
                 "details": {
-                    "name": "outline_check",
+                    "name": "completeness_check",
                     "response": [],
                     "review_location_label": "",
                     "chapter_code": "catalogue",
@@ -1107,7 +1136,7 @@ class AIReviewEngine(BaseReviewer):
             execution_time = time.time() - start_time
             return {
                 "details": {
-                    "name": "outline_check",
+                    "name": "completeness_check",
                     "response": [],
                     "review_location_label": "",
                     "chapter_code": "catalogue",
@@ -1117,11 +1146,19 @@ class AIReviewEngine(BaseReviewer):
                 "execution_time": execution_time
             }
         logger.info(f"大纲审查完成,耗时 {execution_time:.2f} 秒")
+
+        # 获取 review_location_label,使用兼容的字段名
+        review_location_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'
+        if review_location_col not in df.columns or len(df) == 0:
+            review_location_label = "未知位置"
+        else:
+            review_location_label = df[review_location_col].to_list()[-1]
+
         outcheck_result = {
                 "details": {
                     "name": "completeness_check",
                     "response": missing_items,
-                    "review_location_label": df['chapter_label'].to_list()[-1],
+                    "review_location_label": review_location_label,
                     "chapter_code": 'catalogue',
                     "original_content": ""
                 },

+ 1 - 1
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -243,7 +243,7 @@ class ResultAnalyzer(IResultAnalyzer):
         for row in summary_rows:
             level2 = (row.get("二级目录") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
-            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
+            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
             reason= f"参照:{reference_source} 中的内容要求,{row.get('section_label', '')}内容属于,专项施工方案内容要求中的 【{suorces_eum[row.get("标签", "")]}】 板块,应包含{requirement}"
             review_references = (row.get("依据") or "").strip()
             

+ 17 - 17
core/construction_review/component/reviewers/check_completeness/utils/redis_csv_utils.py

@@ -105,55 +105,55 @@ def df_store_to_redis(redis_client, data=None, bind_id=None):
     return rows
 
 
-def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
-    """从Redis一次性读取所有数据并保存为CSV文件
-    
+async def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
+    """从Redis一次性读取所有数据并保存为CSV文件(异步版本)
+
     Args:
         redis_client: Redis客户端
         bind_id: 绑定ID
         csv_save_path: CSV文件保存路径(可选)
-    
+
     Returns:
         pandas.DataFrame: 包含数据的DataFrame,如果Redis中不存在数据则返回空DataFrame
     """
     logger.info(f"从Redis读取数据,bind_id: {bind_id}")
-    
+
     # 检查数据是否存在
-    key_exists = redis_client.exists(bind_id)
+    key_exists = await redis_client.exists(bind_id)
     if not key_exists:
         logger.warning(f"Redis中不存在ID '{bind_id}' 的数据,返回空DataFrame")
         return pd.DataFrame()  # 返回空DataFrame而不是None
-    
+
     # # 获取表头
-    # header_json = redis_client.hget(bind_id, "header")
+    # header_json = await redis_client.hget(bind_id, "header")
     # if not header_json:
     #     logger.info("[ERROR] 未找到表头数据")
     #     return
-    
+
     # header = json.loads(header_json)
     # logger.info(f"[OK] 读取到表头: {header}")
-    
+
     # 获取行数
-    row_count = int(redis_client.hget(bind_id, "row_count") or 0)
-    
+    row_count = int(await redis_client.hget(bind_id, "row_count") or 0)
+
     # 读取所有行数据
     rows = []
     for idx in range(1, row_count + 1):
         row_key = f"row_{idx}"
-        row_json = redis_client.hget(bind_id, row_key)
+        row_json = await redis_client.hget(bind_id, row_key)
         if row_json:
             row = json.loads(row_json)
             rows.append(row)
-    
+
     # 使用pandas保存为新的CSV文件
     df_output = pd.DataFrame(rows)
     if csv_save_path:
         df_output.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
         logger.info(f"数据已保存到: {OUTPUT_CSV}")
-    
+
     # 读取完成后删除Redis中的bind_id数据
-    redis_client.delete(bind_id)
-    
+    await redis_client.delete(bind_id)
+
     return df_output
 
 

+ 27 - 2
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -333,7 +333,7 @@ class InterTool:
 
             # 检查 check_result 是否为 None 或不包含 details
             if is_dict and (not check_result or "details" not in check_result):
-                logger.warning(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
+                logger.debug(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
                 continue
 
             if is_review_result and not check_result.details:
@@ -360,7 +360,32 @@ class InterTool:
 
                 if check_name  not in TRCH_CHECK_ITEMS:
                     logger.debug(f"检查项 {check_name} 无参考来源,直接解析响应")
-                    check_issues = self.parse_ai_review_response(response, check_name,chapter_code)
+
+                    # 🔧 检查 response 是否已经是列表格式(如 outline_check 返回的格式)
+                    if isinstance(response, list):
+                        check_issues = []
+                        for issue_item in response:
+                            if isinstance(issue_item, dict):
+                                # 直接使用已格式化的项,确保必要字段存在
+                                formatted_issue = {
+                                    "check_item": check_name,
+                                    "chapter_code": details.get("chapter_code", chapter_code),  # 优先使用 details 中的 chapter_code
+                                    "check_item_code": f"{details.get('chapter_code', chapter_code)}_{check_name}",
+                                    "check_result": issue_item.get("issue_point", ""),
+                                    "exist_issue": True,
+                                    "risk_info": {"risk_level": issue_item.get("risk_level", "medium")},
+                                    # 保留原始字段
+                                    "location": issue_item.get("location", ""),
+                                    "suggestion": issue_item.get("suggestion", ""),
+                                    "reason": issue_item.get("reason", ""),
+                                    "review_references": issue_item.get("review_references", ""),
+                                    "reference_source": issue_item.get("reference_source", "")
+                                }
+                                check_issues.append(formatted_issue)
+                        logger.debug(f"检查项 {check_name} 的 response 已是列表格式,直接处理 {len(check_issues)} 个问题项")
+                    else:
+                        # response 是字符串,使用原有的解析逻辑
+                        check_issues = self.parse_ai_review_response(response, check_name, chapter_code)
 
                     # 将解析后的结果添加到review_lists中
                     for check_issue in check_issues:

+ 1 - 1
core/construction_review/workflows/ai_review_workflow.py

@@ -294,7 +294,7 @@ class AIReviewWorkflow:
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
                 'semantic_logic_check': 'check_semantic_logic',
-                'completeness_check': ['check_completeness','outline_check'],
+                'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_basis_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',

+ 34 - 50
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -142,16 +142,12 @@ class AIReviewCoreFun:
                 # continue  # 跳过未成功执行的块
             # 并发执行当前块的所有审查方法
             chunk_results = await self._execute_chunk_methods(
-                chapter_code, chunk, global_chunk_index, func_names, state,trace_id_idx
+                chapter_code, chunk, global_chunk_index, func_names, state
             )
             if not chunk_results.get('is_sse_push', False):
-                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")                
+                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")
                 continue  # 跳过未成功执行的块
-            
-            if  chunk_results.get('trace_id_idx', None):
-                trace_id_idx = chunk_results.get('trace_id_idx', None)
-            else:
-                trace_id_idx = None
+
             # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
@@ -238,7 +234,7 @@ class AIReviewCoreFun:
             chapter_names = list(chapter_map.keys())
         return chapter_map, chapter_names
 
-    async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState,trace_id_idx: str =None) -> Dict[str, Any]:
+    async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState) -> Dict[str, Any]:
         """
         并发执行单个块的所有审查方法
 
@@ -272,16 +268,15 @@ class AIReviewCoreFun:
                 chunk.get('content', ''),
                 chapter_code
             )
-        if 'completeness_check' in func_names and trace_id_idx is not None:
-                logger.debug("开始提取大纲审查数据")
-                rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
 
-        
         async def execute_with_semaphore(func_name):
             async with semaphore:
                 try:
                     # 创建并执行单个审查任务
-                    result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content, basis_content,rows_df,trace_id_idx)
+                    result = await self._execute_single_review(
+                        chapter_code, chunk, chunk_index, func_name, state,
+                        rag_enhanced_content, basis_content
+                    )
                     return func_name, result
                 except Exception as e:
                     logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
@@ -295,11 +290,8 @@ class AIReviewCoreFun:
                     )
                 
 
-
         # 创建并发任务
         async_tasks = [execute_with_semaphore(func_name) for func_name in func_names]
-
-        # 等待当前块所有方法完成
         completed_results = await asyncio.gather(*async_tasks, return_exceptions=True)
 
 
@@ -327,7 +319,7 @@ class AIReviewCoreFun:
                     merged_technical.update(review_result.technical_compliance)
                     # 合并 rag_enhanced
                     merged_rag.update(review_result.rag_enhanced)
-                    trace_id_idx = review_result.trace_id_idx    
+                    # trace_id_idx = review_result.trace_id_idx    
 
         return {
             'basic_compliance': merged_basic,
@@ -336,7 +328,7 @@ class AIReviewCoreFun:
             'is_sse_push': has_success  # 添加 is_sse_push 字段
         }
 
-    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None,rows_df = None,trace_id_idx = None) -> UnitReviewResult:
+    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState, rag_enhanced_content: dict = None, basis_content: dict = None) -> UnitReviewResult:
         """
         执行单个块的审查任务
 
@@ -418,45 +410,37 @@ class AIReviewCoreFun:
 
         elif func_name == "check_completeness" and is_complete_field:
             # check_completeness 需要列表类型,将单个 chunk 包装成列表
-            raw_result,trace_id_idx = await method(trace_id, [chunk], state, stage_name)
-            # 基础审查方法,放入 basic_compliance
-            return UnitReviewResult(
-                unit_index=chunk_index,
-                unit_content=chunk,
-                basic_compliance={func_name: raw_result},
-                technical_compliance={},
-                rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result),
-                is_sse_push=True,
-                trace_id_idx = trace_id_idx
-            )
-        
-        elif func_name == "outline_check" and  trace_id_idx :
-            # check_completeness 需要列表类型,将单个 chunk 包装成列表
-            raw_result = await method(trace_id_idx, rows_df, state, stage_name)
-            # 基础审查方法,放入 basic_compliance
-            return UnitReviewResult(
-                unit_index=chunk_index,
-                unit_content=chunk,
-                basic_compliance={func_name: raw_result},
-                technical_compliance={},
-                rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result),
-                is_sse_push=True
+            completeness_result, trace_id_idx = await method(trace_id, [chunk], state, stage_name)
+
+            # 在 check_completeness 完成后,立即调用 outline_check
+            # 使用 check_completeness 返回的 trace_id_idx
+            logger.info(f"[check_completeness完成] 开始调用 outline_check (trace_id_idx: {trace_id_idx})")
+
+            # 准备 outline_check 所需的数据(从 Redis 读取)
+            from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
+            from core.construction_review.component.reviewers.check_completeness.utils.redis_csv_utils import read_from_redis_and_save_csv
+            redis_client = await RedisConnectionFactory.get_connection()
+            outline_data_df = await read_from_redis_and_save_csv(redis_client, bind_id=trace_id_idx)
+            logger.info(f"[outline_check] 获取大纲数据成功:{outline_data_df}")
+            outline_result = await self.ai_review_engine.outline_check(
+                outline_content=outline_data_df,
+                trace_id_idx=trace_id_idx,
+                state=state,
+                stage_name=f"{stage_name}_大纲审查"
             )
-        
+            logger.info(f"[outline_check完成] 共发现 {len(outline_result.get('details', {}).get('response', []))} 个缺失项")
 
-        elif func_name == "check_completeness" and is_complete_field:
-            # check_completeness 需要列表类型,将单个 chunk 包装成列表
-            raw_result = await method(trace_id, [chunk], state, stage_name)
-            # 基础审查方法,放入 basic_compliance
+            # 将两个结果都放入 basic_compliance
             return UnitReviewResult(
                 unit_index=chunk_index,
                 unit_content=chunk,
-                basic_compliance={func_name: raw_result},
+                basic_compliance={
+                    "check_completeness": completeness_result,
+                    "outline_check": outline_result
+                },
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result),
+                overall_risk=self._calculate_single_result_risk(completeness_result),
                 is_sse_push=True
             )
         elif func_name == "check_non_parameter_compliance" and not is_complete_field:

+ 401 - 0
utils_test/RAG_Test/test_hybrid_search_debug.py

@@ -0,0 +1,401 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+混合检索问题诊断脚本
+用于排查 hybrid_search 返回0结果的问题
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+
+from pymilvus import connections, Collection, utility
+from foundation.ai.models.model_handler import model_handler
+from foundation.observability.logger.loggering import server_logger as logger
+
+
+def check_milvus_connection():
+    """检查 Milvus 连接"""
+    print("\n" + "="*60)
+    print("1. 检查 Milvus 连接")
+    print("="*60)
+    try:
+        from foundation.infrastructure.config.config import config_handler
+        host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+        port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        
+        connections.connect(
+            alias="debug",
+            host=host,
+            port=port,
+            db_name="lq_db"
+        )
+        print(f"✅ Milvus 连接成功: {host}:{port}")
+        return True
+    except Exception as e:
+        print(f"❌ Milvus 连接失败: {e}")
+        return False
+
+
+def check_collection_exists(collection_name: str):
+    """检查 Collection 是否存在"""
+    print(f"\n2. 检查 Collection 是否存在: {collection_name}")
+    print("-"*60)
+    
+    exists = utility.has_collection(collection_name, using="debug")
+    if exists:
+        print(f"✅ Collection '{collection_name}' 存在")
+    else:
+        print(f"❌ Collection '{collection_name}' 不存在!")
+    return exists
+
+
+def check_collection_schema(collection_name: str):
+    """检查 Collection Schema 结构"""
+    print(f"\n3. 检查 Collection Schema 结构")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        schema = col.schema
+        
+        print(f"Collection: {collection_name}")
+        print(f"Description: {schema.description}")
+        print(f"\n字段列表:")
+        
+        has_dense = False
+        has_sparse = False
+        field_names = []
+        
+        for field in schema.fields:
+            field_names.append(field.name)
+            print(f"  - {field.name}: {field.dtype.name}", end="")
+            if hasattr(field, 'dim') and field.dim:
+                print(f" (dim={field.dim})", end="")
+            if field.is_primary:
+                print(" [PRIMARY]", end="")
+            print()
+            
+            # 检查关键字段
+            if field.name == "dense":
+                has_dense = True
+            if field.name == "sparse":
+                has_sparse = True
+        
+        print(f"\n混合搜索所需字段检查:")
+        print(f"  - dense 字段: {'✅ 存在' if has_dense else '❌ 不存在'}")
+        print(f"  - sparse 字段: {'✅ 存在' if has_sparse else '❌ 不存在'}")
+        
+        if not has_dense or not has_sparse:
+            print(f"\n⚠️  警告: Collection 缺少混合搜索所需的字段!")
+            print(f"   混合搜索需要 'dense' 和 'sparse' 两个字段")
+            print(f"   当前字段: {field_names}")
+        
+        return has_dense and has_sparse
+        
+    except Exception as e:
+        print(f"❌ 获取 Schema 失败: {e}")
+        return False
+
+
+def check_collection_data(collection_name: str):
+    """检查 Collection 数据量"""
+    print(f"\n4. 检查 Collection 数据量")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        col.load()
+        num_entities = col.num_entities
+        
+        print(f"数据量: {num_entities} 条")
+        
+        if num_entities == 0:
+            print("❌ Collection 为空,没有数据!")
+            return False
+        else:
+            print("✅ Collection 有数据")
+            return True
+            
+    except Exception as e:
+        print(f"❌ 获取数据量失败: {e}")
+        return False
+
+
+def check_collection_index(collection_name: str):
+    """检查 Collection 索引"""
+    print(f"\n5. 检查 Collection 索引")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        indexes = col.indexes
+        
+        if not indexes:
+            print("❌ 没有索引!")
+            return False
+        
+        for idx in indexes:
+            print(f"  - 字段: {idx.field_name}")
+            print(f"    索引参数: {idx.params}")
+        
+        print("✅ 索引存在")
+        return True
+        
+    except Exception as e:
+        print(f"❌ 获取索引失败: {e}")
+        return False
+
+
+def test_traditional_search(collection_name: str, query_text: str):
+    """测试传统向量搜索(不使用混合搜索)"""
+    print(f"\n6. 测试传统向量搜索")
+    print("-"*60)
+    
+    try:
+        col = Collection(collection_name, using="debug")
+        col.load()
+        
+        # 获取 embedding
+        emdmodel = model_handler.get_embedding_model()
+        query_vector = emdmodel.embed_query(query_text)
+        
+        print(f"查询文本: {query_text}")
+        print(f"向量维度: {len(query_vector)}")
+        
+        # 确定向量字段名
+        vector_field = None
+        for field in col.schema.fields:
+            if "FLOAT_VECTOR" in str(field.dtype):
+                vector_field = field.name
+                break
+        
+        if not vector_field:
+            print("❌ 未找到向量字段")
+            return False
+        
+        print(f"向量字段: {vector_field}")
+        
+        # 执行搜索
+        search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
+        results = col.search(
+            data=[query_vector],
+            anns_field=vector_field,
+            param=search_params,
+            limit=5,
+            output_fields=["text"]
+        )
+        
+        print(f"\n搜索结果: {len(results[0])} 条")
+        for i, hit in enumerate(results[0]):
+            print(f"  {i+1}. ID={hit.id}, 距离={hit.distance:.4f}")
+        
+        if len(results[0]) > 0:
+            print("✅ 传统向量搜索正常")
+            return True
+        else:
+            print("❌ 传统向量搜索也返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ 传统搜索失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def test_langchain_hybrid_search(collection_name: str, query_text: str):
+    """测试 LangChain Milvus 混合搜索"""
+    print(f"\n7. 测试 LangChain Milvus 混合搜索")
+    print("-"*60)
+    
+    try:
+        from langchain_milvus import Milvus, BM25BuiltInFunction
+        from foundation.infrastructure.config.config import config_handler
+        
+        host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+        port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+        
+        connection_args = {
+            "uri": f"http://{host}:{port}",
+            "db_name": "lq_db"
+        }
+        
+        emdmodel = model_handler.get_embedding_model()
+        
+        print(f"尝试连接 Collection: {collection_name}")
+        print(f"连接参数: {connection_args}")
+        
+        # 尝试创建 vectorstore
+        vectorstore = Milvus(
+            embedding_function=emdmodel,
+            collection_name=collection_name,
+            connection_args=connection_args,
+            consistency_level="Strong",
+            builtin_function=BM25BuiltInFunction(),
+            vector_field=["dense", "sparse"]
+        )
+        
+        print("✅ Vectorstore 创建成功")
+        
+        # 执行混合搜索
+        print(f"\n执行混合搜索,查询: {query_text}")
+        results = vectorstore.similarity_search_with_score(
+            query=query_text,
+            k=5,
+            ranker_type="weighted",
+            ranker_params={"weights": [0.7, 0.3]}
+        )
+        
+        print(f"搜索结果: {len(results)} 条")
+        for i, (doc, score) in enumerate(results):
+            content = doc.page_content[:50] if doc.page_content else "N/A"
+            print(f"  {i+1}. score={score:.4f}, content={content}...")
+        
+        if len(results) > 0:
+            print("✅ 混合搜索正常")
+            return True
+        else:
+            print("❌ 混合搜索返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ 混合搜索失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def test_retrieval_manager(collection_name: str, query_text: str):
+    """测试 RetrievalManager 的混合搜索"""
+    print(f"\n8. 测试 RetrievalManager 混合搜索")
+    print("-"*60)
+    
+    try:
+        from foundation.ai.rag.retrieval.retrieval import retrieval_manager
+        
+        results = retrieval_manager.hybrid_search_recall(
+            collection_name=collection_name,
+            query_text=query_text,
+            top_k=5,
+            ranker_type="weighted",
+            dense_weight=0.7,
+            sparse_weight=0.3
+        )
+        
+        print(f"搜索结果: {len(results)} 条")
+        for i, result in enumerate(results):
+            content = result.get('text_content', '')[:50]
+            print(f"  {i+1}. {content}...")
+        
+        if len(results) > 0:
+            print("✅ RetrievalManager 混合搜索正常")
+            return True
+        else:
+            print("❌ RetrievalManager 混合搜索返回0结果")
+            return False
+            
+    except Exception as e:
+        print(f"❌ RetrievalManager 测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def main():
+    """主诊断函数"""
+    print("\n" + "="*60)
+    print("混合检索问题诊断")
+    print("="*60)
+    
+    # 配置
+    collection_name = "first_bfp_collection_entity"
+    query_text = "高空作业"
+    
+    print(f"\n诊断目标:")
+    print(f"  - Collection: {collection_name}")
+    print(f"  - 查询文本: {query_text}")
+    
+    # 执行诊断
+    results = {}
+    
+    # 1. 检查连接
+    results['connection'] = check_milvus_connection()
+    if not results['connection']:
+        print("\n❌ Milvus 连接失败,无法继续诊断")
+        return
+    
+    # 2. 检查 Collection 存在
+    results['exists'] = check_collection_exists(collection_name)
+    if not results['exists']:
+        print(f"\n❌ Collection '{collection_name}' 不存在,无法继续诊断")
+        return
+    
+    # 3. 检查 Schema
+    results['schema'] = check_collection_schema(collection_name)
+    
+    # 4. 检查数据量
+    results['data'] = check_collection_data(collection_name)
+    
+    # 5. 检查索引
+    results['index'] = check_collection_index(collection_name)
+    
+    # 6. 测试传统搜索
+    results['traditional'] = test_traditional_search(collection_name, query_text)
+    
+    # 7. 测试 LangChain 混合搜索
+    results['langchain'] = test_langchain_hybrid_search(collection_name, query_text)
+    
+    # 8. 测试 RetrievalManager
+    results['retrieval'] = test_retrieval_manager(collection_name, query_text)
+    
+    # 总结
+    print("\n" + "="*60)
+    print("诊断总结")
+    print("="*60)
+    
+    for key, value in results.items():
+        status = "✅" if value else "❌"
+        print(f"  {status} {key}")
+    
+    # 给出建议
+    print("\n" + "="*60)
+    print("问题分析与建议")
+    print("="*60)
+    
+    if not results.get('schema'):
+        print("""
+⚠️  主要问题: Collection Schema 不支持混合搜索
+
+原因: Collection 缺少 'dense' 和 'sparse' 字段
+      混合搜索需要在创建 Collection 时使用 BM25BuiltInFunction
+
+解决方案:
+1. 使用 create_hybrid_collection 方法重新创建 Collection
+2. 或者修改代码,对不支持混合搜索的 Collection 使用传统向量搜索
+""")
+    
+    if results.get('traditional') and not results.get('langchain'):
+        print("""
+⚠️  问题: 传统搜索正常,但混合搜索失败
+
+可能原因:
+1. Collection 创建时未启用 BM25 功能
+2. LangChain Milvus 版本兼容性问题
+3. vector_field 配置与实际字段名不匹配
+
+建议:
+1. 检查 Collection 创建方式
+2. 确认 langchain-milvus 版本
+""")
+    
+    if not results.get('data'):
+        print("""
+⚠️  问题: Collection 为空
+
+解决方案: 先向 Collection 中导入数据
+""")
+
+
+if __name__ == "__main__":
+    main()