15 Commit-ok 1bed752b20 ... 60f4d2d632

Szerző SHA1 Üzenet Dátum
  WangXuMing 60f4d2d632 v0.0.5-功能优化 1 hónapja
  ZengChao 328191db4b dev:过滤空白数组 1 hónapja
  ZengChao 4c894fc357 dev:规范性和时间性添加过滤匹配不到的将不参见审查 1 hónapja
  ChenJiSheng 44122e5f46 dev:添加了大纲审查的内存初版完成主框架接入; 1 hónapja
  ChenJiSheng 274c9aa8e2 dev:添加了大纲审查的临时文件初版; 1 hónapja
  ZengChao 72fbb4379a dev:修改location字段 1 hónapja
  WangXuMing 00e6ddc457 debug 1 hónapja
  WangXuMing ee968b0e5c v0.0.5-debug 1 hónapja
  WangXuMing 297d694a34 v0.0.5-功能优化-条文完整性审查 1 hónapja
  WangXuMing db4f9b7b58 Merge branch 'dev' of http://47.109.151.80:15030/CRBC-MaaS-Platform-Project/LQAgentPlatform into dev 1 hónapja
  WangXuMing 5b8f0650a5 v0.0.5-功能优化-条文完整性审查 1 hónapja
  ZengChao 8cdc32747b dev:终极版规范性审查 1 hónapja
  ChenJiSheng 4527536e2d dev:优化了目录分类逻辑的兼容性,添加了第十一个收容项; 1 hónapja
  ChenJiSheng 7b96b1c1ce dev:扩展了目录分类逻辑的兼容性,添加了第十一个收容项; 1 hónapja
  ZengChao 808e188bc3 dev:规范化优化 1 hónapja
22 módosított fájl, 1611 hozzáadás és 272 törlés
  1. 143 77
      core/construction_review/component/ai_review_engine.py
  2. 5 2
      core/construction_review/component/doc_worker/classification/hierarchy_classifier.py
  3. 4 2
      core/construction_review/component/doc_worker/config/prompt.yaml
  4. 3 4
      core/construction_review/component/doc_worker/utils/prompt_loader.py
  5. 21 5
      core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py
  6. BIN
      core/construction_review/component/reviewers/check_completeness/config/Construction_Plan_Content_Specification.csv
  7. 243 0
      core/construction_review/component/reviewers/check_completeness/utils/redis_csv_utils.py
  8. 129 0
      core/construction_review/component/reviewers/outline_check.py
  9. 8 16
      core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml
  10. 59 45
      core/construction_review/component/reviewers/prompt/technical_reviewers.yaml
  11. 182 12
      core/construction_review/component/reviewers/reference_basis_reviewer.py
  12. 37 9
      core/construction_review/component/reviewers/timeliness_basis_reviewer.py
  13. 56 4
      core/construction_review/component/reviewers/utils/directory_extraction.py
  14. 23 4
      core/construction_review/component/reviewers/utils/inter_tool.py
  15. 284 0
      core/construction_review/component/reviewers/utils/punctuation_checker.py
  16. 187 0
      core/construction_review/component/reviewers/utils/punctuation_result_processor.py
  17. 2 2
      core/construction_review/component/reviewers/utils/reference_matcher.py
  18. 1 1
      core/construction_review/component/reviewers/utils/timeliness_determiner.py
  19. 30 8
      core/construction_review/workflows/ai_review_workflow.py
  20. 192 80
      core/construction_review/workflows/core_functions/ai_review_core_fun.py
  21. 1 0
      data_pipeline/RAG_recall/rag_miluvs/foundation/ai/rag/retrieval/retrieval.py
  22. 1 1
      foundation/database/base/vector/milvus_vector.py

+ 143 - 77
core/construction_review/component/ai_review_engine.py

@@ -54,7 +54,9 @@ import time
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any, Dict, List, Optional, Sequence
-
+import json
+import ast  # 用于安全解析字符串为Python对象
 import pandas as pd
 
 from core.base.task_models import TaskFileInfo
@@ -88,7 +90,16 @@ from .reviewers.check_completeness.components.result_saver import ResultSaver
 from .reviewers.check_completeness.components.result_analyzer import ResultAnalyzer
 from .reviewers.check_completeness.utils.file_utils import write_json
 from core.construction_review.component.reviewers.base_reviewer import ReviewResult
-
+from .reviewers.outline_check import outline_review_results_df
+from .reviewers.check_completeness.utils.redis_csv_utils import (
+    df_store_to_redis,
+    get_redis_connection,
+    main,
+    display_redis_data,
+    store_header_to_redis,
+    read_from_redis_and_save_csv,
+    store_row_to_redis,
+)
 @dataclass
 class ReviewResult:
     """审查结果"""
@@ -156,6 +167,7 @@ class AIReviewEngine(BaseReviewer):
         self.outline_reviewer = OutlineReviewer()
 
         self.milvus = MilvusManager(MilvusConfig())
+        self.redis_client = get_redis_connection()   # 获取Redis连接
 
     def _process_review_result(self, result):
         """
@@ -215,6 +227,20 @@ class AIReviewEngine(BaseReviewer):
 
         # Step 2: 构建查询对
         query_pairs = query_rewrite_manager.query_extract(unit_content)
+
+        # 检查 query_pairs 是否为 None(模型调用失败时的防护)
+        if query_pairs is None:
+            logger.warning("[RAG增强] Query提取失败,返回空结果")
+            return {
+                'vector_search': [],
+                'retrieval_status': 'query_extract_failed',
+                'file_name': '',
+                'text_content': '',
+                'metadata': {},
+                'entity_results': [],        # 保持结构一致
+                'total_entities': 0          # 保持结构一致
+            }
+
         logger.info(f"[RAG增强] 提取到 {len(query_pairs)} 个查询对")
 
         # Step 3: 根据查询对主实体、辅助实体,进行实体增强召回
@@ -668,7 +694,6 @@ class AIReviewEngine(BaseReviewer):
         #     json.dump(review_content, f, ensure_ascii=False, indent=4)
         name = "completeness_check"
         start_time = time.time()
-        
         try:
             # 验证review_content格式
             if not isinstance(review_content, list):
@@ -732,15 +757,36 @@ class AIReviewEngine(BaseReviewer):
                 max_concurrent=concurrent_workers
             )
             logger.info("  组件初始化完成")
-            
+
 
             # 3. 执行审查
             logger.info("\n[4/5] 开始执行审查...")
             logger.info(f"  使用模型: {llm_client.model_type}")
             logger.info(f"  最大并发数: {concurrent_workers}")
-            
+
             review_results = await review_pipeline.review(documents, specification)
-            review_results_flag = pd.DataFrame(review_results)["chapter_classification"].unique().tolist()
+            review_results_df = pd.DataFrame(review_results)
+            chapter_labels = review_results_df['section_label'].str.split('->').str[0]
+            review_results_df['title'] = chapter_labels
+            # review_results_df.to_csv(Path('temp') / 'document_temp' / '2_spec_review_results.csv', encoding='utf-8-sig', index=False)
+            # csv_file = rf'temp\document_temp\2_spec_review_results.csv'
+            # path2 = rf'temp\document_temp\outlines_review_results.csv'
+            # data_df = pd.read_csv(csv_file, encoding='utf-8-sig')
+            # data_df = review_results_df
+            outline_review_results = outline_review_results_df(data=review_results_df)
+            
+            logger.info(f"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}")
+            logger.info(f"[完整性检查] 大纲审查结果行数: {len(outline_review_results) if outline_review_results is not None else 'None'}")
+
+            df_store_to_redis(self.redis_client, data=outline_review_results, bind_id=trace_id_idx)
+            
+            logger.info(f"[完整性检查] 数据已成功存储到Redis,bind_id: {trace_id_idx}")
+            
+            df_filtered = review_results_df.drop_duplicates(subset='title', keep='first').reset_index(drop=True)
+            unique_chapter_labels = chapter_labels.unique().tolist()
+            chapter_classifications = df_filtered['chapter_classification']
+            review_results_flag = chapter_classifications.unique().tolist()
+
             # with open(r'temp\document_temp\1_spec_review_results.json', 'w', encoding='utf-8') as f:
             #     json.dump(review_results, f, ensure_ascii=False, indent=4)
             # 统计结果
@@ -754,16 +800,15 @@ class AIReviewEngine(BaseReviewer):
             processed_results = analyzer.process_results(review_results)
             spec_summary_csv_path = Path('temp') / 'document_temp' / '3_spec_review_summary.csv'
             summary_rows = analyzer.build_spec_summary(processed_results)
-            logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
+            # logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
             summary_rows = pd.DataFrame(summary_rows)
             summary_rows = summary_rows[summary_rows['标签'].isin(review_results_flag)]
-            # summary_rows.to_csv(str(spec_summary_csv_path), encoding='utf-8-sig', index=False)
+            summary_rows.to_csv(str(spec_summary_csv_path), encoding='utf-8-sig', index=False)
             summary_rows = summary_rows.to_dict('records')
             # 生成缺失要点 JSON 列表,便于前端消费
-            #missing_issue_json_path = Path(r'temp\document_temp') / 'spec_review_missing_issues.json'
+
             issues = analyzer.build_missing_issue_list(summary_rows)
-            # with open(r'temp\document_temp\4_spec_review_missing_issues.json', 'w', encoding='utf-8') as f:
-            #     json.dump(issues, f, ensure_ascii=False, indent=4)
+
             # 包装成外层格式化期望的结构
             execution_time = time.time() - start_time
             return {
@@ -776,7 +821,7 @@ class AIReviewEngine(BaseReviewer):
                 },
                 "success": True,
                 "execution_time": execution_time
-            }
+            } 
         except Exception as e:
             execution_time = time.time() - start_time
             error_msg = f"{name} 审查失败: {str(e)}"
@@ -950,74 +995,95 @@ class AIReviewEngine(BaseReviewer):
         """
         logger.info(f"开始大纲审查,trace_id: {trace_id_idx}")
 
-        # 1. 获取整体大纲(1级大纲目录)
-        overall_outline = ""
-
-        # 添加调试信息
-        logger.debug(f"outline_content结构: {list(outline_content.keys()) if outline_content else 'None'}")
-        outline_data = outline_content.get('outline', {})
-        logger.debug(f"outline_data结构: {list(outline_data.keys()) if outline_data else 'None'}")
-        chapters = outline_data.get('chapters', [])
-        logger.info(f"chapters数量: {len(chapters)}")
-
-        for i, chapter in enumerate(chapters):
-            chapter_title = chapter.get('title', 'N/A')
-            chapter_page = chapter.get('page', 'N/A')
-            logger.info(f"章节{i+1}: {chapter_title} (页码: {chapter_page})")
-            overall_outline += f"{chapter_title} (页码: {chapter_page})\n"
-
-        logger.info(f"生成的overall_outline长度: {len(overall_outline)}")
-        if overall_outline:
-            logger.info(f"overall_outline内容: {overall_outline[:200]}...")
-
-        # 2. 获取大纲各章节及其子目录的详细信息
-        detailed_outline = []
-
-        for chapter in chapters:
-            # 将每个章节作为整体项,包含标题、页码和子目录
-            chapter_content = f"\n{chapter['title']} (页码: {chapter['page']})\n"
-
-            # 添加子目录(如果有)
-            subsections = chapter.get('subsections', [])
-            if subsections:
-                chapter_content += "包含子目录:\n"
-                for subsection in subsections:
-                    indent = "  " * (subsection['level'] - 1)
-                    chapter_content += f"{indent}- {subsection['title']} (页码: {subsection['page']})\n"
-
-            # 将完整章节内容作为一个项添加到列表
-            detailed_outline.append(chapter_content)
-
-
-
-        logger.info(f"提取整体大纲完成{overall_outline}")
-        logger.info(f"提取详细大纲完成{detailed_outline}")
-
-        # 准备审查数据
-        review_data = {
-            'outline_content': outline_content,
-            'overall_outline': overall_outline,
-            'detailed_outline': detailed_outline,
-            'state': state,
-            'stage_name': stage_name
-        }
-
-        # 调用outline_reviewer进行审查
+        # CSV文件路径
+        csv_path = Path('temp') / 'document_temp' / 'outlines_review_results.csv'
+        
+        # 存储所有缺失项
+        missing_items = []
+        
         try:
-            outline_review_result = await self.outline_reviewer.outline_review(review_data, trace_id_idx, state,stage_name)
+            # 从Redis读取并保存为新的CSV文件
+            rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
+            df = rows_df
+            
+            # 检查 df 是否为 None 或为空(read_from_redis_and_save_csv 在键不存在时返回空 DataFrame,而非 None)
+            if df is None or df.empty:
+                logger.error(f"[大纲审查] Redis中不存在ID '{trace_id_idx}' 的数据,无法进行大纲审查")
+                return {
+                    'outline_review_result': {
+                        "response": [],
+                    },
+                    'error': f'Redis中不存在ID \'{trace_id_idx}\' 的数据'
+                }
+            
+            logger.info(f"[大纲审查] 成功从Redis读取数据,共 {len(df)} 行")
+            
+            # 兼容新旧字段名
+            chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'
+            review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else 'merged_review_results'
+            
+            # 遍历每一行
+            for index, row in df.iterrows():
+                chapter_label = row.get(chapter_label_col, '')
+                merged_results_str = row.get(review_results_col, '')
+                
+                # 解析review_results_summary字典字符串
+                try:
+                    if pd.isna(merged_results_str) or merged_results_str == '':
+                        merged_results = {}
+                    elif isinstance(merged_results_str, dict):
+                        # 如果已经是字典,直接使用
+                        merged_results = merged_results_str
+                    else:
+                        # 尝试使用ast.literal_eval解析
+                        merged_results = ast.literal_eval(merged_results_str)
+                except (ValueError, SyntaxError):
+                    try:
+                        # 尝试使用json.loads解析
+                        merged_results = json.loads(merged_results_str)
+                    except (json.JSONDecodeError, TypeError):
+                        logger.warning(f"第 {index} 行无法解析review_results_summary: {merged_results_str}")
+                        merged_results = {}
+                
+                # 检查字典中的每个字段
+                if isinstance(merged_results, dict):
+                    for field_name, field_value in merged_results.items():
+                        # 检查列表是否为空
+                        if isinstance(field_value, list) and len(field_value) == 0:
+                            # 生成缺失项
+                            missing_item = {
+                                "check_item_code": "catalogue_completeness_check",
+                                "check_result": {
+                                    "issue_point": f"{field_name}缺失",
+                                    "location": "",
+                                    "suggestion": "",
+                                    "reason": "",
+                                    "risk_level": ""
+                                }
+                            }
+                            missing_items.append(missing_item)
+                            logger.info(f"发现缺失项: 章节[{chapter_label}] 字段[{field_name}]")
+            
+            logger.info(f"大纲审查完成,共发现 {len(missing_items)} 个缺失项")
+            
+        except FileNotFoundError:
+            logger.error(f"CSV文件不存在: {csv_path}")
+            return {
+                'outline_review_result': [],
+                'error': f'CSV文件不存在: {csv_path}'
+            }
         except Exception as e:
-            logger.warning(f"大纲审查失败,但返回提取结果: {str(e)}")
-            outline_review_result = None
-
-        # 确保目录存在
-        # import os
-        # os.makedirs("temp/outline_result_temp", exist_ok=True)
-
-        # # with open("temp/outline_result_temp/outline_result.json","w",encoding="utf-8") as f:
-        # #     json.dump(outline_review_result,f,ensure_ascii=False,indent=4)
-        # 返回提取的大纲结果和审查结果
+            logger.error(f"大纲审查失败: {str(e)}", exc_info=True)
+            return {
+                'outline_review_result': [],
+                'error': f'大纲审查失败: {str(e)}'
+            }
+        
         return {
-            'outline_review_result': outline_review_result
+            'outline_review_result':
+                {
+                    "response": missing_items,
+                }
         }
     
     async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,

+ 5 - 2
core/construction_review/component/doc_worker/classification/hierarchy_classifier.py

@@ -8,6 +8,7 @@ from __future__ import annotations
 
 from collections import Counter
 import asyncio
+import json
 from typing import Any, Dict, List, Optional
 
 from ..interfaces import HierarchyClassifier as IHierarchyClassifier
@@ -99,7 +100,8 @@ class HierarchyClassifier(IHierarchyClassifier):
                 level1_title=level1_item["title"],
                 level2_titles=level2_titles
             )
-            
+            # with open('temp/document_temp/prompt.txt', "w", encoding="utf-8") as f:
+            #     f.write(prompt["user"])
             # 构建消息列表
             messages = [
                 {"role": "system", "content": prompt["system"]},
@@ -110,7 +112,8 @@ class HierarchyClassifier(IHierarchyClassifier):
         
         # 批量异步调用LLM API
         llm_results = await self.llm_client.batch_call_async(llm_requests)
-        
+        # with open('temp/document_temp/llm_results.json', "w", encoding="utf-8") as f:
+        #     json.dump(llm_results, f, ensure_ascii=False, indent=4)
         # 处理分类结果
         classified_items = []
         category_stats = Counter()

+ 4 - 2
core/construction_review/component/doc_worker/config/prompt.yaml

@@ -6,6 +6,7 @@ toc_classification:
     - 一级目录名称本身是重要的分类依据,即使没有二级目录,也要根据一级目录名称进行分类;
     - 必须从提供的标准类别中选择一个,所有标准类别都是平等的,没有偏好,不能创建新类别;
     - 如果待分类的目录与多个标准类别都相关,选择最匹配的一个;
+    - 注意:其他资料类别有自己标准,要严格符合其他资料类别的标准才能分到这个类别;
 
     - /no_think
   user_template: |
@@ -17,8 +18,9 @@ toc_classification:
     {{ level2_titles }}
 
     分类标准(一级目录名称及其包含的二级目录集合):
+
     {{ classification_standards }}
-    
+        - 十一、非标准项(用于接收不符合前十项类别的目录项)
 
     输出要求(只输出 JSON):
     {
@@ -38,7 +40,7 @@ toc_classification:
     - 施工管理及作业人员配备与分工 -> management
     - 验收要求 -> acceptance
     - 其他资料 -> other
-
+    - 非标准项 -> no_standard
 
 
 

+ 3 - 4
core/construction_review/component/doc_worker/utils/prompt_loader.py

@@ -103,15 +103,14 @@ class PromptLoader:
             level2_count = len(level2_list)
             level2_text = "、".join(level2_list)
             
-            # 将一级目录名称和二级目录集合都包含在分类标准中
-            # 强调:匹配时只看核心标题名称,忽略编号前缀
+            # 简化格式,只包含核心标题和二级目录列表
             if level2_count > 0:
                 standards_lines.append(
-                    f"    - {number_prefix}、{level1}(核心标题名称:{level1};包含的二级目录:{level2_text}等{level2_count}个方面):匹配核心标题「{level1}」,包含{level2_text}等{level2_count}个方面。"
+                    f"    - {number_prefix}、{level1}(包含{level2_text}等{level2_count}个方面)"
                 )
             else:
                 standards_lines.append(
-                    f"    - {number_prefix}、{level1}(核心标题名称:{level1}):匹配核心标题「{level1}」。"
+                    f"    - {number_prefix}、{level1}"
                 )
         
         self._classification_standards = "\n".join(standards_lines)

+ 21 - 5
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -227,11 +227,25 @@ class ResultAnalyzer(IResultAnalyzer):
         """
         all_issues = []
         metadata = {}
-
+        suorces_eum = {
+            "basis": "编制依据",
+            "overview": "工程概况",
+            "plan": "施工计划",
+            "technology": "施工工艺技术",
+            "safety": "安全保证措施",
+            "quality": "质量保证措施",
+            "environment": "环境保证措施",
+            "management": "施工管理及作业人员配备与分工",
+            "acceptance": "验收要求",
+            "other": "其他资料"
+            }
         for row in summary_rows:
             level2 = (row.get("二级目录") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
-
+            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
+            reason = f"参照:{reference_source} 中的内容要求,{row.get('section_label', '')}内容属于,专项施工方案内容要求中的 【{suorces_eum[row.get('标签', '')]}】 板块,应包含{requirement}"
+            review_references = (row.get("依据") or "").strip()
+            
             missing_points_raw = row.get("缺失的要点", "")
             missing_points = self._parse_list_field(missing_points_raw)
             if not missing_points:
@@ -244,9 +258,9 @@ class ResultAnalyzer(IResultAnalyzer):
             requirement_list = requirement.split(':')[-1].split(';')
             requirement_text = ';'.join([requirement_list[i-1] for i in missing_points])
             issue_point = (
-                f"[{level2}内容缺失]未包含要点:{requirement_text}"
+                f"{row.get('section_label', '')}下缺失{suorces_eum[row.get('标签', '')]}中的【{level2}】内容"
             )
-            suggestion = f"补充:{requirement_text}" if requirement else "补充缺失要点内容"
+            suggestion = f"建议补充:{requirement_text}" if requirement else "补充缺失要点内容"
             risk_level = self._map_risk_level(len(missing_points))
 
             # 构建问题项并添加到列表
@@ -254,8 +268,10 @@ class ResultAnalyzer(IResultAnalyzer):
                 "issue_point": issue_point,
                 "location": row.get("section_label", ""),
                 "suggestion": suggestion,
-                "reason": requirement,
+                "reason": reason,
                 "risk_level": risk_level,
+                "review_references": review_references,
+                "reference_source": reference_source
             }
             all_issues.append(issue_item)
 

BIN
core/construction_review/component/reviewers/check_completeness/config/Construction_Plan_Content_Specification.csv


+ 243 - 0
core/construction_review/component/reviewers/check_completeness/utils/redis_csv_utils.py

@@ -0,0 +1,243 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Redis CSV处理器
+功能:读取CSV文件,存入Redis,然后一次性读取并保存为新CSV文件
+"""
+
+import csv
+import pandas as pd
+import redis
+import json
+import configparser
+import os
+
+from foundation.observability.logger.loggering import server_logger as logger
+
+# 从config.ini读取Redis配置
+config = configparser.ConfigParser()
+config_path = os.path.join(os.path.dirname(__file__), '../../../../../../config/config.ini')
+config.read(config_path, encoding='utf-8')
+
+# Redis配置
+REDIS_HOST = config.get('redis', 'REDIS_HOST', fallback='localhost')
+REDIS_PORT = config.getint('redis', 'REDIS_PORT', fallback=6379)
+REDIS_PASSWORD = config.get('redis', 'REDIS_PASSWORD', fallback='')
+REDIS_DB = config.getint('redis', 'REDIS_DB', fallback=0)
+
+# 绑定ID
+BIND_ID = '2d5d99c823a6b1a19f770932f3237bf8-1768535328'
+
+# 文件路径
+INPUT_CSV = 'outlines_review_results.csv'
+OUTPUT_CSV = 'outlines_review_results_redis.csv'
+
+
+def get_redis_connection():
+    """获取Redis连接"""
+    try:
+        r = redis.Redis(
+            host=REDIS_HOST,
+            port=REDIS_PORT,
+            password=REDIS_PASSWORD,
+            db=REDIS_DB,
+            decode_responses=True
+        )
+        # 测试连接
+        r.ping()
+        logger.info(f"[OK] Redis连接成功 (host={REDIS_HOST}, port={REDIS_PORT})")
+        return r
+    except Exception as e:
+        logger.error(f"[ERROR] Redis连接失败: {e}")
+        raise
+
+
+def store_row_to_redis(redis_client, bind_id, row_key, row_data):
+    """将单行数据存储到Redis"""
+    row_json = json.dumps(row_data, ensure_ascii=False)
+    redis_client.hset(bind_id, row_key, row_json)
+
+
+def store_header_to_redis(redis_client, bind_id, header):
+    """将表头存储到Redis"""
+    header_json = json.dumps(header, ensure_ascii=False)
+    redis_client.hset(bind_id, "header", header_json)
+
+
+def df_store_to_redis(redis_client, data=None, bind_id=None):
+    """读取CSV文件并按行存入Redis
+    
+    Args:
+        redis_client: Redis客户端
+        data: DataFrame数据(可选),如果为None则从CSV文件读取
+        bind_id: 绑定ID
+    
+    Returns:
+        list: 存储的行数据列表
+    """
+    logger.info(f"\n[Redis存储] 开始存储数据到Redis,bind_id: {bind_id}")
+    
+    # 调试信息:记录Redis连接信息
+    try:
+        logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
+                   f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
+                   f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
+    except Exception as e:
+        logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
+    
+    if data is None:
+        # 使用pandas读取CSV文件
+        logger.info(f"[Redis存储] 从CSV文件读取数据: {INPUT_CSV}")
+        df = pd.read_csv(INPUT_CSV, encoding='utf-8-sig')
+        # header = df.columns.tolist()
+        rows = df.to_dict('records')
+        
+        logger.info(f"[OK] 读取到 {len(rows)} 行数据")
+        # logger.info(f"[OK] CSV表头: {header}")
+    else:
+        logger.info(f"[Redis存储] 使用传入的DataFrame数据,共 {len(data)} 行")
+        rows = data.to_dict('records')
+    
+    # 清空Redis中该ID的数据
+    redis_client.delete(bind_id)
+    logger.info(f"[OK] 清空Redis中ID '{bind_id}' 的旧数据")
+    
+    # 按行循环存入Redis(使用Hash结构)
+    for idx, row in enumerate(rows, start=1):
+        store_row_to_redis(redis_client, bind_id, f"row_{idx}", row)
+    
+    # # 存储表头
+    # store_header_to_redis(redis_client, BIND_ID)
+    
+    # 存储行数
+    redis_client.hset(bind_id, "row_count", len(rows))
+    
+    logger.info(f"[OK] 成功将 {len(rows)} 行数据存入Redis")
+    logger.info(f"[OK] Redis Key: {bind_id}")
+    logger.info(f"[Redis存储] 数据存储完成")
+    
+    return rows
+
+
+def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
+    """从Redis一次性读取所有数据并保存为CSV文件
+    
+    Args:
+        redis_client: Redis客户端
+        bind_id: 绑定ID
+        csv_save_path: CSV文件保存路径(可选)
+    
+    Returns:
+        pandas.DataFrame: 包含数据的DataFrame,如果Redis中不存在数据则返回空DataFrame
+    """
+    logger.info(f"\n从Redis读取数据 (ID: {bind_id})")
+    
+    # 调试信息:记录Redis连接信息
+    try:
+        logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
+                   f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
+                   f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
+    except Exception as e:
+        logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
+    
+    # 调试信息:检查键是否存在
+    key_exists = redis_client.exists(bind_id)
+    logger.info(f"[DEBUG] Redis键 '{bind_id}' 存在状态: {key_exists}")
+    
+    # 调试信息:列出所有匹配的键
+    try:
+        all_keys = redis_client.keys(f"*{bind_id}*")
+        logger.info(f"[DEBUG] Redis中匹配的键: {all_keys}")
+    except Exception as e:
+        logger.warning(f"[DEBUG] 无法列出Redis键: {e}")
+    
+    # 检查数据是否存在
+    if not key_exists:
+        logger.warning(f"[WARN] Redis中不存在ID '{bind_id}' 的数据,返回空DataFrame")
+        return pd.DataFrame()  # 返回空DataFrame而不是None
+    
+    # # 获取表头
+    # header_json = redis_client.hget(bind_id, "header")
+    # if not header_json:
+    #     logger.info("[ERROR] 未找到表头数据")
+    #     return
+    
+    # header = json.loads(header_json)
+    # logger.info(f"[OK] 读取到表头: {header}")
+    
+    # 获取行数
+    row_count = int(redis_client.hget(bind_id, "row_count") or 0)
+    logger.info(f"[OK] 总行数: {row_count}")
+    
+    # 读取所有行数据
+    rows = []
+    for idx in range(1, row_count + 1):
+        row_key = f"row_{idx}"
+        row_json = redis_client.hget(bind_id, row_key)
+        if row_json:
+            row = json.loads(row_json)
+            rows.append(row)
+    
+    logger.info(f"[OK] 成功从Redis读取 {len(rows)} 行数据")
+    
+    # 使用pandas保存为新的CSV文件
+    df_output = pd.DataFrame(rows)
+    if csv_save_path:
+        df_output.to_csv(csv_save_path, index=False, encoding='utf-8-sig')
+        logger.info(f"[OK] 数据已保存到: {csv_save_path}")
+    
+    # 读取完成后删除Redis中的bind_id数据
+    redis_client.delete(bind_id)
+    logger.info(f"[OK] 已删除Redis中ID '{bind_id}' 的数据")
+    
+    return df_output
+
+
+def display_redis_data(redis_client, bind_id=None):
+    """显示Redis中存储的数据摘要"""
+    logger.info(f"\nRedis数据摘要 (ID: {bind_id})")
+    logger.info("-" * 50)
+    
+    # 获取所有字段
+    all_fields = redis_client.hkeys(bind_id)
+    logger.info(f"字段数量: {len(all_fields)}")
+    logger.info(f"字段列表: {all_fields}")
+    
+    logger.info("-" * 50)
+
+
+def main():
+    """主函数"""
+    logger.info("=" * 60)
+    logger.info("Redis CSV 处理器")
+    logger.info("=" * 60)
+    
+    try:
+        # 获取Redis连接
+        redis_client = get_redis_connection()
+        
+        # 读取CSV并存入Redis(必须显式传入 bind_id,否则默认 None 会传给 redis 的 delete/hset)
+        df_store_to_redis(redis_client, bind_id=BIND_ID)
+        
+        # 显示Redis数据摘要
+        display_redis_data(redis_client, bind_id=BIND_ID)
+        
+        # 从Redis读取并保存为新的CSV文件
+        rows = read_from_redis_and_save_csv(redis_client, bind_id=BIND_ID)
+        # 将字典列表转换为JSON字符串列表
+        # rows_str = [json.dumps(row, ensure_ascii=False) for row in rows]
+        # logger.info(f"[OK] 保存到CSV文件成功 (\n{header}\n{'\n'.join(rows_str)})")
+        logger.info(f"{pd.DataFrame(rows)}")
+        logger.info("\n" + "=" * 60)
+        logger.info("[OK] 处理完成!")
+        logger.info("=" * 60)
+        
+    except Exception as e:
+        # traceback 模块没有 logger 属性;用 exc_info=True 记录完整堆栈
+        logger.error(f"\n[ERROR] 处理过程中发生错误: {e}", exc_info=True)
+
+
+if __name__ == "__main__":
+    main()

+ 129 - 0
core/construction_review/component/reviewers/outline_check.py

@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+使用pandas读取CSV文件
+"""
+import os
+import pandas as pd
+import json
+import ast  # 用于安全解析字符串为Python对象
+
+
+def parse_review_result(review_result_str):
+    """
+    解析review_result字符串为字典
+    """
+    try:
+        # 尝试解析JSON格式的字符串
+        if isinstance(review_result_str, dict):
+            return review_result_str
+        if pd.isna(review_result_str) or review_result_str == '':
+            return {}
+        return json.loads(review_result_str)
+    except (json.JSONDecodeError, TypeError):
+        try:
+            # 尝试使用ast.literal_eval解析
+            return ast.literal_eval(review_result_str)
+        except (ValueError, SyntaxError):
+            return {}
+
+def merge_dict_fields_and_deduplicate(group):
+    """
+    合并字典字段的值列表并去重
+    """
+    merged = {}
+    for item in group:
+        if isinstance(item, dict):
+            for key, value in item.items():
+                if key not in merged:
+                    merged[key] = []
+                # 如果值是列表,则合并;否则添加为单个元素
+                if isinstance(value, list):
+                    merged[key].extend(value)
+                else:
+                    merged[key].append(value)
+    # 对每个字段的值去重
+    for key in merged:
+        # 使用dict.fromkeys去重同时保持顺序
+        merged[key] = list(dict.fromkeys(merged[key]))
+    return merged
+
+
+def outline_review_results_df(data, path=None):
+    """
+    处理大纲审查结果DataFrame,生成合并后的审查结果
+    
+    Args:
+        data: 输入的DataFrame数据
+        path: 输出CSV文件路径(可选)
+        
+    Returns:
+        处理后的DataFrame,包含以下列:
+        - chapter_label: 章节标签(如"第一章编制依据")
+        - review_results_summary: 合并后的审查结果字典
+        - chapter_classification: 章节分类(如"basis", "overview"等)
+    """
+    try:
+        df = data
+        # 提取章节标签的第一部分
+        chapter_labels = df['section_label'].str.split('->').str[0]
+        df['title'] = chapter_labels
+        df_filtered = df.drop_duplicates(subset='title', keep='first').reset_index(drop=True)
+        unique_chapter_labels = chapter_labels.unique().tolist()
+        chapter_classifications = df_filtered['chapter_classification']
+        
+        # 创建新的DataFrame来存储结果
+        new_df = pd.DataFrame()
+        
+        # 存储章节标签
+        new_df['chapter_label'] = unique_chapter_labels
+        
+        # 检查是否存在review_result列
+        if 'review_result' in df.columns:
+            df['parsed_review_result'] = df['review_result'].apply(parse_review_result)
+            
+            # 按title分组,使用字典推导式创建合并后的数据
+            grouped_data = df.groupby('title')['parsed_review_result']
+            
+            # 使用字典推导式合并相同title的字典的字段的值列表并去重
+            merged_dict = {
+                title: merge_dict_fields_and_deduplicate(group)
+                for title, group in grouped_data
+            }
+            
+            # 创建合并后的数据帧
+            merged_data = pd.DataFrame(list(merged_dict.items()), columns=['title', 'review_results_summary'])
+            
+            # 将合并后的数据赋值给新列,按照unique_chapter_labels的顺序
+            new_df['review_results_summary'] = merged_data.set_index('title').reindex(unique_chapter_labels)['review_results_summary'].tolist()
+        else:
+            # 如果没有review_result列,则填充空值
+            new_df['review_results_summary'] = [''] * len(unique_chapter_labels)
+        
+        # 存储章节分类
+        new_df['chapter_classification'] = chapter_classifications.values
+        
+        print(f"章节分类: {chapter_classifications.values}")
+        print(f"处理后的DataFrame:\n{new_df}")
+        
+        if path:
+            # 判断文件是否存在
+            if os.path.exists(path):
+                # 文件已存在,追加写入,不写表头
+                new_df.to_csv(path, mode='a', index=False, encoding='utf-8-sig', header=False)
+            else:
+                # 文件不存在,首次写入,包含表头
+                new_df.to_csv(path, mode='w', index=False, encoding='utf-8-sig')
+        
+        return new_df
+        
+    except FileNotFoundError:
+        print(f"错误: 文件不存在!")
+    except Exception as e:
+        print(f"读取CSV文件时发生错误: {e}")
+
+if __name__ == '__main__':
+    csv_file = rf'temp\document_temp\2_spec_review_results.csv'
+    path2 = rf'temp\document_temp\outlines_review_results.csv'
+    df = pd.read_csv(csv_file, encoding='utf-8-sig')
+    outline_review_results_df(data=df, path=path2)

+ 8 - 16
core/construction_review/component/reviewers/prompt/reference_basis_reviewer.yaml

@@ -1,25 +1,17 @@
 reference_basis_reviewer:
   system_prompt: |
     忘掉你之前所有的内容,完成下面的任务。
-    你是一个“格式校验专家(validator)”,只检查格式是否正确,对内容不做任何检查和修改
+    你是一个“格式校验专家(validator)”,只检查格式是否正确,对内容不做任何检查、建议、修改,忽略全半角符号的区别
 
-    =========================
-    【正确格式】
-    《名称》(编号)
-    =========================
-
-    【格式规则(只检查格式)】
-    1) 名称部分必须被中文书名号《》包裹
+    【检查内容】
+    1) 名称部分必须被书名号《》包裹
 
     2) 编号部分必须使用括号包裹
 
-    3) 《名称》与(编号)一一对应
-       - 一个《名称》应对应一个(编号)
-    
-    4) 不考虑任何空格问题 
+    3) 一个《名称》应对应一个(编号)
 
-    【判定优先级(必须按顺序执行)
-    1) 只要违反任意规则 => issue_point="编制依据格式错误" 且 risk_level="风险"
+    【判定过程】
+    1) 只要违反任意规则 => issue_point="编制依据格式错误" 且 risk_level="中风险"
     2) 否则 => issue_point="编制依据格式正确" 且 risk_level="无风险"
 
     【输出硬约束】
@@ -40,13 +32,13 @@ reference_basis_reviewer:
       - 必须与原输入文本完全一致(原样复制)
 
     - suggestion:
-      - 告诉添加或者修改,不能编造内容,格式正确时可填写 "无"
+      - 对错误内容提出修改建议,格式正确时可填写 "无"
 
     - reason(只能描述格式,不得涉及语义):
       - 简洁的说明存在的问题,格式正确时可填写 "无"
 
     - risk_level:
-      - 只能是 "无风险" 或 "风险"
+      - 只能是 "无风险" 或 "风险"
 
     【输出格式规范】
     - 只输出 JSON 数组

+ 59 - 45
core/construction_review/component/reviewers/prompt/technical_reviewers.yaml

@@ -3,94 +3,108 @@
 # 非参数合规性检查功能 - 审查 安全相关/强制性条文相关知识库
 non_parameter_compliance_check:
   system_prompt: |
-    你是施工方案非参数合规性审查专家,专门负责检查安全相关条文和强制性标准的符合性。
+    # 角色: 
+    你是施工方案非参数合规性审查专家,专门负责检查安全相关条文和非参数的强制性标准的符合性。
 
-    审查参考:
+    ## 审查参考:
     {review_references}
 
-    审查要求:
-    - 重点检查安全相关强制性条文符合性
+    ## 审查要求:
+    - 重点检查安全相关非参数性的条文内容
     - 识别违反安全标准和强制规范的问题
     - 关注安全防护措施、安全风险评估、安全管理要求
     - 简明扼要指出违规内容和整改要求
     - 风险等级分类:
-      * 高风险:影响审查结论、可能导致法律问题或严重安全隐患
-      * 中风险:影响专业表达、可能导致理解偏差或一般性问题
-      * 低风险:形式问题、不影响实质内容和安全
-
-    注意事项:
-    1. 务必结合语境进行分析检查
-    2. 对于表格制表符、不需要检查
-    3. 对于术语概念不得曲解
-    4. 没有明显安全合规问题的内容不予检查,输出无明显问题
-    5. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
-    6. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
-    7. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
-    8. 审查依据务必是基于审查参考,如果审查参考不足以对审查内容提供参考时,不予检查,输出无明显问题,不得自行编造审查依据
+      * 高风险: 影响审查结论、可能导致法律问题或严重安全隐患
+      * 中风险: 影响专业表达、可能导致理解偏差或一般性问题
+      * 低风险: 形式问题、不影响实质内容和安全
 
   user_prompt_template: |
-    请审查以下内容的安全合规性和强制性标准符合性:
+    请审查以下内容的安全合规性和非参数性内容(不含数字):
 
-    {review_content}
 
-    输出格式:务必须严格按照以下标准JSON格式输出审查结果:
-    如果未发现问题,请输出:无明显问题
-    如果发现问题,请按以下格式输出:
-    location字段直接输出原字段内容,不得猜测
+    ## 待审查内容:
+    {review_content}  
+
+    ## 强制标准:
+    1. 审查结果必须基于<审查参考>,如果审查参考不足以对审查内容提供参考时,不予检查,输出无明显问题,不得自行编造审查依据
+    2. 审查依据必须使用<审查参考>中的文件信息不得引用其他外部信息,如果无文件信息,不予检查,输出无明显问题
+    3. 对于表格制表符不需要检查
+    4. 对于术语概念不得曲解
+    5. 没有明显安全合规问题的内容不予检查,输出无明显问题
+    6. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
+    7. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
+    8. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
+    
+    ## rules:
+    - 输出格式:务必严格按照以下标准JSON格式输出审查结果:
+    - 如果未发现问题,请输出:无明显问题
+    - 如果发现问题,请按以下格式输出:
+    - location字段直接输出原字段内容,不得猜测
+    - 务必遵循<强制标准>进行审查
+    - 尤其要注意<强制标准>中的第1条
     ```json
     {{
       "issue_point": "问题标题描述",
       "location": "当前问题对应的原始条款内容及位置,如六、验收标准 (页码: 85),以及其语境上下文",
       "suggestion": "具体的修改建议内容",
       "reason": "问题的原因分析和依据说明",
-      "risk_level": ""
+      "risk_level": "高风险/中风险/低风险"
     }}
     ```
 
+
 # 参数合规性检查功能 - 审查 实体概念/工程术语相关知识库
 parameter_compliance_check:
   system_prompt: |
+    # 角色: 
     你是施工方案参数合规性审查专家,专门负责检查技术参数、实体概念和工程术语的准确性和合理性。
 
-    审查参考:
+    ## 审查参考:
     {review_references}
 
-    审查要求:
+    ## 审查要求:
     - 重点检查技术参数的准确性和合理性
     - 识别参数错误或不合理设置
     - 检查实体概念和工程术语的参数是否正确使用
     - 验证设计值与标准的符合性
     - 简明扼要指出参数问题和修正建议
     - 风险等级分类:
-      * 高风险:影响审查结论、可能导致法律问题或严重安全隐患
-      * 中风险:影响专业表达、可能导致理解偏差或一般性问题
-      * 低风险:形式问题、不影响实质内容和安全
-
-    注意事项:
-    1. 务必结合语境进行分析检查
-    2. 对于表格制表符、不需要检查
-    3. 对于术语概念不得曲解
-    4. 没有明显参数问题的内容不予检查,输出无明显问题
-    5. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
-    6. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
-    7. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
-    8. 审查依据务必是基于审查参考,如果审查参考不足以对审查内容提供参考时,不予检查,输出无明显问题,不得自行编造审查依据
+      * 高风险: 影响审查结论、可能导致法律问题或严重安全隐患
+      * 中风险: 影响专业表达、可能导致理解偏差或一般性问题
+      * 低风险: 形式问题、不影响实质内容和安全
+
+
 
   user_prompt_template: |
     请审查以下内容的技术参数精确性、实体概念和工程术语的正确性:
 
-    {review_content}
+    ## 待审查内容:
+    {review_content}  
+
+    ## 强制标准:
+    1. 审查结果必须基于<审查参考>,如果审查参考不足以对审查内容提供参考时,不予检查,输出无明显问题,不得自行编造审查依据
+    2. 审查依据必须使用<审查参考>中的文件信息不得引用其他外部信息,如果无文件信息,不予检查,输出无明显问题
+    3. 对于表格制表符不需要检查
+    4. 对于术语概念不得曲解
+    5. 没有明显参数问题的内容不予检查,输出无明显问题
+    6. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
+    7. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
+    8. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
 
-    输出格式:务必须严格按照以下标准JSON格式输出审查结果:
-    如果未发现问题,请输出:无明显问题
-    如果发现问题,请按以下格式输出:
-    location字段直接输出原字段内容,不得猜测
+    ## rules:
+    - 输出格式:务必严格按照以下标准JSON格式输出审查结果:
+    - 如果未发现问题,请输出:无明显问题
+    - 如果发现问题,请按以下格式输出:
+    - location字段直接输出原字段内容,不得猜测
+    - 务必遵循<强制标准>进行审查
+    - 尤其要注意<强制标准>中的第1条
     ```json
     {{
       "issue_point": "问题标题描述",
       "location": "当前问题对应的原始条款内容及位置,如六、验收标准 (页码: 85),以及其语境上下文",
       "suggestion": "具体的修改建议内容",
       "reason": "问题的原因分析和依据说明",
-      "risk_level": ""
+      "risk_level": "高风险/中风险/低风险"
     }}
     ```

+ 182 - 12
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -1,17 +1,105 @@
 from __future__ import annotations
 
 import asyncio
+import json
 import time
 import yaml
 from typing import Any, Dict, List, Optional
+from functools import partial
 
+from langchain_milvus import Milvus, BM25BuiltInFunction
+from foundation.infrastructure.config.config import config_handler
+from foundation.ai.models.model_handler import model_handler as mh
 from core.construction_review.component.reviewers.utils.directory_extraction import BasisItem, BasisItems
 from core.construction_review.component.reviewers.utils.inter_tool import InterTool
 from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
+from core.construction_review.component.reviewers.utils.punctuation_checker import check_punctuation
+from core.construction_review.component.reviewers.utils.punctuation_result_processor import process_punctuation_results
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
 from foundation.observability.logger.loggering import server_logger as logger
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_openai import ChatOpenAI
 
+class BasisSearchEngine:
+    """编制依据向量搜索引擎"""
+
+    def __init__(self):
+        self.emdmodel = None
+        self._initialize()
+
+    def _initialize(self):
+        """初始化搜索引擎"""
+        try:
+            # 连接配置
+            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
+            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
+            self.user = config_handler.get('milvus', 'MILVUS_USER')
+            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
+
+            # 初始化嵌入模型
+            self.emdmodel = mh._get_lq_qwen3_8b_emd()
+            logger.info("嵌入模型初始化成功")
+
+        except Exception as e:
+            logger.error(f" BasisSearchEngine 初始化失败: {e}")
+
+    def hybrid_search(self, collection_name: str, query_text: str,
+                     top_k: int = 3, ranker_type: str = "weighted",
+                     dense_weight: float = 0.7, sparse_weight: float = 0.3):
+        try:
+
+            # 连接到现有集合
+            connection_args = {
+                "uri": f"http://{self.host}:{self.port}",
+                "user": self.user,
+                "db_name": "lq_db"
+            }
+
+            if self.password:
+                connection_args["password"] = self.password
+
+            vectorstore = Milvus(
+                embedding_function=self.emdmodel,
+                collection_name=collection_name,
+                connection_args=connection_args,
+                consistency_level="Strong",
+                builtin_function=BM25BuiltInFunction(),
+                vector_field=["dense", "sparse"]
+            )
+
+            # 执行混合搜索
+            if ranker_type == "weighted":
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="weighted",
+                    ranker_params={"weights": [dense_weight, sparse_weight]}
+                )
+            else:  # rrf
+                results = vectorstore.similarity_search(
+                    query=query_text,
+                    k=top_k,
+                    ranker_type="rrf",
+                    ranker_params={"k": 60}
+                )
+
+            # 格式化结果,保持与其他搜索方法一致
+            formatted_results = []
+            for doc in results:
+                formatted_results.append({
+                    'id': doc.metadata.get('pk', 0),
+                    'text_content': doc.page_content,
+                    'metadata': doc.metadata,
+                    'distance': 0.0,
+                    'similarity': 1.0
+                })
+
+            return formatted_results
+
+        except Exception as e:
+            # 回退到传统的向量搜索
+            logger.error(f" 搜索失败: {e}")
+
 class StandardizedResponseProcessor:
     """标准化响应处理器 - 统一为outline_reviewer.py格式"""
 
@@ -84,7 +172,7 @@ class LLMReviewClient:
             model="qwen3-30b",
             base_url="http://192.168.91.253:8003/v1",
             api_key="sk-123456",
-            temperature=0.3,
+            temperature=0.7,
         )
 
     async def review_basis(self, Message: str, trace_id: str = None) -> str:
@@ -113,6 +201,7 @@ class BasisReviewService:
     """编制依据审查服务核心类"""
 
     def __init__(self, max_concurrent: int = 4):
+        self.search_engine = BasisSearchEngine()
         self.llm_client = LLMReviewClient()
         self.response_processor = StandardizedResponseProcessor()
         fresh_prompt_loader = PromptLoader()
@@ -145,20 +234,69 @@ class BasisReviewService:
 
         async with self._semaphore:
             try:
-                # 构建提示词模板和用户内容
-                prompt_template = self.message_builder.get_prompt_template()
-                message = prompt_template.partial(check_content=basis_items)
-                trace_id = f"prep_basis_batch_{int(time.time())}"
-                llm_out = await self.llm_client.review_basis(message, trace_id)
-                print("LLM输出:\n")
-                print(llm_out)
+                # 第一步:搜索编制依据并通过match_reference_files过滤
+                search_tasks = []
+                for basis in basis_items:
+                    task = asyncio.create_task(
+                        self._async_search_basis(basis, collection_name, top_k_each)
+                    )
+                    search_tasks.append(task)
+
+                # 等待所有搜索完成
+                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+
+                grouped_candidates = []
+                for i, result in enumerate(search_results):
+                    if isinstance(result, Exception):
+                        logger.error(f"搜索失败 '{basis_items[i]}': {result}")
+                        grouped_candidates.append([])
+                    else:
+                        # result 是 List[dict],需要遍历
+                        texts = [item["text_content"] for item in result if "text_content" in item]
+                        grouped_candidates.append(texts)
+                
+                # 获取match_reference_files的结果并过滤
+                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
+                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+                try:
+                    match_data = json.loads(match_result)
+                    # 提取items字段(match_reference_files返回{items: [...]}格式)
+                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
+                    # 从过滤后的数据中提取review_item用于后续检查
+                    filtered_basis_items = [item.get('review_item') for item in filtered_data if item.get('review_item')]
+                    basis_items_to_check = filtered_basis_items if filtered_basis_items else []
+                    logger.info(f"过滤后参与检查的编制依据: {len(basis_items_to_check)}/{len(basis_items)}")
+                except (json.JSONDecodeError, TypeError) as e:
+                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
+                    # 如果解析失败,使用原始结果
+                    basis_items_to_check = []
+                
+                # 如果没有过滤出数据,直接返回空结果
+                if not basis_items_to_check:
+                    logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
+                    return []
+                
+                # 第二步:调用标点符号检查器
+                checker_result = await check_punctuation(basis_items_to_check)
+                print(checker_result)
+                
+                # 第三步:调用结果处理器,生成详细的问题分析报告
+                processor_result = await process_punctuation_results(checker_result)
+                print("\n【第二步】问题分析报告输出:")
+                print(processor_result)
                 
-                # 使用标准化处理器处理响应
-                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check", "basis","basis_reference_check")
+                # 第四步:转换为标准格式
+                standardized_result = self.response_processor.process_llm_response(
+                    processor_result, 
+                    "reference_check", 
+                    "basis",
+                    "basis_reference_check"
+                )
 
                 # 统计问题数量
                 issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
+                logger.info(f"编制依据批次审查完成:总计 {len(basis_items_to_check)} 项,发现问题 {issue_count} 项")
 
                 return standardized_result
 
@@ -173,6 +311,31 @@ class BasisReviewService:
                     "risk_info": {"risk_level": "high"}
                 }]
 
+    async def _async_search_basis(
+        self,
+        basis: str,
+        collection_name: str,
+        top_k_each: int
+    ) -> List[dict]:
+        """异步搜索单个编制依据(Hybrid Search)"""
+        try:
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=basis,
+                top_k=top_k_each,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
+            )
+            retrieved = await loop.run_in_executor(None, func)
+            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
+            return retrieved or []
+        except Exception as e:
+            logger.error(f" 搜索失败 '{basis}': {e}")
+            return []
+
     async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
         """异步批量审查所有编制依据(BasisItems 入参)"""
@@ -299,6 +462,9 @@ class BasisReviewService:
                 final_results.append(result)
                 successful_batches += 1
 
+                # 过滤空批次结果,避免出现 []
+        final_results = [res for res in final_results if res]
+
         # 统计总结果
         for result in final_results:
             for item in result:
@@ -347,6 +513,10 @@ async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int =
 if __name__ == "__main__":
     # 简单测试
     test_basis_items = BasisItems(items=[
-        BasisItem(title="中华人民共和国特种设备安全法", suffix="2023", raw="《起重机用钢丝绳》(GB T 34198-2017)")
+        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+        BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
+        BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
+        BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
+        BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
     ])
     result = asyncio.run(review_all_basis_async(test_basis_items))

+ 37 - 9
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -198,14 +198,39 @@ class BasisReviewService:
                         texts = [item["text_content"] for item in result if "text_content" in item]
                         grouped_candidates.append(texts)
                 
-                llm_out = await determine_timeliness_issue(await match_reference_files(reference_text=grouped_candidates, review_text=basis_items))
-                
-                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
-                # 统计问题数量
-                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
-
-                return standardized_result
+                # 获取match_reference_files的结果并过滤
+                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
+                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+                try:
+                    match_data = json.loads(match_result)
+                    # 提取items字段(match_reference_files返回{items: [...]}格式)
+                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
+                    
+                    # 如果没有过滤出数据,直接返回空结果
+                    if not filtered_data:
+                        logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
+                        standardized_result = []
+                    else:
+                        # 重新构建JSON格式
+                        if isinstance(match_data, dict) and 'items' in match_data:
+                            match_result = json.dumps({"items": filtered_data}, ensure_ascii=False, indent=2)
+                        else:
+                            match_result = json.dumps(filtered_data, ensure_ascii=False, indent=2)
+                        
+                        llm_out = await determine_timeliness_issue(match_result)
+                        
+                        standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
+                        # 统计问题数量
+                        issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
+                        logger.info(f"编制依据批次审查完成:总计 {len(filtered_data)} 项,发现问题 {issue_count} 项")
+                    
+                    return standardized_result if standardized_result else []
+                    
+                except (json.JSONDecodeError, TypeError) as e:
+                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
+                    # 如果解析失败,返回空结果
+                    return []
 
             except Exception as e:
                 logger.error(f" 批次处理失败: {e}")
@@ -372,6 +397,9 @@ class BasisReviewService:
                 final_results.append(result)
                 successful_batches += 1
 
+        # 过滤空批次结果,避免出现 []
+        final_results = [res for res in final_results if res]
+
         # 统计总结果
         for result in final_results:
             for item in result:
@@ -419,7 +447,7 @@ async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int =
 if __name__ == "__main__":
     # 直接构造 BasisItems 测试 review_all
     test_basis_items = BasisItems(items=[
-        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《预应力混凝土用钢绞线》(GB/T5224-2023);"),
         BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
         BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
         BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),

+ 56 - 4
core/construction_review/component/reviewers/utils/directory_extraction.py

@@ -83,6 +83,46 @@ def fallback_regex(text: str) -> BasisItems:
     return BasisItems(items=items)
 
 
+# --------- 3.5) 根据chapter_code过滤,避免LLM误抽 ---------
+def _filter_by_chapter_code(items: List[BasisItem], chapter_code: str) -> List[BasisItem]:
+    """
+    根据章节代码过滤编制依据条目
+
+    规则:
+    - 如果 chapter_code == "basis":编制依据章节,不过滤,返回所有条目
+    - 如果 chapter_code != "basis":非编制依据章节,过滤掉raw中没有书名号的条目
+
+    Args:
+        items: 待过滤的条目列表
+        chapter_code: 章节代码("basis" 表示编制依据章节)
+
+    Returns:
+        过滤后的条目列表
+    """
+    # 编制依据章节,不过滤
+    if chapter_code == "basis":
+        logger.debug(f"[编制依据提取] 当前为编制依据章节(basis),不过滤,共 {len(items)} 条")
+        return items
+
+    # 非编制依据章节,过滤掉没有书名号的条目(避免LLM误抽)
+    before_count = len(items)
+    filtered_items = [
+        item for item in items
+        if "《" in item.raw and "》" in item.raw
+    ]
+    after_count = len(filtered_items)
+
+    # 如果有过滤,记录日志
+    if before_count != after_count:
+        logger.info(
+            f"[编制依据提取] 非编制依据章节(chapter_code={chapter_code}),"
+            f"过滤掉无书名号条目: {before_count} -> {after_count} "
+            f"(过滤 {before_count - after_count} 条)"
+        )
+
+    return filtered_items
+
+
 # --------- 4) 最小修改:只做一件事,提取第一个 JSON ---------
 def extract_first_json(text: str) -> dict:
     """
@@ -107,7 +147,7 @@ def extract_first_json(text: str) -> dict:
 
 
 # --------- 5) 主函数:使用 LangChain 抽取(最小改动版) ---------
-async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id,text: str) -> BasisItems:
+async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id:str,text: str,chapter_code: str) -> BasisItems:
     """
     使用 LangChain + LLM 提取编制依据信息(流式输出版本)
     """
@@ -199,19 +239,31 @@ async def extract_basis_with_langchain_qwen(progress_manager,callback_task_id,te
 
             cleaned.append(BasisItem(title=title, suffix=suffix, raw=raw))
 
+        # ✅ 新增:根据chapter_code过滤,避免LLM在非编制依据章节误抽
+        cleaned = _filter_by_chapter_code(cleaned, chapter_code)
+
         if cleaned:
             logger.info(f"[编制依据提取] LLM 提取成功,共 {len(cleaned)} 条")
             return BasisItems(items=cleaned)
 
         logger.warning("[编制依据提取] LLM 未提取到内容,使用兜底方案")
-        return fallback_regex(text)
+        # ✅ 修改:兜底方案也要经过过滤
+        fallback_result = fallback_regex(text)
+        filtered_items = _filter_by_chapter_code(fallback_result.items, chapter_code)
+        return BasisItems(items=filtered_items)
 
     except (json.JSONDecodeError, ValidationError, ValueError) as e:
         logger.error(f"[编制依据提取] LLM 输出解析失败: {e},使用兜底方案")
-        return fallback_regex(text)
+        # ✅ 修改:兜底方案也要经过过滤
+        fallback_result = fallback_regex(text)
+        filtered_items = _filter_by_chapter_code(fallback_result.items, chapter_code)
+        return BasisItems(items=filtered_items)
     except Exception as e:
         logger.error(f"[编制依据提取] LLM 提取失败: {str(e)},使用兜底方案")
-        return fallback_regex(text)
+        # ✅ 修改:兜底方案也要经过过滤
+        fallback_result = fallback_regex(text)
+        filtered_items = _filter_by_chapter_code(fallback_result.items, chapter_code)
+        return BasisItems(items=filtered_items)
 
 
 

+ 23 - 4
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -530,13 +530,19 @@ class InterTool:
 
             # 3. 如果JSON解析失败,回退到文本解析
             if not review_lists:
+                # 🔧 修复:检查响应是否为空或只包含空白字符
+                response_stripped = response.strip() if isinstance(response, str) else ""
+                is_empty_response = not response_stripped or response_stripped in ["", "null", "None", "undefined"]
+
                 risk_level = self._determine_risk_level(response)
+
+                # 如果响应为空,则设置 exist_issue=False
                 review_lists.append({
                     "check_item": check_name,
                     "chapter_code": chapter_code,
                     "check_item_code": check_item_code,
                     "check_result": response,
-                    "exist_issue": True,
+                    "exist_issue": not is_empty_response,  # 🔧 修复:空响应不存在问题
                     "risk_info": {"risk_level": risk_level}
                 })
 
@@ -632,9 +638,22 @@ class InterTool:
         """创建单个审查问题项"""
         risk_level = self._determine_risk_level(issue_data.get("risk_level", ""))
 
-        # 根据原始风险等级判断是否存在问题
-        original_risk_level = issue_data.get("risk_level", "")
-        exist_issue = original_risk_level not in ["无风险", "无", "通过", "符合要求"]
+        # 🔧 修复:首先检查 issue_data 是否为空
+        is_empty = False
+        if isinstance(issue_data, list):
+            is_empty = len(issue_data) == 0
+        elif isinstance(issue_data, dict):
+            # 检查是否为空字典,或者只有 risk_level 字段但没有其他实质内容
+            is_empty = len(issue_data) == 0 or (len(issue_data) == 1 and "risk_level" in issue_data)
+
+        # 根据原始风险等级和内容判断是否存在问题
+        original_risk_level = issue_data.get("risk_level", "") if isinstance(issue_data, dict) else ""
+        # 只有当内容不为空,且风险等级不是"无风险"类时,才认为存在问题
+        exist_issue = not is_empty and original_risk_level not in ["无风险", "无", "通过", "符合要求"]
+
+        # 记录调试信息
+        if is_empty:
+            logger.debug(f"检查项 {check_name} 的 issue_data 为空,设置 exist_issue=False")
 
         return {
             "check_item": check_name,

+ 284 - 0
core/construction_review/component/reviewers/utils/punctuation_checker.py

@@ -0,0 +1,284 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import json
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, ValidationError
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
+from langchain_openai import ChatOpenAI
+
+
+# ===== 1) 定义结构 =====
+class PunctuationResult(BaseModel):
+    original_text: str = Field(..., description="审查的规范原文,与输入完全一致")
+    title_mark_status: bool = Field(..., description="书名号使用是否正确,true表示正确,false表示错误")
+    bracket_status: Optional[bool] = Field(..., description="括号使用是否正确,true表示正确,false表示错误,null表示没有编号")
+
+
+class PunctuationResults(BaseModel):
+    items: List[PunctuationResult]
+
+
+# ===== 2) SYSTEM Prompt =====
+SYSTEM = """
+你是【标点符号规范性检查助手】。
+
+【任务】
+仅对已通过“成对出现”预检的文本,检查书名号和括号是否**包裹完整且位置正确**。
+(预检已经保证:
+    - 书名号《》至少各出现一次且数量相等
+    - 括号(/( )/)至少各出现一次且数量相等(中文、英文括号视为同类)
+    因此你只需判断包裹范围是否正确、是否遗漏内容。)
+
+【判断标准】
+- title_mark_status:书名号需完全包裹规范名称,且不多包/漏包
+- bracket_status:括号需完全包裹规范编号,且不多包/漏包;编号可能是各种形式,如果文本中没有编号,设置为null
+
+
+【输出要求】
+- 为每个输入文本输出一个检查结果
+- 确保输出数量与输入一致
+- original_text 必须与输入完全一致
+- title_mark_status 必须是布尔值:true表示正确,false表示错误
+- bracket_status 必须是布尔值或null:true表示正确,false表示错误,null表示没有编号
+"""
+
+HUMAN = """
+请检查以下文本中书名号和括号的**内容是否全部被包裹**,以及是否有编号。
+(所有文本已通过成对出现的预检,至少各有一对《》且数量相等。)
+
+【判断原则】
+- 仅检查包裹的**完整性**:书名号是否包裹了规范名称的全部内容;括号是否包裹了编号的全部内容
+- 中文括号()和英文括号()混用视为正常,不区分
+- 若内容在符号外遗漏,或符号包裹了多余内容,则判定为false
+- **重要**:如果文本中没有编号(完全没有任何()或()符号),则bracket_status设置为null
+
+【简单示例】
+示例1:《建筑抗震设》计规范 (GB 50011-2001)
+- 规范名称是"建筑抗震设计规范",但只有"建筑抗震设"被包裹,"计规范"在外 → title_mark_status=false
+- 编号被完整包裹 → bracket_status=true
+
+示例2:《建筑抗震设计规范》
+- 书名号包裹了完整的规范名称 → title_mark_status=true
+- 没有编号 → bracket_status=null
+
+示例3:《建筑抗震设计规范》(GB 50011-2001)
+- 书名号包裹了完整的规范名称 → title_mark_status=true
+- 英文括号包裹了完整的编号 → bracket_status=true(混用不算错)
+
+示例4:《起重机械钢丝绳保养维护检验和报废》GB/T5972-2023;
+- 书名号包裹了完整的规范名称 → title_mark_status=true
+- 编号未被包裹 → bracket_status=false
+
+【待检查文本】
+{items}
+
+【输出格式要求】
+{format_instructions}
+/no_think
+"""
+
+# ===== 3) Output Parser =====
+parser = PydanticOutputParser(pydantic_object=PunctuationResults)
+
+# ===== 4) Prompt =====
+prompt = ChatPromptTemplate.from_messages([
+    ("system", SYSTEM),
+    ("human", HUMAN)
+])
+
+# ===== 5) LLM =====
+llm = ChatOpenAI(
+    model="qwen3-30b",
+    base_url="http://192.168.91.253:8003/v1",
+    api_key="sk-123456",
+    temperature=0.7,
+)
+
+
+# ===== 6) 提取第一个 JSON =====
+def extract_first_json(text: str) -> dict:
+    """从任意模型输出中提取第一个完整 JSON 对象 { ... }"""
+    start = text.find("{")
+    if start == -1:
+        raise ValueError("未找到 JSON 起始 '{'")
+
+    depth = 0
+    for i in range(start, len(text)):
+        ch = text[i]
+        if ch == "{":
+            depth += 1
+        elif ch == "}":
+            depth -= 1
+            if depth == 0:
+                return json.loads(text[start:i + 1])
+
+    raise ValueError("JSON 花括号未闭合")
+
+
+# ===== 7) 核心方法 =====
+async def check_punctuation(items: List[str]) -> str:
+    """
+    检查规范文本中的书名号和括号使用是否正确,先进行成对预检,再用LLM判断包裹完整性
+    
+    Args:
+        items: 待检查的规范文本列表
+        
+    Returns:
+        检查结果的JSON字符串,包含三个字段:
+        - original_text: 原文
+        - title_mark_status: 书名号使用是否正确(true/false)
+        - bracket_status: 括号使用是否正确(true/false/null,null表示没有编号)
+    """
+    # 1) 预检:是否存在且成对出现
+    pre_results = []  # 预填结果,若需LLM再补充
+    llm_inputs = []   # 需要LLM判定包裹完整性的文本
+
+    for text in items:
+        # 书名号成对判定
+        left_title = text.count("《")
+        right_title = text.count("》")
+        title_pair_ok = left_title == right_title and left_title > 0
+
+        # 括号成对判定(中英文括号混用视为同类)
+        left_br = text.count("(") + text.count("(")
+        right_br = text.count(")") + text.count(")")
+        bracket_pair_ok = left_br == right_br and left_br > 0
+        
+        # 只有书名号和括号都存在时,才判断一一对应
+        # 情况1:都不存在 → 都为False
+        if left_title == 0 and left_br == 0:
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": False,
+                "bracket_status": None
+            })
+            continue
+        
+        # 情况2:只有书名号,没有括号 → bracket_status为None
+        if left_title > 0 and left_br == 0:
+            if title_pair_ok:
+                llm_inputs.append(text)
+            else:
+                pre_results.append({
+                    "original_text": text,
+                    "title_mark_status": False,
+                    "bracket_status": None
+                })
+            continue
+        
+        # 情况3:只有括号,没有书名号 → title_mark_status为False
+        if left_title == 0 and left_br > 0:
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": False,
+                "bracket_status": bool(bracket_pair_ok)
+            })
+            continue
+        
+        # 情况4:两者都存在,判断一一对应
+        if left_title != left_br:
+            # 数量不对应,两个都为False
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": False,
+                "bracket_status": False
+            })
+            continue
+        
+        # 检查括号是否在书名号之后
+        bracket_after_title = True
+        if bracket_pair_ok and title_pair_ok:
+            # 找最后一个书名号的位置
+            last_title_pos = max(text.rfind("《"), text.rfind("》"))
+            # 找第一个括号的位置
+            first_bracket_pos = min(
+                text.find("(") if "(" in text else float('inf'),
+                text.find("(") if "(" in text else float('inf')
+            )
+            bracket_after_title = last_title_pos < first_bracket_pos
+
+        if not title_pair_ok or not bracket_pair_ok or not bracket_after_title:
+            # 预检失败或位置不正确,直接判定对应项为False,无需LLM
+            pre_results.append({
+                "original_text": text,
+                "title_mark_status": bool(title_pair_ok),
+                "bracket_status": bool(bracket_pair_ok and bracket_after_title)
+            })
+        else:
+            # 成对且位置正确通过,交给LLM判断包裹是否完整和是否有编号
+            llm_inputs.append(text)
+
+    # 若无需要LLM的,直接返回预检结果
+    if not llm_inputs:
+        return json.dumps(pre_results, ensure_ascii=False, indent=2)
+
+    chain = prompt | llm | StrOutputParser()
+    format_instructions = parser.get_format_instructions()
+
+    payload = {
+        "items": json.dumps(llm_inputs, ensure_ascii=False, indent=2),
+        "format_instructions": format_instructions
+    }
+
+    last_err = None
+
+    llm_result: List[dict] = []
+    for _ in range(2):
+        try:
+            raw = await chain.ainvoke(payload)
+            data = extract_first_json(raw)
+
+            # 兼容两种格式:带 items 字段或不带 items 字段(单个对象)
+            if "items" in data:
+                findings = PunctuationResults.model_validate(data)
+                llm_result = [x.model_dump() for x in findings.items]
+            else:
+                # LLM 返回了单个对象,包装成列表
+                single_result = PunctuationResult.model_validate(data)
+                llm_result = [single_result.model_dump()]
+            break
+        except (Exception, ValidationError, json.JSONDecodeError) as e:
+            last_err = e
+            print(f"[标点符号检查] 解析失败,重试中: {e}")
+
+    if last_err and not llm_result:
+        raise RuntimeError(f"标点符号检查失败:{last_err}") from last_err
+
+    # 合并预检与LLM结果,按原输入顺序输出
+    merged = []
+    llm_map = {item["original_text"]: item for item in llm_result}
+    for text in items:
+        # 先看预检是否已有
+        found = next((r for r in pre_results if r["original_text"] == text), None)
+        if found:
+            merged.append(found)
+        else:
+            merged.append(llm_map.get(text, {
+                "original_text": text,
+                "title_mark_status": False,
+                "bracket_status": None
+            }))
+
+    return json.dumps(merged, ensure_ascii=False, indent=2)
+
+
+# ===== 8) 示例 =====
+if __name__ == "__main__":
+    import asyncio
+
+    # 测试用例
+    test_items = [
+        "《起重机械钢丝绳保养维护检验和报废》GB/T5972-2023;"  # 正确
+        # "《混》凝土结构设计规范(GB 50010-2010)",      # 缺少书名号
+        # "建筑施工组织设计规范GB/T 50502-2015",  # 缺少括号
+        # "《建筑抗震设计规范》(GB 50011)-2001",       # 括号不成对
+        # "《城市道路工程设计规范(CJJ 37-2012)",    # 书名号不成对
+        # "《公路工程技术标准》(JTG B01-2014)",     # 正确
+    ]
+
+    result = asyncio.run(check_punctuation(test_items))
+    print("\n标点符号检查结果:")
+    print(result)

+ 187 - 0
core/construction_review/component/reviewers/utils/punctuation_result_processor.py

@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import json
+from typing import List, Literal
+
+from pydantic import BaseModel, Field, ValidationError
+from langchain_core.prompts import ChatPromptTemplate
+from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
+from langchain_openai import ChatOpenAI
+
+
# ===== 1) Result schema =====
# Closed set of risk levels for a formatting finding; the HUMAN prompt
# instructs the model to emit only these two literal values.
RiskLevel = Literal["无风险", "中风险"]


class PunctuationIssueResult(BaseModel):
    # One analyzed finding for a single cited-standard string.
    # issue_point: short classification of the problem.
    issue_point: str = Field(..., description="问题点描述")
    # location: must echo the input original_text verbatim (enforced by prompt).
    location: str = Field(..., description="审查内容,与输入完全一致")
    # suggestion: actionable fix instruction.
    suggestion: str = Field(..., description="修改建议,可执行动作")
    # reason: root-cause explanation of the finding.
    reason: str = Field(..., description="问题的原因分析")
    # risk_level: restricted to the RiskLevel literals above.
    risk_level: RiskLevel = Field(..., description='风险水平,只能是 "无风险" / "中风险"')


class PunctuationIssueResults(BaseModel):
    # Wrapper object so the LLM returns a single JSON object with an
    # "items" array rather than a bare JSON list.
    items: List[PunctuationIssueResult]
+
+
# ===== 2) Prompt templates =====
# NOTE: SYSTEM and HUMAN are runtime payloads sent verbatim to the LLM.
# Their Chinese wording (especially the judgment rules in HUMAN) is program
# behavior — do not edit casually. HUMAN contains two template placeholders,
# {check_results} and {format_instructions}, filled in by the chain; the
# trailing "/no_think" token is a model-specific directive.
SYSTEM = """
你是【编制依据格式问题分析专家】。

【任务】
根据标点符号检查结果,生成详细的问题分析报告。

【重要说明(必须严格遵守)】
- location 字段必须与输入的 original_text 完全一致(一字不差)
- 根据 title_mark_status 和 bracket_status 的值判断问题类型
- 提供具体的修改建议和原因分析

【输出要求】
- 为每个检查结果输出一个详细的问题分析
- 确保输出数量与输入一致
- location 必须与 original_text 完全一致
- 严格按照判定规则生成内容
"""

HUMAN = """
请根据以下标点符号检查结果,生成详细的问题分析报告:

【判定规则】

当 title_mark_status = true 且 bracket_status = true:
- issue_point:编制依据格式正确
- reason:规范名称和编号的标点符号使用规范
- suggestion:无
- risk_level:无风险

当 title_mark_status != true 时:
- issue_point:编制依据格式错误
- reason:从以下三种情况中选择最符合实际的问题描述:
    1. 规范名称未被书名号包裹
    2. 书名号不成对
    3. 规范名称未完全被书名号包裹
- suggestion:将规范名称用书名号《》包裹,正确格式:《规范名称》(编号)
- risk_level:中风险

当 bracket_status != true 时:
- issue_point:编制依据格式错误
- reason:如果bracket_status = null,则问题原因是"编号缺失";
    否则从以下三种情况中从上到下选择符合的问题描述:
    1. 规范编号未被括号包裹
    2. 规范编号未完全被括号包裹
    3. 括号不成对
- suggestion:
  * 如果是"编号缺失":补充编号,格式为(编号)
  * 否则:将编号用括号()包裹,正确格式:《规范名称》(编号)
- risk_level:中风险

当 title_mark_status != true 且 bracket_status != true:
- issue_point:编制依据格式错误
- reason:引用不符合正确格式:《规范名称》(编号)
- suggestion:请将引用调正为正确格式:《规范名称》(编号)并保证名称与编号一一对应
- risk_level:中风险

【标点符号检查结果】
{check_results}

【输出格式要求】
{format_instructions}
/no_think
"""
+
# ===== 3) Output parser =====
# Used only to generate {format_instructions} for the prompt; the actual
# response parsing is done manually via extract_first_json + model_validate.
parser = PydanticOutputParser(pydantic_object=PunctuationIssueResults)

# ===== 4) Prompt =====
prompt = ChatPromptTemplate.from_messages([
    ("system", SYSTEM),
    ("human", HUMAN)
])

# ===== 5) LLM client =====
# NOTE(review): endpoint and API key are hard-coded (internal host, dummy
# key); consider moving them to configuration/environment — confirm with the
# deployment setup. temperature=0 keeps the rule-based analysis deterministic.
llm = ChatOpenAI(
    model="qwen3-30b",
    base_url="http://192.168.91.253:8003/v1",
    api_key="sk-123456",
    temperature=0,
)
+
+
# ===== 6) Extract the first JSON object =====
def extract_first_json(text: str) -> dict:
    """Extract and parse the first complete JSON object ``{...}`` from *text*.

    Uses ``json.JSONDecoder.raw_decode`` instead of a hand-rolled brace
    counter: a naive counter mis-balances on ``{``/``}`` characters that
    appear *inside* JSON string values (e.g. ``{"s": "a } b"}``), which is
    plausible here since values carry free-form Chinese analysis text.

    Args:
        text: Arbitrary model output that may wrap the JSON in prose,
            markdown fences, or trailing commentary.

    Returns:
        dict: The first successfully parsed JSON object.

    Raises:
        ValueError: If no parseable JSON object is found. (This matches the
            original contract; ``json.JSONDecodeError`` is itself a
            ``ValueError`` subclass.)
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This '{' did not begin a valid JSON value; try the next one.
            start = text.find("{", start + 1)
            continue
        if isinstance(obj, dict):
            return obj
        # Parsed something that is not an object (defensive); keep scanning.
        start = text.find("{", start + 1)
    raise ValueError("未找到 JSON 起始 '{'")
+
+
# ===== 7) Core entry point =====
async def process_punctuation_results(check_results: str) -> str:
    """Generate a detailed issue-analysis report from punctuation check results.

    Args:
        check_results: JSON string produced by the punctuation checker; each
            item carries ``original_text``, ``title_mark_status`` and
            ``bracket_status``.

    Returns:
        str: JSON array string where every item has five fields:
        ``issue_point``, ``location`` (verbatim original text),
        ``suggestion``, ``reason`` and ``risk_level``.

    Raises:
        RuntimeError: If the LLM output cannot be extracted or validated
            after all retry attempts; the last underlying error is chained
            as the cause.
    """
    chain = prompt | llm | StrOutputParser()
    payload = {
        "check_results": check_results,
        "format_instructions": parser.get_format_instructions(),
    }

    last_err = None
    # Up to 2 attempts: LLM output is free text, so a second call can
    # succeed where the first produced unparseable/invalid JSON.
    for _ in range(2):
        try:
            raw = await chain.ainvoke(payload)
            data = extract_first_json(raw)
            findings = PunctuationIssueResults.model_validate(data)
            report = [item.model_dump() for item in findings.items]
            return json.dumps(report, ensure_ascii=False, indent=2)
        except Exception as e:
            # Covers pydantic ValidationError, json.JSONDecodeError and
            # transport errors alike — the original tuple
            # (Exception, ValidationError, JSONDecodeError) was redundant,
            # since Exception already subsumes the other two.
            last_err = e

    raise RuntimeError(f"标点符号问题分析失败:{last_err}") from last_err
+
+
# ===== 8) Demo =====
if __name__ == "__main__":
    import asyncio

    # Simulated punctuation_checker output: title marks are present
    # (title_mark_status=True) but the standard number is not wrapped in
    # brackets (bracket_status=False) — should yield a "中风险" finding.
    check_results = json.dumps([
        {
            "original_text": "《起重机械钢丝绳保养、维护、检验和报废》GB/T5972-2023;",
            "title_mark_status": True,
            "bracket_status": False
        }
    ], ensure_ascii=False)

    result = asyncio.run(process_punctuation_results(check_results))
    print("\n标点符号问题分析结果:")
    print(result)

+ 2 - 2
core/construction_review/component/reviewers/utils/reference_matcher.py

@@ -55,7 +55,7 @@ HUMAN = """
    - 完全找不到任何相关文件,返回 false
 
 3. **has_exact_match**(是否有名称编号都相同的文件)
-   - 找到名称且编号相同的文件,返回 true
+   - 忽略书写格式不同,找到名称且编号相同的文件,返回 true
    - 否则返回 false
 
 4. **exact_match_info**(名称编号相同的文件及状态)
@@ -96,7 +96,7 @@ llm = ChatOpenAI(
     model="qwen3-30b",
     base_url="http://192.168.91.253:8003/v1",
     api_key="sk-123456",
-    temperature=0,
+    temperature=0.7,
 )
 
 # ===== 6) 提取第一个 JSON =====

+ 1 - 1
core/construction_review/component/reviewers/utils/timeliness_determiner.py

@@ -16,7 +16,7 @@ RiskLevel = Literal["无风险", "高风险"]
 
 class TimelinessResult(BaseModel):
     issue_point: str = Field(..., description="时效性判定类型")
-    review_item: str = Field(..., description="审查的规范原文,与输入完全一致")
+    location: str = Field(..., description="审查的规范原文,与输入完全一致")
     suggestion: str = Field(..., description="建议")
     reason: str = Field(..., description="原因")
     risk_level: RiskLevel = Field(..., description='风险水平,只能是 "无风险" / "高风险"')

+ 30 - 8
core/construction_review/workflows/ai_review_workflow.py

@@ -294,7 +294,7 @@ class AIReviewWorkflow:
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
                 'semantic_logic_check': 'check_semantic_logic',
-                'completeness_check': ['check_completeness', 'outline_check'],
+                'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_basis_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
@@ -316,11 +316,15 @@ class AIReviewWorkflow:
             original_chunks = state.get("structured_content", {}).get("chunks", [])
 
             # 预处理:根据 review_item_dict_sorted 中的 key 对 structured_content 进行筛选
-            # original_chunks = structured_content.get("chunks", [])
             filtered_chunks = [
                 chunk for chunk in original_chunks
                 if chunk.get("chapter_classification") in review_item_dict_sorted.keys()
             ]
+            # 筛选完整性存在完整性审查的分类,将其整章进行合并
+            filtered_chunks = self.core_fun._merge_chunks_for_completeness_check(
+                filtered_chunks, review_item_dict_sorted
+            )
+
             # with open("temp/filtered_chunks/filtered_chunks.json", "w", encoding="utf-8") as f:
             #     json.dump(filtered_chunks, f, ensure_ascii=False, indent=4)
             # # 更新 chunks 和 structured_content
@@ -328,7 +332,20 @@ class AIReviewWorkflow:
             # structured_content["chunks"] = chunks
 
             total_chapters = len(review_item_dict_sorted)
-            total_chunks = len(filtered_chunks)
+            # with open("temp/filtered_chunks/review_item_dict_sorted.json", "w", encoding="utf-8") as f:
+            #     json.dump(review_item_dict_sorted, f, ensure_ascii=False, indent=4)
+            # 如果review_item_dict_sorted中只包含check_completeness,则total_chunks 仅计算chunk中is_complete_field = true的chunk数量
+            all_check_items = []
+            for check_list in review_item_dict_sorted.values():
+                all_check_items.extend(check_list)  # 把每个分类的检查项加入总列表
+
+            # 判断:所有检查项是否都只有 "check_completeness"(无其他检查项)
+            if all(item == "check_completeness" for item in all_check_items):
+                # 仅统计 is_complete_field = True 的chunk数量(用生成器表达式省内存)
+                total_chunks = sum(1 for chunk in filtered_chunks if chunk.get("is_complete_field", False))
+            else:
+                # 统计所有 filtered_chunks
+                total_chunks = len(filtered_chunks)
 
             # 初始化issues列表
             all_issues = []
@@ -481,17 +498,22 @@ class AIReviewWorkflow:
                 logger.info(f"开始执行大纲审查")
 
 
-                # outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
-                #                                     state, state.get("stage_name", "大纲审查"))
-                outline_review_result = {} 
+
+
                 check_completeness_result = await self.ai_review_engine.check_completeness(
                     trace_id_idx = state["callback_task_id"],
                     review_content = state["structured_content"]["chunks"],
                     state = state,
                     stage_name = state.get("stage_name", "完整性审查")
                 )
-                with open(r"temp\document_temp\4_check_completeness_result.json", "w", encoding="utf-8") as f:
-                    json.dump(check_completeness_result, f, ensure_ascii=False, indent=4)
+                # outline_review_result = {} 
+                outline_review_result = await self.ai_review_engine.outline_check(
+                    trace_id_idx = state["callback_task_id"],
+                    outline_content = state["structured_content"],
+                    state = state,
+                    stage_name = state.get("stage_name", "大纲审查"))
+                # with open(r"temp\document_temp\4_check_completeness_result.json", "w", encoding="utf-8") as f:
+                #     json.dump(check_completeness_result, f, ensure_ascii=False, indent=4)
 
 
             # # 4. 执行编制依据审查

+ 192 - 80
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -29,7 +29,6 @@ AI审查核心功能类 - 负责具体的审查逻辑和数据处理
 ├── _execute_technical_review()     # 执行技术性审查(参数/非参数合规性检查)
 ├── _group_chunks_by_chapter()      # 按章节代码对chunks进行分组
 ├── _extract_issues_from_result()   # 从审查结果中提取issues列表
-├── _format_chunk_results_to_issues() # 格式化单个块的审查结果为issues列表
 └── _dummy_review_task()            # 空任务(方法不存在时使用)
 '''
 
@@ -59,6 +58,7 @@ class UnitReviewResult():
     technical_compliance: Dict[str, Any]
     rag_enhanced: Dict[str, Any]
     overall_risk: str
+    is_sse_push: bool = True  # 是否成功执行并推送SSE,默认为True
 
 
 class AIReviewCoreFun:
@@ -139,7 +139,9 @@ class AIReviewCoreFun:
             chunk_results = await self._execute_chunk_methods(
                 chapter_code, chunk, global_chunk_index, func_names, state
             )
-
+            if not chunk_results.get('is_sse_push', False):
+                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")                
+                continue  # 跳过未成功执行的块
             # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
@@ -203,43 +205,6 @@ class AIReviewCoreFun:
 
         return issues
 
-    def _format_chunk_results_to_issues(
-        self,
-        state: AIReviewState,
-        chunk_index: int,
-        chunk: Dict[str, Any],
-        chapter_code: str,
-        chunk_results: Dict[str, Any]
-    ) -> List[Dict]:
-        """
-        格式化单个块的所有审查结果为issues列表
-
-        Args:
-            state: AI审查状态
-            chunk_index: 块索引
-            chunk: 块内容
-            chapter_code: 章节代码
-            chunk_results: 块审查结果字典 {func_name: result}
-
-        Returns:
-            List[Dict]: issues列表
-        """
-        issues = []
-
-        for func_name, result in chunk_results.items():
-            if result is None:
-                continue
-
-            # 处理错误结果
-            if isinstance(result, dict) and "error" in result:
-                logger.warning(f"审查方法 {func_name} 返回错误: {result['error']}")
-                continue
-
-            # 提取issues
-            extracted = self._extract_issues_from_result(result)
-            issues.extend(extracted)
-
-        return issues
 
     def _group_chunks_by_chapter(self, chunks: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
         """
@@ -280,18 +245,21 @@ class AIReviewCoreFun:
         semaphore = asyncio.Semaphore(5)  # 单个块内限制并发数为5
         rag_enhanced_content = None  # 初始化变量,避免作用域错误
         basis_content = None  # 初始化变量,避免作用域错误
-
-        if 'check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names:
+        is_complete_field = chunk.get('is_complete_field', False)
+        logger.info(f"检查is_complete_field值是否正常: {is_complete_field}")
+        # 只有非完整性审查的chunk才执行RAG检索(注意括号位置,确保运算符优先级正确)
+        if ('check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names) and not is_complete_field:
             logger.debug("开始执行RAG检索增强")
             rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
 
-        if 'reference_basis_reviewer' in func_names or 'timeliness_basis_reviewer' in func_names:
+        if ('reference_basis_reviewer' in func_names or 'timeliness_basis_reviewer' in func_names) and not is_complete_field:
             logger.debug("开始执行编制依据/时效性预处理")
             # 预处理编制依据/时效性审查所需内容
             basis_content = await directory_extraction.extract_basis_with_langchain_qwen(
                 state['progress_manager'],
                 state["callback_task_id"],
-                chunk.get('content', '')
+                chunk.get('content', ''),
+                chapter_code
             )
         async def execute_with_semaphore(func_name):
             async with semaphore:
@@ -320,6 +288,7 @@ class AIReviewCoreFun:
         merged_basic = {}
         merged_technical = {}
         merged_rag = {}
+        has_success = False  # 标记是否有成功执行的任务
 
         for result in completed_results:
             if isinstance(result, Exception):
@@ -329,6 +298,9 @@ class AIReviewCoreFun:
             if result and len(result) == 2:
                 func_name, review_result = result
                 if isinstance(review_result, UnitReviewResult):
+                    # 检查是否有成功的任务
+                    if review_result.is_sse_push:
+                        has_success = True
                     # 合并 basic_compliance
                     merged_basic.update(review_result.basic_compliance)
                     # 合并 technical_compliance
@@ -339,7 +311,8 @@ class AIReviewCoreFun:
         return {
             'basic_compliance': merged_basic,
             'technical_compliance': merged_technical,
-            'rag_enhanced': merged_rag
+            'rag_enhanced': merged_rag,
+            'is_sse_push': has_success  # 添加 is_sse_push 字段
         }
 
     async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None) -> UnitReviewResult:
@@ -356,6 +329,7 @@ class AIReviewCoreFun:
         Returns:
             UnitReviewResult: 单个审查方法的UnitReviewResult对象,包含 basic_compliance 或 technical_compliance
         """
+      
         # 从ai_review_engine获取对应的方法
         if not hasattr(self.ai_review_engine, func_name):
             logger.warning(f"AIReviewEngine中未找到方法: {func_name}")
@@ -366,7 +340,8 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: {"error": f"未找到方法: {func_name}"}},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk="error"
+                overall_risk="error",
+                is_sse_push=True
             )
 
         method = getattr(self.ai_review_engine, func_name)
@@ -377,11 +352,11 @@ class AIReviewCoreFun:
 
         # 获取块内容
         review_content = chunk.get("content", "")
-
+        is_complete_field = chunk.get("is_complete_field", False)
         logger.debug(f"执行审查: {trace_id} -> {func_name}")
 
         # 根据func_name构建对应的参数并调用
-        if func_name == "sensitive_word_check":
+        if func_name == "sensitive_word_check" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -390,10 +365,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_semantic_logic":
+        elif func_name == "check_semantic_logic" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -402,10 +378,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_sensitive":
+        elif func_name == "check_sensitive" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -414,10 +391,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_completeness":
+        elif func_name == "check_completeness" and is_complete_field:
             # check_completeness 需要列表类型,将单个 chunk 包装成列表
             raw_result = await method(trace_id, [chunk], state, stage_name)
             # 基础审查方法,放入 basic_compliance
@@ -427,10 +405,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_non_parameter_compliance":
+        elif func_name == "check_non_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
@@ -442,10 +421,11 @@ class AIReviewCoreFun:
                 basic_compliance={},
                 technical_compliance={func_name: raw_result},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_parameter_compliance":
+        elif func_name == "check_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
@@ -457,23 +437,14 @@ class AIReviewCoreFun:
                 basic_compliance={},
                 technical_compliance={func_name: raw_result},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        # # outline_check 仍在章节级别处理
-        # elif func_name == "outline_check":
-        #     logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
-        #     return UnitReviewResult(
-        #         unit_index=chunk_index,
-        #         unit_content=chunk,
-        #         basic_compliance={},
-        #         technical_compliance={},
-        #         rag_enhanced={},
-        #         overall_risk="low"
-        #     )
+
 
         # reference_basis_reviewer:编制依据审查(逐块处理)
-        elif func_name == "reference_basis_reviewer":
+        elif func_name == "reference_basis_reviewer" and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
                 "basis_items": basis_content,  # 提取的 BasisItems 对象
@@ -492,11 +463,12 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
         # timeliness_basis_reviewer:时效性审查(逐块处理)
-        elif func_name == "timeliness_basis_reviewer":
+        elif func_name == "timeliness_basis_reviewer" and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
                 "basis_items": basis_content,  # 提取的 BasisItems 对象
@@ -515,18 +487,21 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
         else:
-            logger.warning(f"未知的审查方法: {func_name},使用默认调用方式")
+            logger.warning(f"未知的审查方法: {func_name}")
+            logger.warning(f"is_complete_field: {is_complete_field}")
             return UnitReviewResult(
                 unit_index=chunk_index,
                 unit_content=chunk,
                 basic_compliance={func_name: {"error": f"未知的审查方法: {func_name}"}},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk="error"
+                overall_risk="error",
+                is_sse_push=False
             )
 
     def _calculate_single_result_risk(self, raw_result: Any) -> str:
@@ -1090,7 +1065,7 @@ class AIReviewCoreFun:
             logger.warning(f"发送单元完成进度更新失败: {str(e)}")
             # 发生异常时,尝试返回一个基于 index 的估算值
             try:
-                return int(((unit_index + 1) / total_units) * 100)
+                return int(((unit_index + 1) / total_chunks) * 100)
             except:
                 return 0
 
@@ -1157,11 +1132,11 @@ class AIReviewCoreFun:
         for item in review_item_config:
             key, value = item.split("_", 1)
             review_item_dict.setdefault(key, []).append(value)
-        
+
         # 依据方案标准章节顺序进行排序
-        sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
+        sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment",
 "management", "acceptance", "other"]
-        
+
         all_keys = review_item_dict.keys()
         sorted_keys = sorted(
             all_keys,
@@ -1170,4 +1145,141 @@ class AIReviewCoreFun:
         review_item_dict_sorted = {}
         for key in sorted_keys:
             review_item_dict_sorted[key] = review_item_dict[key]
-        return review_item_dict_sorted
+        return review_item_dict_sorted
+
+    def _merge_chunks_for_completeness_check(
+        self,
+        chunks: List[Dict[str, Any]],
+        review_item_dict: Dict[str, List[str]]
+    ) -> List[Dict[str, Any]]:
+        """
+        筛选包含完整性审查的分类,将其整章进行合并
+
+        Args:
+            chunks: 筛选后的chunks列表
+            review_item_dict: 审查项字典 {chapter_code: [func_names]}
+
+        Returns:
+            List[Dict[str, Any]]: 追加合并chunk后的chunks列表,并按标准章节顺序排序
+
+        Note:
+            合并规则:
+            1. 找出包含 'check_completeness' 或 'outline_check' 的章节分类
+            2. 章节定义:chapter字段去除->及其之后的内容作为章节名
+            3. 同章节内按page升序排列,合并content和original_content
+            4. page取最小值
+            5. 合并后的chunk追加到原列表末尾,不删除原chunks
+            6. 增加 is_complete_field 字段标记为合并chunk(即使只有一个chunk也要标记)
+            7. 追加后按 chapter_classification 和标准章节顺序排序
+        """
+        try:
+            # 1. 找出包含完整性审查的章节分类
+            completeness_chapters = set()
+            for chapter_code, func_names in review_item_dict.items():
+                if 'check_completeness' in func_names or 'outline_check' in func_names:
+                    completeness_chapters.add(chapter_code)
+
+            if not completeness_chapters:
+                logger.info("没有包含完整性审查的章节,无需合并")
+                return chunks
+
+            logger.info(f"包含完整性审查的章节分类: {completeness_chapters}")
+
+            # 2. 筛选出需要合并的chunks(属于完整性审查章节的)
+            chunks_to_merge = []
+            for chunk in chunks:
+                chapter_code = chunk.get("chapter_classification", "")
+                if chapter_code in completeness_chapters:
+                    chunks_to_merge.append(chunk)
+
+            if not chunks_to_merge:
+                logger.info("没有找到需要合并的chunks")
+                return chunks
+
+            # 3. 按章节分组(章节定义:去除->及其之后的内容)
+            chapter_groups = {}
+            for chunk in chunks_to_merge:
+                chapter_full = chunk.get("chapter", chunk.get("section_label", ""))
+                # 提取章节名:去除->及其之后的内容
+                chapter_name = chapter_full.split("->")[0].strip() if "->" in chapter_full else chapter_full
+
+                if chapter_name not in chapter_groups:
+                    chapter_groups[chapter_name] = []
+                chapter_groups[chapter_name].append(chunk)
+
+            logger.info(f"按章节分组完成,共 {len(chapter_groups)} 个章节需要合并")
+
+            # 4. 合并每个章节的chunks
+            # 先给所有原chunk添加 is_complete_field: False
+            result_chunks = []
+            for chunk in chunks:
+                chunk_copy = chunk.copy()
+                chunk_copy["is_complete_field"] = False
+                result_chunks.append(chunk_copy)
+
+            for chapter_name, chapter_chunk_list in chapter_groups.items():
+                # 按page升序排列
+                chapter_chunk_list.sort(key=lambda x: int(x.get("page", 0)) if str(x.get("page", 0)).isdigit() else x.get("page", 0))
+
+                # 提取最小page
+                min_page = chapter_chunk_list[0].get("page", 0)
+
+                # 合并content和original_content
+                merged_content = "\n\n".join([
+                    chunk.get("content", "") for chunk in chapter_chunk_list
+                ])
+                merged_original_content = "\n\n".join([
+                    chunk.get("original_content", "") for chunk in chapter_chunk_list
+                ])
+
+                # 创建合并后的chunk(基于第一个chunk,保留所有字段)
+                merged_chunk = chapter_chunk_list[0].copy()
+
+                # 更新核心字段
+                # chunk_id 去除 -> 及其后的内容
+                original_chunk_id = merged_chunk.get('chunk_id', '')
+                clean_chunk_id = original_chunk_id.split("->")[0].strip() if "->" in original_chunk_id else original_chunk_id
+                merged_chunk["chunk_id"] = f"{clean_chunk_id}_merged"
+
+                merged_chunk["chapter"] = chapter_name  # 更新为合并后的章节名
+                merged_chunk["content"] = merged_content
+                merged_chunk["original_content"] = merged_original_content
+                merged_chunk["page"] = min_page
+                merged_chunk["is_complete_field"] = True  # 标记为合并chunk(即使只有一个chunk也要标记)
+
+                # 更新 section_label 和 title
+                merged_chunk["section_label"] = chapter_name
+                merged_chunk["title"] = chapter_name
+
+                # serial_number 设置为空字符串
+                merged_chunk["serial_number"] = ""
+
+                # 保留其他所有字段(如 element_tag, project_plan_type 等)
+                # element_tag 只保留第一个的
+                if "element_tag" in merged_chunk:
+                    merged_chunk["element_tag"] = chapter_chunk_list[0]["element_tag"].copy()
+                    # element_tag 中的 chunk_id 也要去除 -> 及其后的内容
+                    original_element_chunk_id = merged_chunk["element_tag"].get('chunk_id', '')
+                    clean_element_chunk_id = original_element_chunk_id.split("->")[0].strip() if "->" in original_element_chunk_id else original_element_chunk_id
+                    merged_chunk["element_tag"]["chunk_id"] = f"{clean_element_chunk_id}_merged"
+                    # element_tag 中的 serial_number 也设置为空字符串
+                    merged_chunk["element_tag"]["serial_number"] = ""
+
+                # 追加到结果列表
+                result_chunks.append(merged_chunk)
+
+                logger.info(f"合并章节 '{chapter_name}': {len(chapter_chunk_list)} 个chunk -> 1 个合并chunk (page={min_page})")
+
+            # 5. 按页码排序
+            result_chunks.sort(
+                key=lambda x: int(x.get("page", 0)) if str(x.get("page", 0)).isdigit() else x.get("page", 0)
+            )
+
+            logger.info(f"合并完成并按页码排序: 原始 {len(chunks)} 个chunk -> 最终 {len(result_chunks)} 个chunk(包含 {len(result_chunks) - len(chunks)} 个合并chunk)")
+
+            return result_chunks
+
+        except Exception as e:
+            logger.error(f"合并chunks失败: {str(e)}", exc_info=True)
+            # 出错时返回原始列表
+            return chunks

+ 1 - 0
data_pipeline/RAG_recall/rag_miluvs/foundation/ai/rag/retrieval/retrieval.py

@@ -42,6 +42,7 @@ class RetrievalManager:
             self.logger.info(f"开始混合检索")
 
             param = {'collection_name': collection_name}
+            self.logger.info(f"开始向量检索")
             results = self.vector_manager.hybrid_search(
                 param=param,
                 query_text=query_text,

+ 1 - 1
foundation/database/base/vector/milvus_vector.py

@@ -453,7 +453,7 @@ class MilvusVectorManager(BaseVectorDB):
         """
         try:
             collection_name = param.get('collection_name')
-
+            logger.info(f"开始 hybrid_search, collection_name: {collection_name}")
             # 使用预创建的连接,避免运行时竞争
             if collection_name in self._vectorstore_cache:
                 vectorstore = self._vectorstore_cache[collection_name]