Parcourir la source

v0.0.4-功能优化
- 增加chapter_code字段

WangXuMing il y a 3 mois
Parent
commit
86c175222d

+ 2 - 127
core/construction_review/component/ai_review_engine.py

@@ -245,7 +245,7 @@ class AIReviewEngine(BaseReviewer):
                     )
                 )
             )
-        if 'completeness_check' in self.task_info.get_review_config_list():
+        if 'c' in self.task_info.get_review_config_list():
             basic_tasks.append(
                 asyncio.create_task(
                     asyncio.wait_for(
@@ -629,7 +629,7 @@ class AIReviewEngine(BaseReviewer):
             os.makedirs("temp/ai_review_engine", exist_ok=True)
 
             with open("temp/ai_review_engine/enhance_with_parent_docs_grouped.json", "w", encoding='utf-8') as f:
-                json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
+                json.dump(enhancement_result, f, ensure_ascii=False, indent=4)
             logger.info(f"[RAG增强] ✅ 已保存分组增强结果到 temp/ai_review_engine/enhance_with_parent_docs_grouped.json")
             logger.info(f"[RAG增强] 分组增强完成: {enhanced_pairs}/{total_pairs} 个查询对进行了增强")
             logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果,使用了 {len(enhancement_result['parent_docs'])} 个父文档")
@@ -1133,131 +1133,6 @@ class AIReviewEngine(BaseReviewer):
             'outline_review_result': outline_review_result
         }
 
-    async def prep_basis_review(self, review_data: Dict[str, Any], trace_id: str,
-                                state: dict = None, stage_name: str = None) -> Dict[str, Any]:
-        """
-        执行编制依据审查:调用prep_basis_reviewer中的异步审查功能
-
-        Args:
-            review_data: 待审查的编制依据数据,包含编制依据文本内容
-            trace_id: 追踪ID
-            state: 状态字典
-            stage_name: 阶段名称
-
-        Returns:
-            审查结果字典,包含编制依据审查结果
-        """
-        start_time = time.time()
-        try:
-            logger.info(f"开始编制依据审查,trace_id: {trace_id}")
-
-            # 提取关键数据
-            review_content = review_data.get('content', '')
-            max_concurrent = review_data.get('max_concurrent', 4)
-
-            # 添加调试信息
-            logger.info(f"提取的编制依据内容长度: {len(review_content)}")
-            if review_content:
-                logger.info(f"编制依据内容预览: {review_content[:50]}...")
-            else:
-                logger.warning("编制依据内容为空,将跳过审查")
-
-            # 检查是否有有效的编制依据内容
-            if not review_content or not review_content.strip():
-                logger.warning("没有可执行的编制依据审查任务")
-                return {
-                    "prep_basis_review_results": {
-                        "review_results": [],
-                        "review_content": review_content,
-                        "total_basis_items": 0,
-                        "valid_items": 0,
-                        "standard_items": 0,
-                        "execution_time": time.time() - start_time,
-                        "error_message": "编制依据内容为空,无法进行审查"
-                    }
-                }
-
-            # 调用prep_basis_reviewer中的异步审查方法
-            logger.info("开始调用编制依据异步审查...")
-
-            try:
-                # 使用信号量控制并发
-                async with self.semaphore:
-                    # 从state中获取progress_manager和callback_task_id
-                    progress_manager = state.get('progress_manager') if state else None
-                    callback_task_id = state.get('callback_task_id') if state else None
-
-                    # 调用带有SSE推送功能的review_all方法
-                    from core.construction_review.component.reviewers.prep_basis_reviewer import BasisReviewService
-                    async with BasisReviewService(max_concurrent=max_concurrent) as service:
-                        prep_basis_review_results = await service.review_all(
-                            review_content,
-                            collection_name="already_basis",
-                            progress_manager=progress_manager,
-                            callback_task_id=callback_task_id
-                        )
-
-                    logger.info(f"编制依据审查完成,批次数量: {len(prep_basis_review_results)}")
-
-                    # 统计审查结果
-                    total_items = 0
-                    valid_items = 0
-                    standard_items = 0
-
-                    for batch in prep_basis_review_results:
-                        if isinstance(batch, list):
-                            total_items += len(batch)
-                            for item in batch:
-                                if isinstance(item, dict):
-                                    valid_items += 1
-                                    if item.get('is_standard', False):
-                                        standard_items += 1
-
-                    logger.info(f"审查统计 - 总编制依据: {total_items}, 有效项: {valid_items}, 标准项: {standard_items}")
-
-            except Exception as e:
-                logger.error(f"编制依据异步审查失败: {str(e)}")
-                return {
-                    "prep_basis_review_results": {
-                        "review_results": [],
-                        "review_content": review_content,
-                        "total_basis_items": 0,
-                        "valid_items": 0,
-                        "standard_items": 0,
-                        "execution_time": time.time() - start_time,
-                        "error_message": f"编制依据审查失败: {str(e)}"
-                    }
-                }
-
-            # 返回完整结果
-            return {
-                "prep_basis_review_results": {
-                    "review_results": prep_basis_review_results,
-                    "review_content": review_content,
-                    "total_basis_items": total_items,
-                    "valid_items": valid_items,
-                    "standard_items": standard_items,
-                    "execution_time": time.time() - start_time,
-                    "error_message": None
-                }
-            }
-
-        except Exception as e:
-            execution_time = time.time() - start_time
-            error_msg = f"编制依据审查失败: {str(e)}"
-            logger.error(error_msg, exc_info=True)
-
-            return {
-                "prep_basis_review_results": {
-                    "review_results": [],
-                    "review_content": review_data.get('content', ''),
-                    "total_basis_items": 0,
-                    "valid_items": 0,
-                    "standard_items": 0,
-                    "execution_time": execution_time,
-                    "error_message": error_msg
-                }
-            }
 
     async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:

+ 98 - 28
core/construction_review/component/infrastructure/parent_tool.py

@@ -63,12 +63,66 @@ def fetch_parent_document(
         return None
 
 
+def fetch_parent_chunks_by_parent_id(
+    milvus_manager,
+    parent_id: str,
+    collection_name: str = "rag_parent_hybrid",  # 子块集合
+    output_fields: List[str] = None
+) -> Optional[List[Dict[str, Any]]]:
+    """
+    通过 parent_id 召回所有子块片段
+
+    Args:
+        milvus_manager: MilvusManager 实例
+        parent_id: 父文档ID
+        collection_name: 集合名称 (默认为子块集合 rag_parent_hybrid,与参数默认值一致)
+        output_fields: 需要返回的字段列表
+
+    Returns:
+        子块片段列表(未排序,由调用方按 pk 排序),如果不存在返回 None
+    """
+    if output_fields is None:
+        output_fields = ["pk", "text", "parent_id", "file_name", "title"]
+
+    try:
+        rows = milvus_manager.condition_query(
+            collection_name=collection_name,
+            filter=f"parent_id == '{parent_id}'",
+            output_fields=output_fields,
+            limit=1000,  # 足够大的数字,获取所有片段
+        )
+        
+        if not rows:
+            logger.warning(f"[父文档工具] parent_id {parent_id} 没有召回任何片段")
+            return None
+
+        # 转换为字典列表
+        chunks = []
+        for row in rows:
+            chunk = {}
+            for field in output_fields:
+                if field in row:
+                    value = row[field]
+                    # 处理字节类型
+                    if isinstance(value, bytes):
+                        value = value.decode('utf-8')
+                    chunk[field] = value
+            chunks.append(chunk)
+
+        logger.info(f"[父文档工具] parent_id {parent_id} 召回了 {len(chunks)} 个片段")
+        return chunks
+
+    except Exception as e:
+        logger.error(f"[父文档工具] 召回 parent_id {parent_id} 的片段失败: {e}")
+        return None
+
+
 
 def enhance_with_parent_docs_grouped(
     milvus_manager,
     bfp_result_lists: List,
     score_threshold: float = 0.5,
-    max_parents_per_pair: int = 2,
+    max_parents_per_pair: int = 3,
     # max_parent_text_length: Optional[int] = None
 ) -> Dict[str, Any]:
     """
@@ -114,56 +168,72 @@ def enhance_with_parent_docs_grouped(
             logger.info(f"[分组增强] 查询对 {pair_idx}: 所有结果分数均低于 {score_threshold},跳过")
             continue
 
-        # 2. 提取父ID(去重,限制数量)
+        # 2. 对 parent_id 去重
         parent_ids = list(set([
             r.get('metadata', {}).get('parent_id')
-            for r in high_score_results[:max_parents_per_pair]
+            for r in high_score_results
             if r.get('metadata', {}).get('parent_id')
         ]))
 
-        if not parent_ids:
-            logger.warning(f"[分组增强] 查询对 {pair_idx}: 没有有效的parent_id,跳过")
-            continue
+        # 从第一个结果中提取 source_entity(用于日志)
+        source_entity = high_score_results[0].get('source_entity', '') if high_score_results else ''
 
-        # 3. 查询父文档内容
-        parent_docs = []
+        # 限制父文档数量
+        parent_ids = parent_ids[:max_parents_per_pair]
+        logger.debug(f"实体:{source_entity},[分组增强] 父文档数量限制为 {max_parents_per_pair},实际数量为 {len(parent_ids)},{parent_ids}")
+        # 3. 用 parent_id 召回片段并按 pk 排序拼接
+        parent_id_to_doc = {}
         for pid in parent_ids:
-            doc = fetch_parent_document(milvus_manager, str(pid))
-            if doc and doc.get('text'):
-                text = doc['text']
-                # if max_parent_text_length and len(text) > max_parent_text_length:
-                #     text = text[:max_parent_text_length] + "\n...(已截断)"
-                parent_docs.append({'parent_id': pid, 'text': text})
-
-        if not parent_docs:
-            logger.warning(f"[分组增强] 查询对 {pair_idx}: 父文档查询失败,跳过")
+            # 用 parent_id 查询 Milvus,召回该父文档的所有片段
+            chunks = fetch_parent_chunks_by_parent_id(milvus_manager, pid)
+
+            if chunks:
+                # 按 pk 排序
+                sorted_chunks = sorted(chunks, key=lambda x: x.get('pk', 0))
+                logger.debug(f"排序结果:{pid}:{sorted_chunks}")
+                # 提取元数据(从第一个片段获取)
+                first_chunk = sorted_chunks[0]
+                file_name = first_chunk.get('file_name', '')
+                title = first_chunk.get('title', '')
+
+                # 构建头部信息
+                header = f"【文件】{file_name}\n【标题】{title}\n" if file_name or title else ""
+
+                # 拼接所有片段的 text_content
+                combined_text = header + "\n".join([c.get('text', '') for c in sorted_chunks])
+                parent_id_to_doc[str(pid)] = combined_text
+                logger.debug(f"[分组增强] parent_id={str(pid)[:8]}... 召回并拼接了 {len(sorted_chunks)} 个片段,文件={file_name}")
+
+        if not parent_id_to_doc:
+            logger.warning(f"[分组增强] 查询对 {pair_idx}: 所有父文档召回失败,跳过")
             continue
 
-        # 4. 拼接父文档内容
-        combined_text = "\n".join([f"【参考文档 {i+1}】\n{d['text']}" for i, d in enumerate(parent_docs)])
-
-        # 5. 只保留并增强高分结果
+        # 4. 只保留并增强高分结果(每个结果只用其对应的父文档增强)
         enhanced_list = []
-        for result in result_list:
-            if result.get('bfp_rerank_score', 0) >= score_threshold:
+        for result in high_score_results:
+            parent_id = str(result.get('metadata', {}).get('parent_id'))
+            if parent_id in parent_id_to_doc:
+                # 用该结果对应的父文档增强
+                parent_text = parent_id_to_doc[parent_id]
                 enhanced_list.append({
-                    'text_content': result.get('text_content', '') + f"\n{combined_text}\n",
+                    'text_content': result.get('text_content', '') + f"\n【参考文档】\n{parent_text}\n",
                     'metadata': result.get('metadata', {}),
                     'hybrid_similarity': result.get('hybrid_similarity'),
                     'rerank_score': result.get('rerank_score'),
                     'bfp_rerank_score': result.get('bfp_rerank_score'),
                     'bfp_rerank_parent_id': result.get('bfp_rerank_parent_id', ''),
                     'source_entity': result.get('source_entity', ''),
-                    'enhanced': True,
-                    'parent_docs_count': len(parent_docs)
+                    'enhanced': True
                 })
 
         if enhanced_list:
             enhanced_results.append(enhanced_list)
-            all_parent_docs.extend(parent_docs)
+            # 构建父文档列表用于统计
+            for pid, text in parent_id_to_doc.items():
+                all_parent_docs.append({'parent_id': pid, 'text': text})
             enhanced_pairs_count += 1
             total_enhanced_count += len(enhanced_list)
-            logger.info(f"[分组增强] 查询对 {pair_idx}: 保留 {len(enhanced_list)} 个高分结果")
+            logger.info(f"[分组增强] 查询对 {pair_idx}: 保留 {len(enhanced_list)} 个高分结果,使用 {len(parent_id_to_doc)} 个父文档")
 
     logger.info(f"[分组增强] 完成: {enhanced_pairs_count}/{len(bfp_result_lists)} 个查询对,{total_enhanced_count} 个结果")
 

+ 36 - 36
core/construction_review/component/reviewers/outline_reviewer.py

@@ -59,28 +59,28 @@ class OutlineReviewer:
         """
         start_time = time.time()
         try:
-            logger.info(f"开始两阶段大纲审查,trace_id: {trace_id}")
+            logger.debug(f"开始两阶段大纲审查,trace_id: {trace_id}")
 
             # 提取关键数据
             overall_outline = review_data.get('overall_outline', '')
             detailed_outline = review_data.get('detailed_outline', [])
 
-            # 添加调试信息
-            logger.info(f"提取的数据 - overall_outline长度: {len(overall_outline)}, detailed_outline数量: {len(detailed_outline)}")
-            if overall_outline:
-                logger.info(f"overall_outline内容预览: {overall_outline[:100]}...")
-            else:
-                logger.warning("overall_outline为空,将跳过阶段1审查")
+            # # 添加调试信息
+            # logger.debug(f"提取的数据 - overall_outline长度: {len(overall_outline)}, detailed_outline数量: {len(detailed_outline)}")
+            # if overall_outline:
+            #     logger.debug(f"overall_outline内容预览: {overall_outline[:100]}...")
+            # else:
+            #     logger.warning("overall_outline为空,将跳过阶段1审查")
 
             # 并发执行阶段1和阶段2
-            logger.info("开始并发执行两阶段大纲审查...")
+            logger.debug("开始并发执行两阶段大纲审查...")
 
             # 创建并发任务
             tasks = []
 
             # 阶段1:一级大纲完整性审查(仅在有数据时执行)
             if overall_outline and overall_outline.strip():
-                logger.info("启动阶段1:一级大纲完整性审查...")
+                logger.debug("启动阶段1:一级大纲完整性审查...")
                 # 创建Task对象
                 overall_task = asyncio.create_task(
                     self._overall_completeness_review(overall_outline, trace_id, state, stage_name)
@@ -89,7 +89,7 @@ class OutlineReviewer:
 
             # 阶段2:次级大纲逐项审查
             if detailed_outline:
-                logger.info("启动阶段2:次级大纲逐项审查...")
+                logger.debug("启动阶段2:次级大纲逐项审查...")
                 # 创建Task对象
                 detailed_task = asyncio.create_task(
                     self._detailed_item_review(detailed_outline, trace_id, state, stage_name)
@@ -113,7 +113,7 @@ class OutlineReviewer:
                 stage2_timeout = (50 * len(detailed_outline) / 3) + 60 if detailed_outline else 0
                 total_timeout = max(stage1_timeout, stage2_timeout) + 30  # 并发执行,取最大值+缓冲
 
-                logger.info(f"[大纲审查] 两阶段整体超时设置: {total_timeout:.0f}秒")
+                logger.debug(f"[大纲审查] 两阶段整体超时设置: {total_timeout:.0f}秒")
 
                 # 提取任务列表
                 task_list = [task for _, task in tasks]
@@ -138,11 +138,11 @@ class OutlineReviewer:
                     stage_name_key = task_to_stage.get(task)
                     try:
                         result = task.result()
-                        logger.info(f"[大纲审查] {stage_name_key} task.result()返回, 类型: {type(result).__name__}")
+                        logger.debug(f"[大纲审查] {stage_name_key} task.result()返回, 类型: {type(result).__name__}")
                         if isinstance(result, dict):
-                            logger.info(f"[大纲审查] result是字典, 包含键: {list(result.keys())}")
+                            logger.debug(f"[大纲审查] result是字典, 包含键: {list(result.keys())}")
                         elif isinstance(result, list):
-                            logger.info(f"[大纲审查] result是列表, 长度: {len(result)}")
+                            logger.debug(f"[大纲审查] result是列表, 长度: {len(result)}")
                         stage_results[stage_name_key] = result
                     except asyncio.CancelledError:
                         logger.error(f"[大纲审查] {stage_name_key} 阶段任务被取消")
@@ -162,7 +162,7 @@ class OutlineReviewer:
 
                 for stage_name_key, _ in tasks:
                     result = stage_results.get(stage_name_key)
-                    logger.info(f"[大纲审查] 处理阶段: stage_name={stage_name_key}, result类型={type(result).__name__ if result else 'None'}")
+                    logger.debug(f"[大纲审查] 处理阶段: stage_name={stage_name_key}, result类型={type(result).__name__ if result else 'None'}")
 
                     if stage_name_key == "overall":
                         if isinstance(result, Exception):
@@ -175,7 +175,7 @@ class OutlineReviewer:
                             }
                         elif isinstance(result, dict):
                             overall_review_result = result
-                            logger.info(f"阶段1完成,成功: {overall_review_result.get('success', False)}")
+                            logger.debug(f"阶段1完成,成功: {overall_review_result.get('success', False)}")
                         else:
                             # 处理意外的返回类型(如列表)
                             logger.error(f"阶段1返回了意外的类型: {type(result).__name__}")
@@ -192,9 +192,9 @@ class OutlineReviewer:
                             detailed_review_results = []
                         else:
                             detailed_review_results = result
-                            logger.info(f"阶段2完成,审查项目数: {len(detailed_review_results)}")
+                            logger.debug(f"阶段2完成,审查项目数: {len(detailed_review_results)}")
 
-                logger.info("两阶段并发审查全部完成")
+                logger.debug("两阶段并发审查全部完成")
 
             # 返回完整结果
             return {
@@ -241,7 +241,7 @@ class OutlineReviewer:
                     "parsed_result": None
                 }
 
-            logger.info("执行一级大纲完整性审查...")
+            logger.debug("执行一级大纲完整性审查...")
 
             # 构建提示词参数
             prompt_kwargs = {}
@@ -273,9 +273,9 @@ class OutlineReviewer:
 
             if json_data and isinstance(json_data, list):
                 for item in json_data:
-                    overall_completeness_result.append(self.inter_tool._create_issue_item(item, "completeness_check"))
+                    overall_completeness_result.append(self.inter_tool._create_issue_item(item, "completeness_check","outline"))
             elif json_data and isinstance(json_data, dict):
-                overall_completeness_result.append(self.inter_tool._create_issue_item(json_data, "completeness_check"))
+                overall_completeness_result.append(self.inter_tool._create_issue_item(json_data, "completeness_check","outline"))
             #filtered_issues = [r for r in overall_completeness_result if self._is_non_compliant_item(r)]
             # 只统计exist_issue为true的项目数量
             issue_count = sum(1 for item in overall_completeness_result if item.get('exist_issue', False))
@@ -297,7 +297,7 @@ class OutlineReviewer:
                         issues=issues_copy,
                         event_type="processing"  # 使用专门的事件类型
                     )
-                    logger.info("SSE推送成功: 一级大纲完整性审查完成")
+                    logger.debug("SSE推送成功: 一级大纲完整性审查完成")
                 except Exception as e:
                     logger.error(f"SSE推送失败: 一级大纲完整性审查, 错误: {str(e)}")
                     # 不抛出异常,避免影响主流程
@@ -308,7 +308,7 @@ class OutlineReviewer:
                 "overall_outline": overall_outline,
                 "parsed_result": overall_completeness_result
             }
-            logger.info(f"[大纲审查-阶段1] 准备返回final_result, 类型: {type(final_result).__name__}, 包含键: {list(final_result.keys())}")
+            logger.debug(f"[大纲审查-阶段1] 准备返回final_result, 类型: {type(final_result).__name__}, 包含键: {list(final_result.keys())}")
             return final_result
 
         except Exception as e:
@@ -342,7 +342,7 @@ class OutlineReviewer:
             logger.warning("没有有效的次级大纲项目")
             return []
 
-        logger.info(f"开始次级大纲并发审查,有效项目数量: {len(valid_items)}")
+        logger.debug(f"开始次级大纲并发审查,有效项目数量: {len(valid_items)}")
 
         # 创建并发审查任务 - 降低并发数避免模型服务过载
         semaphore = asyncio.Semaphore(3)  # 限制并发数为3,避免过载
@@ -361,7 +361,7 @@ class OutlineReviewer:
         estimated_time_per_task = 50  # 秒
         total_timeout = (estimated_time_per_task * len(tasks) / 3) + 60  # 加60秒缓冲
 
-        logger.info(f"[大纲审查] 设置整体超时: {total_timeout:.0f}秒,任务数: {len(tasks)}")
+        logger.debug(f"[大纲审查] 设置整体超时: {total_timeout:.0f}秒,任务数: {len(tasks)}")
 
         done, pending = await asyncio.wait(tasks, timeout=total_timeout)
 
@@ -383,7 +383,7 @@ class OutlineReviewer:
                 logger.error(f"[大纲审查] 任务执行失败: {str(e)}", exc_info=True)
                 results.append(e)
 
-        logger.info(f"并发审查完成,总任务数: {len(tasks)}, 成功: {len(done)}, 超时: {len(pending)}")
+        logger.debug(f"并发审查完成,总任务数: {len(tasks)}, 成功: {len(done)}, 超时: {len(pending)}")
 
         # 处理结果
         detailed_review_results = []
@@ -429,9 +429,9 @@ class OutlineReviewer:
         """
         async with semaphore:
             try:
-                logger.info(f"开始审查第{item_index+1}项: {outline_item[:50]}...")
+                logger.debug(f"开始审查第{item_index+1}项: {outline_item[:50]}...")
                 result = await self._single_item_review(outline_item, trace_id, item_index, state, stage_name)
-                logger.info(f"完成审查第{item_index+1}项,成功: {result.get('success', False)}")
+                logger.debug(f"完成审查第{item_index+1}项,成功: {result.get('success', False)}")
                 return result
             except Exception as e:
                 logger.error(f"第{item_index+1}项审查失败: {str(e)}")
@@ -458,7 +458,7 @@ class OutlineReviewer:
 
         for i, outline_item in valid_items:
             try:
-                logger.info(f"串行审查第{i+1}项: {outline_item[:50]}...")
+                logger.debug(f"串行审查第{i+1}项: {outline_item[:50]}...")
                 item_review_result = await self._single_item_review(outline_item, trace_id, i, state, stage_name)
 
                 detailed_review_results.append({
@@ -537,17 +537,17 @@ class OutlineReviewer:
 
         if json_data and isinstance(json_data, list):
             for item in json_data:
-                parsed_result.append(self.inter_tool._create_issue_item(item, 'completeness_check'))
+                parsed_result.append(self.inter_tool._create_issue_item(item, 'completeness_check','outline'))
         elif json_data and isinstance(json_data, dict):
-            parsed_result.append(self.inter_tool._create_issue_item(json_data, 'completeness_check'))
+            parsed_result.append(self.inter_tool._create_issue_item(json_data, 'completeness_check','outline'))
 
         # with open(f"temp\outline_result_temp\次级大纲审查中间结果.json", "a", encoding="utf-8") as f:
         #     f.write(response_text)
         # # 发送单项审查完成进度
-        logger.info(f"state参数检查: state存在={state is not None}")
+        # logger.debug(f"state参数检查: state存在={state is not None}")
         if state:
-            logger.info(f"state keys: {list(state.keys())}")
-            logger.info(f"progress_manager存在: {'progress_manager' in state}")
+            logger.debug(f"state keys: {list(state.keys())}")
+            logger.debug(f"progress_manager存在: {'progress_manager' in state}")
         if state and state.get("progress_manager"):
             # 只统计exist_issue为true的项目数量
             issue_count = sum(1 for item in parsed_result if item.get('exist_issue', False))
@@ -563,8 +563,8 @@ class OutlineReviewer:
                     issues=parsed_result,
                     event_type="processing"  # 使用专门的事件类型
                 )
-                logger.info(f"SSE推送成功: 第{item_index+1}项{category}审查完成")
-                logger.info(f"发送单项审查完成进度: 第{item_index+1}项{category}审查完成")
+                logger.debug(f"SSE推送成功: 第{item_index+1}项{category}审查完成")
+                logger.debug(f"发送单项审查完成进度: 第{item_index+1}项{category}审查完成")
             except Exception as e:
                 logger.error(f"SSE推送失败: 第{item_index+1}项{category}, 错误: {str(e)}")
                 # 不抛出异常,避免影响主流程

+ 0 - 626
core/construction_review/component/reviewers/prep_basis_reviewer.py

@@ -1,626 +0,0 @@
-import os
-import sys
-import json
-import re
-import time
-from typing import Any, Dict, List, Optional
-import asyncio
-
-
-
-# 导入必要的依赖
-try:
-    from pymilvus import connections, Collection
-    from foundation.infrastructure.config.config import config_handler
-    from foundation.ai.models.model_handler import model_handler as mh
-    from foundation.ai.agent.generate.model_generate import generate_model_client
-    from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
-    from core.construction_review.component.reviewers.utils.inter_tool import InterTool
-    from foundation.observability.logger.loggering import server_logger as logger
-except ImportError as e:
-    logger.warning(f"Warning: 无法导入依赖: {e}")
-    # 设置默认值,避免程序崩溃
-    mh = None
-    generate_model_client = None
-    prompt_loader = None
-    logger = None
-    InterTool = None
-
-
-class TextProcessor:
-    """文本处理工具类"""
-
-    @staticmethod
-    def extract_basis(text: str) -> List[str]:
-        """从文本中提取编制依据"""
-        pattern = re.compile(r'《[^》]+》(?:([^)]+))?')
-        return pattern.findall(text)
-
-
-
-
-class StandardizedResponseProcessor:
-    """标准化响应处理器 - 统一为outline_reviewer.py格式"""
-
-    def __init__(self):
-        if InterTool:
-            self.inter_tool = InterTool()
-        else:
-            self.inter_tool = None
-
-    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查") -> List[Dict[str, Any]]:
-        """
-        处理LLM响应,返回标准格式
-
-        Args:
-            response_text: LLM原始响应文本
-            check_name: 检查项名称
-
-        Returns:
-            List[Dict]: 标准格式的审查结果列表
-        """
-        if not self.inter_tool:
-            logger.warning("InterTool未初始化,返回空结果")
-            return []
-
-        try:
-            # 使用inter_tool提取JSON数据
-            json_data = self.inter_tool._extract_json_data(response_text)
-            parsed_result = []
-
-            if json_data and isinstance(json_data, list):
-                for item in json_data:
-                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name))
-            elif json_data and isinstance(json_data, dict):
-                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name))
-
-            return parsed_result
-
-        except Exception as e:
-            logger.error(f"处理LLM响应失败: {str(e)}")
-            # 返回一个错误条目
-            return [{
-                "check_item": check_name,
-                "check_result": {"error": str(e)},
-                "exist_issue": True,
-                "risk_info": {"risk_level": "medium"}
-            }]
-
-
-class MessageBuilder:
-    """消息构建工具类"""
-
-    def __init__(self, prompt_loader_instance=None):
-        self.prompt_loader = prompt_loader_instance
-
-    def get_prompt_template(self):
-        """获取ChatPromptTemplate"""
-        if not self.prompt_loader:
-            # 返回默认模板
-            from langchain_core.prompts import ChatPromptTemplate
-            return ChatPromptTemplate.from_messages([
-                ("system", "你是专业的编制依据审查专家,负责识别和评估编制依据的完整性和有效性。"),
-                ("user", "请审查以下编制依据:{review_content} /no_think")
-            ])
-
-        try:
-            # 强制重新加载提示词,避免缓存问题
-            template = self.prompt_loader.get_prompt_template(
-                reviewer_type="prep_basis",
-                prompt_name="basis_status_check",
-                force_reload=True  # 强制重新加载
-            )
-
-            # 验证返回的是ChatPromptTemplate对象
-            if hasattr(template, 'format_messages'):
-                logger.info(f"成功加载编制依据审查提示词")
-                return template
-            else:
-                logger.warning(f" PromptLoader返回了意外类型: {type(template)}")
-                # 返回默认模板
-                from langchain_core.prompts import ChatPromptTemplate
-                return ChatPromptTemplate.from_messages([
-                    ("system", "你是专业的编制依据审查专家,负责识别和评估编制依据的完整性和有效性。"),
-                    ("user", "请审查以下编制依据:{review_content} /no_think")
-                ])
-
-        except Exception as e:
-            logger.warning(f" 无法加载提示词模板,使用默认格式: {e}")
-            from langchain_core.prompts import ChatPromptTemplate
-            return ChatPromptTemplate.from_messages([
-                ("system", "你是专业的编制依据审查专家,负责识别和评估编制依据的完整性和有效性。"),
-                ("user", "请审查以下编制依据:{review_content} /no_think")
-            ])
-
-    def build_user_content(
-        self,
-        basis_items: List[str],
-        grouped_candidates: List[List[Dict[str, Any]]],
-    ) -> str:
-        """构建用户内容"""
-        items = []
-        for raw, cands in zip(basis_items, grouped_candidates):
-            items.append({
-                "raw_text": raw,
-                "candidates": [
-                    {
-                        "id": c.get("id"),
-                        "similarity": c.get("similarity"),
-                        "text": c.get("text") or c.get("text_content") or "",
-                    }
-                    for c in (cands or [])
-                ],
-            })
-
-        user_content = {
-            "items": items,
-            "required_output_example": [
-                {"is_standard": False, "status": "", "meg": ""} for _ in items
-            ],
-        }
-
-        return json.dumps(user_content, ensure_ascii=False)
-
-
-class BasisSearchEngine:
-    """编制依据向量搜索引擎"""
-
-    def __init__(self):
-        self.emdmodel = None
-        self._initialize()
-
-    def _initialize(self):
-        """初始化搜索引擎"""
-        try:
-            # 连接配置
-            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
-            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
-            self.user = config_handler.get('milvus', 'MILVUS_USER')
-            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
-
-            # 连接到 Milvus
-            connections.connect(
-                alias="default",
-                host=self.host,
-                port=self.port,
-                user=self.user,
-                db_name="lq_db"
-            )
-            logger.info(f" 成功连接到 Milvus {self.host}:{self.port}")
-
-            # 初始化嵌入模型
-            if mh:
-                self.emdmodel = mh._get_lq_qwen3_8b_emd()
-                logger.info(" 嵌入模型初始化成功")
-            else:
-                raise ImportError("无法获取嵌入模型")
-
-        except Exception as e:
-            logger.error(f" BasisSearchEngine 初始化失败: {e}")
-            self.emdmodel = None
-
-    def text_to_vector(self, text: str) -> List[float]:
-        """将文本转换为向量"""
-        if not self.emdmodel:
-            raise ValueError("嵌入模型未初始化")
-
-        try:
-            embedding = self.emdmodel.embed_query(text)
-            return embedding.tolist() if hasattr(embedding, 'tolist') else list(embedding)
-        except Exception as e:
-            logger.error(f"文本向量化失败: {e}")
-            raise
-
-    def similarity_search(self, collection_name: str, query_text: str,
-                         min_score: float = 0.3, top_k: int = 3,
-                         filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
-        """执行相似度搜索"""
-        try:
-            if not self.emdmodel:
-                raise ValueError("搜索器未正确初始化")
-
-            # 获取集合
-            collection = Collection(collection_name)
-            collection.load()
-
-            # 转换查询文本为向量
-            query_embedding = self.text_to_vector(query_text)
-
-            # 搜索参数
-            search_params = {
-                "metric_type": "COSINE",
-                "params": {"nprobe": 10}
-            }
-
-            # 构建过滤表达式
-            filter_expr = self._create_filter(filters)
-
-            # 执行搜索
-            results = collection.search(
-                data=[query_embedding],
-                anns_field="embedding",
-                param=search_params,
-                limit=top_k,
-                expr=filter_expr,
-                output_fields=["text", "metadata"]
-            )
-
-            # 格式化结果
-            formatted_results = []
-            for hits in results:
-                for hit in hits:
-                    formatted_results.append({
-                        'id': hit.id,
-                        'text': hit.entity.get('text', ''),
-                        'text_content': hit.entity.get('text', ''),
-                        'metadata': hit.entity.get('metadata', {}),
-                        'distance': hit.distance,
-                        'similarity': 1 - hit.distance
-                    })
-
-            # 过滤低相似度结果
-            filtered_results = [
-                result for result in formatted_results
-                if result['similarity'] >= min_score
-            ]
-
-            return filtered_results
-
-        except Exception as e:
-            logger.error(f" 相似度搜索失败: {e}")
-            return []
-
-    def _create_filter(self, filters: Dict[str, Any]) -> str:
-        """创建过滤条件"""
-        if not filters:
-            return ""
-
-        conditions = []
-        for key, value in filters.items():
-            if isinstance(value, str):
-                conditions.append(f'metadata["{key}"] == "{value}"')
-            elif isinstance(value, (int, float)):
-                conditions.append(f'metadata["{key}"] == {value}')
-            else:
-                conditions.append(f'metadata["{key}"] == "{value}"')
-
-        return " and ".join(conditions)
-
-
-class LLMReviewClient:
-    """LLM审查客户端"""
-
-    async def review_basis(self, prompt_template, user_content: str, trace_id: str = None) -> str:
-        """编制依据审查模型调用"""
-        if not generate_model_client:
-            raise ImportError("generate_model_client 未初始化,无法调用模型")
-
-        if not trace_id:
-            trace_id = f"prep_basis_review_{int(time.time())}"
-        try:
-            from langchain_core.prompts import ChatPromptTemplate
-
-            final_prompt_obj = None
-
-            # 情况1: 如果传入的是 PromptTemplate 对象
-            if hasattr(prompt_template, 'partial'):
-                # 使用 partial 注入变量,保持对象为 Template 类型
-                try:
-                    final_prompt_obj = prompt_template.partial(review_content=user_content)
-                except Exception:
-                    # 如果模板不需要变量或注入失败,保持原样
-                    final_prompt_obj = prompt_template
-
-            # 情况2: 如果传入的是 List (消息列表)
-            elif isinstance(prompt_template, list):
-                final_prompt_obj = ChatPromptTemplate.from_messages(prompt_template)
-                try:
-                    final_prompt_obj = final_prompt_obj.partial(review_content=user_content)
-                except Exception:
-                    pass
-
-            # 情况3: 兜底默认值
-            else:
-                default_template = ChatPromptTemplate.from_messages([
-                    ("system", "你是专业的编制依据审查专家,负责识别和评估编制依据的完整性和有效性。"),
-                    ("user", "请审查以下编制依据:{review_content} /no_think")
-                ])
-                final_prompt_obj = default_template.partial(review_content=user_content)
-
-            # 构建任务信息 - 传入 Template 对象而不是 List
-            task_prompt_info = {
-                "task_prompt": final_prompt_obj,
-                "task_name": "规范性引用文件识别与状态判断"
-            }
-
-            # 调用统一模型客户端 - 编制依据审查设置90秒超时
-            response = await generate_model_client.get_model_generate_invoke(
-                trace_id=trace_id,
-                task_prompt_info=task_prompt_info,
-                timeout=90
-            )
-            return response
-
-        except Exception as e:
-            logger.error(f" 模型调用准备阶段失败: {e}")
-            # 返回空JSON数组字符串以防解析崩溃
-            return "[]"
-        # ==================== 修复结束 ====================
-
-
-class BasisReviewService:
-    """编制依据审查服务核心类"""
-
-    def __init__(self, max_concurrent: int = 4):
-        self.search_engine = BasisSearchEngine()
-        self.llm_client = LLMReviewClient()
-        self.text_processor = TextProcessor()
-        self.response_processor = StandardizedResponseProcessor()  # 标准化处理器
-        # 确保使用最新的prompt_loader实例
-        from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
-        fresh_prompt_loader = PromptLoader()
-        self.message_builder = MessageBuilder(fresh_prompt_loader)
-        self.max_concurrent = max_concurrent
-        self._semaphore = None
-
-    async def __aenter__(self):
-        """异步上下文管理器入口"""
-        if self._semaphore is None:
-            self._semaphore = asyncio.Semaphore(self.max_concurrent)
-        return self
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        """异步上下文管理器出口"""
-        return False
-
-    async def review_batch(
-        self,
-        basis_items: List[str],
-        collection_name: str = "already_basis",
-        filters: Optional[Dict[str, Any]] = None,
-        min_score: float = 0.3,
-        top_k_each: int = 3,
-    ) -> List[Dict[str, Any]]:
-        """异步批次审查(通常3条)"""
-        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
-        if not basis_items:
-            return []
-
-        async with self._semaphore:
-            try:
-                # 并发搜索每个编制依据
-                search_tasks = []
-                for basis in basis_items:
-                    task = asyncio.create_task(
-                        self._async_search_basis(basis, collection_name, min_score, top_k_each, filters)
-                    )
-                    search_tasks.append(task)
-
-                # 等待所有搜索完成
-                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
-
-                grouped_candidates = []
-                for i, result in enumerate(search_results):
-                    if isinstance(result, Exception):
-                        logger.error(f" 搜索失败 '{basis_items[i]}': {result}")
-                        grouped_candidates.append([])
-                    else:
-                        grouped_candidates.append(result)
-
-                # 构建提示词模板和用户内容
-                prompt_template = self.message_builder.get_prompt_template()
-                user_content = self.message_builder.build_user_content(basis_items, grouped_candidates)
-                trace_id = f"prep_basis_batch_{int(time.time())}"
-                llm_out = await self.llm_client.review_basis(prompt_template, user_content, trace_id)
-
-                # 使用标准化处理器处理响应
-                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check")
-
-                # 统计问题数量
-                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
-
-                return standardized_result
-
-            except Exception as e:
-                logger.error(f" 批次处理失败: {e}")
-                return [{
-                    "check_item": "reference_check",
-                    "check_result": {"error": str(e), "basis_items": basis_items},
-                    "exist_issue": True,
-                    "risk_info": {"risk_level": "high"}
-                }]
-
-    
-    async def _async_search_basis(
-        self,
-        basis: str,
-        collection_name: str,
-        min_score: float,
-        top_k_each: int,
-        filters: Optional[Dict[str, Any]]
-    ) -> List[Dict[str, Any]]:
-        """异步搜索单个编制依据"""
-        try:
-            # 在线程池中执行同步搜索操作
-            loop = asyncio.get_event_loop()
-            retrieved = await loop.run_in_executor(
-                None,
-                self.search_engine.similarity_search,
-                collection_name,
-                basis,
-                min_score,
-                top_k_each,
-                filters
-            )
-            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
-            return retrieved or []
-        except Exception as e:
-            logger.error(f" 搜索失败 '{basis}': {e}")
-            return []
-
-    
-    async def review_all(self, text: str, collection_name: str = "already_basis",
-                        progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
-        """异步批量审查所有编制依据"""
-        items = self.text_processor.extract_basis(text)
-        if not items:
-            return []
-
-        start_time = time.time()
-        total_batches = (len(items) + 2) // 3  # 计算总批次数
-        
-        # 发送开始审查的SSE推送
-        if progress_manager and callback_task_id:
-            try:
-                await progress_manager.update_stage_progress(
-                    callback_task_id=callback_task_id,
-                    stage_name="AI审查",
-                    current=0,
-                    status="processing",
-                    message=f"开始编制依据审查,共{len(items)}项编制依据",
-                    overall_task_status="processing",
-                    event_type="processing"
-                )
-            except Exception as e:
-                logger.error(f"SSE推送开始消息失败: {e}")
-
-        # 分批处理
-        batches = []
-        for i in range(0, len(items), 3):
-            batch = items[i:i + 3]
-            batches.append(batch)
-
-        # 异步并发执行所有批次,使用回调处理SSE推送
-        async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
-            """处理单个批次并执行SSE回调"""
-            try:
-                # 执行单个批次审查
-                result = await self.review_batch(batch, collection_name)
-
-                # 统计当前批次结果
-                batch_standard_count = 0
-                for item in result:
-                    if isinstance(item, dict) and item.get('is_standard', False):
-                        batch_standard_count += 1
-
-                # 立即推送当前批次完成的SSE消息
-                logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
-                if progress_manager and callback_task_id:
-                    try:
-                        progress_percent = int((batch_index + 1) / total_batches * 100)
-                        await progress_manager.update_stage_progress(
-                            callback_task_id=callback_task_id,
-                            stage_name=f"编制依据审查-批次{batch_index + 1}",
-                            current=progress_percent,
-                            status="processing",
-                            message=f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
-                            overall_task_status="processing",
-                            event_type="processing",
-                            issues=result  # 推送该批次的审查结果
-                        )
-                        logger.info(f"批次{batch_index + 1} SSE推送成功")
-                    except Exception as e:
-                        logger.error(f"SSE推送批次{batch_index + 1}结果失败: {e}")
-
-                return result
-
-            except Exception as e:
-                logger.error(f" 批次 {batch_index} 处理失败: {e}")
-                error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
-                                for name in batch]
-
-                # 即使失败也要推送结果
-                if progress_manager and callback_task_id:
-                    try:
-                        progress_percent = int((batch_index + 1) / total_batches * 100)
-                        await progress_manager.update_stage_progress(
-                            callback_task_id=callback_task_id,
-                            stage_name=f"编制依据审查-批次{batch_index + 1}",
-                            current=progress_percent,
-                            status="processing",
-                            message=f"第{batch_index + 1}/{total_batches}批次处理失败",
-                            overall_task_status="processing",
-                            event_type="processing",
-                            issues=error_result
-                        )
-                    except Exception as push_e:
-                        logger.error(f"SSE推送失败批次{batch_index + 1}结果失败: {push_e}")
-
-                return error_result
-
-        # 创建所有批次的异步任务
-        batch_tasks = []
-        for i, batch in enumerate(batches):
-            task = process_batch_with_callback(i, batch)
-            batch_tasks.append(task)
-
-        # 并发执行所有批次
-        logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
-        processed_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
-
-        # 处理异常结果并统计
-        total_items = 0
-        issue_items = 0
-        successful_batches = 0
-
-        # 重新构建结果列表,过滤异常
-        final_results = []
-        for i, result in enumerate(processed_results):
-            if isinstance(result, Exception):
-                logger.error(f" 批次 {i} 返回异常: {result}")
-                error_batch = batches[i] if i < len(batches) else []
-                error_result = [{
-                    "check_item": "reference_check",
-                    "check_result": {"error": str(result), "basis_items": error_batch},
-                    "exist_issue": True,
-                    "risk_info": {"risk_level": "high"}
-                }]
-                final_results.append(error_result)
-            else:
-                final_results.append(result)
-                successful_batches += 1
-
-        # 统计总结果
-        for result in final_results:
-            for item in result:
-                total_items += 1
-                if isinstance(item, dict) and item.get('exist_issue', False):
-                    issue_items += 1
-
-        logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
-
-
-        # 发送完成审查的SSE推送
-        elapsed_time = time.time() - start_time
-        if progress_manager and callback_task_id:
-            try:
-                await progress_manager.update_stage_progress(
-                    callback_task_id=callback_task_id,
-                    stage_name="编制依据审查",
-                    current=15,
-                    status="processing",
-                    message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
-                    overall_task_status="processing",
-                    event_type="processing"
-                )
-            except Exception as e:
-                logger.error(f"SSE推送完成消息失败: {e}")
-
-        logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
-        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
-        return final_results
-
-
-# 便捷函数
-async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
-    """异步批次审查便捷函数"""
-    async with BasisReviewService(max_concurrent=max_concurrent) as service:
-        return await service.review_batch(basis_items)
-
-
-async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
-    """异步全部审查便捷函数"""
-    async with BasisReviewService(max_concurrent=max_concurrent) as service:
-        return await service.review_all(text)
-

+ 8 - 4
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -57,13 +57,14 @@ class StandardizedResponseProcessor:
         else:
             self.inter_tool = None
 
-    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查") -> List[Dict[str, Any]]:
+    def process_llm_response(self, response_text: str, check_name: str, chapter_code: str = "") -> List[Dict[str, Any]]:
         """
         处理LLM响应,返回标准格式
 
         Args:
             response_text: LLM原始响应文本
             check_name: 检查项名称
+            chapter_code: 章节代码
 
         Returns:
             List[Dict]: 标准格式的审查结果列表
@@ -79,9 +80,9 @@ class StandardizedResponseProcessor:
 
             if json_data and isinstance(json_data, list):
                 for item in json_data:
-                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name))
+                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name, chapter_code))
             elif json_data and isinstance(json_data, dict):
-                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name))
+                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name, chapter_code))
 
             return parsed_result
 
@@ -90,6 +91,7 @@ class StandardizedResponseProcessor:
             # 返回一个错误条目
             return [{
                 "check_item": check_name,
+                "chapter_code": chapter_code or "basis",
                 "check_result": {"error": str(e)},
                 "exist_issue": True,
                 "risk_info": {"risk_level": "medium"}
@@ -188,7 +190,7 @@ class BasisReviewService:
                 print(llm_out)
 
                 # 使用标准化处理器处理响应
-                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check")
+                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check", "basis")
 
                 # 统计问题数量
                 issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
@@ -200,6 +202,7 @@ class BasisReviewService:
                 logger.error(f" 批次处理失败: {e}")
                 return [{
                     "check_item": "reference_check",
+                    "chapter_code": "basis",
                     "check_result": {"error": str(e), "basis_items": basis_items},
                     "exist_issue": True,
                     "risk_info": {"risk_level": "high"}
@@ -345,6 +348,7 @@ class BasisReviewService:
                 error_batch = batches[i] if i < len(batches) else []
                 error_result = [{
                     "check_item": "reference_check",
+                    "chapter_code": "basis",
                     "check_result": {"error": str(result), "basis_items": error_batch},
                     "exist_issue": True,
                     "risk_info": {"risk_level": "high"}

+ 10 - 6
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -57,13 +57,14 @@ class StandardizedResponseProcessor:
         else:
             self.inter_tool = None
 
-    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查") -> List[Dict[str, Any]]:
+    def process_llm_response(self, response_text: str, check_name: str = "编制依据检查", chapter_code: str = "") -> List[Dict[str, Any]]:
         """
         处理LLM响应,返回标准格式
 
         Args:
             response_text: LLM原始响应文本(JSON字符串)
             check_name: 检查项名称
+            chapter_code: 章节代码
 
         Returns:
             List[Dict]: 标准格式的审查结果列表
@@ -83,9 +84,9 @@ class StandardizedResponseProcessor:
 
             if json_data and isinstance(json_data, list):
                 for item in json_data:
-                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name))
+                    parsed_result.append(self.inter_tool._create_issue_item(item, check_name, chapter_code))
             elif json_data and isinstance(json_data, dict):
-                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name))
+                parsed_result.append(self.inter_tool._create_issue_item(json_data, check_name, chapter_code))
 
             return parsed_result
 
@@ -94,6 +95,7 @@ class StandardizedResponseProcessor:
             # 返回一个错误条目
             return [{
                 "check_item": check_name,
+                "chapter_code":"basis",
                 "check_result": {"error": str(e)},
                 "exist_issue": True,
                 "risk_info": {"risk_level": "medium"}
@@ -301,7 +303,7 @@ class BasisReviewService:
                 # llm_out = await review_reference_timeliness(reference_text=grouped_candidates, review_text=basis_items)
                 
                 
-                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check")
+                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis")
                 print("标准化处理器处理响应:\n")
                 print(standardized_result)
                 # 统计问题数量
@@ -313,7 +315,8 @@ class BasisReviewService:
             except Exception as e:
                 logger.error(f" 批次处理失败1: {e}")
                 return [{
-                    "check_item": "reference_check",
+                    "check_item": "timeliness_check",
+                    "chapter_code": "basis",
                     "check_result": {"error": str(e), "basis_items": basis_items},
                     "exist_issue": True,
                     "risk_info": {"risk_level": "high"}
@@ -460,7 +463,8 @@ class BasisReviewService:
                 logger.error(f" 批次 {i} 返回异常: {result}")
                 error_batch = batches[i] if i < len(batches) else []
                 error_result = [{
-                    "check_item": "reference_check",
+                    "check_item": "timeliness_check",
+                    "chapter_code": "basis",
                     "check_result": {"error": str(result), "basis_items": error_batch},
                     "exist_issue": True,
                     "risk_info": {"risk_level": "high"}

+ 32 - 30
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -125,7 +125,7 @@ class InterTool:
             return {}
 
     def _format_review_results_to_issues(self, callback_task_id: str, unit_index: int, review_location_label: str,
-                                        unit_content: Dict[str, Any], basic_result: Dict[str, Any],
+                                        chapter_code:str,unit_content: Dict[str, Any], basic_result: Dict[str, Any],
                                         technical_result: Dict[str, Any]) -> List[Dict[str, Any]]:
         """
         将审查结果格式化为issues结构
@@ -134,6 +134,7 @@ class InterTool:
             callback_task_id: 回调任务ID,用于生成唯一issue_id
             unit_index: 单元索引,用于生成唯一issue_id
             review_location_label: 审查位置标签,如"第3页:第一章"
+            chapter_code: 章节编码
             unit_content: 单元内容,包含原始文本等信息
             basic_result: 基础合规性审查结果,包含各项检查结果
             technical_result: 技术性审查结果,包含技术标准检查结果
@@ -149,12 +150,9 @@ class InterTool:
             自动跳过overall_score字段,提取所有检查项的详细结果
             支持风险等级统计和最高风险等级确定
         """
-        BASIC_CHECK_ITEMS = [
-            "词句语法检查",
-            "语义逻辑检查",
-            "完整性检查",
-            "sensitive_check"
-
+        TRCH_CHECK_ITEMS = [
+            "non_parameter_compliance_check",
+            "parameter_compliance_check",
         ]
         issues = []
         review_lists = []
@@ -164,17 +162,16 @@ class InterTool:
         # 合并所有审查结果
         all_results = {}
         if basic_result:
-            logger.info(f"🔍 [DEBUG] basic_result 类型: {type(basic_result)}, 键: {list(basic_result.keys()) if isinstance(basic_result, dict) else 'N/A'}")
+            logger.debug(f"🔍 [DEBUG] basic_result 类型: {type(basic_result)}, 键: {list(basic_result.keys()) if isinstance(basic_result, dict) else 'N/A'}")
             all_results.update(basic_result)
 
-        logger.info(f"basic_result:{basic_result}")
         if technical_result:
-            logger.info(f"🔍 [DEBUG] technical_result 类型: {type(technical_result)}, 键: {list(technical_result.keys()) if isinstance(technical_result, dict) else 'N/A'}")
+            logger.debug(f"🔍 [DEBUG] technical_result 类型: {type(technical_result)}, 键: {list(technical_result.keys()) if isinstance(technical_result, dict) else 'N/A'}")
 
             # 检查是否是 entity_based 模式
             if technical_result.get('review_mode') == 'entity_based' and 'entity_review_results' in technical_result:
                 # entity_based 模式:从 entity_review_results 中提取实际审查结果
-                logger.info(f"🔍 [DEBUG] 检测到 entity_based 模式,从 entity_review_results 提取审查结果")
+                logger.debug(f"🔍 [DEBUG] 检测到 entity_based 模式,从 entity_review_results 提取审查结果")
                 entity_review_results = technical_result.get('entity_review_results', [])
                 total_entities = technical_result.get('total_entities', 0)
 
@@ -186,15 +183,15 @@ class InterTool:
                     if 'non_parameter_compliance' in entity_item:
                         result_key = f'non_parameter_compliance_{entity_info}'
                         all_results[result_key] = entity_item['non_parameter_compliance']
-                        logger.info(f"🔍 [DEBUG] 提取审查结果: {result_key}")
+                        logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}")
 
                     # 提取参数性审查结果
                     if 'parameter_compliance' in entity_item:
                         result_key = f'parameter_compliance_{entity_info}'
                         all_results[result_key] = entity_item['parameter_compliance']
-                        logger.info(f"🔍 [DEBUG] 提取审查结果: {result_key}")
+                        logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}")
 
-                logger.info(f"🔍 [DEBUG] entity_based 模式处理完成,共提取 {len(entity_review_results)} 个实体的审查结果")
+                logger.debug(f"🔍 [DEBUG] entity_based 模式处理完成,共提取 {len(entity_review_results)} 个实体的审查结果")
 
             else:
                 # general 模式:过滤掉元数据字段,保留实际审查结果
@@ -204,18 +201,18 @@ class InterTool:
                     if key not in metadata_keys:
                         filtered_technical[key] = value
                     else:
-                        logger.info(f"跳过技术审查元数据字段: {key} = {value} (类型: {type(value).__name__})")
+                        logger.debug(f"跳过技术审查元数据字段: {key} = {value} (类型: {type(value).__name__})")
 
-                logger.info(f"🔍 [DEBUG] 过滤后的 technical_result 键: {list(filtered_technical.keys())}")
+                logger.debug(f"🔍 [DEBUG] 过滤后的 technical_result 键: {list(filtered_technical.keys())}")
                 all_results.update(filtered_technical)
 
-        logger.info(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
+        logger.debug(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
 
         for check_key, check_result in all_results.items():
-            logger.info(f"处理检查项: {check_key}, 结果类型: {type(check_result)}")
+            logger.debug(f"处理检查项: {check_key}, 结果类型: {type(check_result)}")
 
             if check_key == 'overall_score':  # 跳过分数字段
-                logger.info(f"跳过分数字段: {check_key}")
+                logger.debug(f"跳过分数字段: {check_key}")
                 continue
 
             # 🔧 类型安全检查:确保 check_result 是字典类型
@@ -234,18 +231,18 @@ class InterTool:
                 check_name = check_result["details"].get("name")
                 reference_source = check_result["details"].get("rag_reference_source")
                 review_references = check_result["details"].get("rag_review_references")
-                logger.info(f"解析检查项 {check_name} 的响应,长度: {len(response)}")
+                logger.debug(f"解析检查项 {check_name} 的响应,长度: {len(response)}")
 
-                logger.info(f"检查项测试 {check_name}")
+                logger.debug(f"检查项测试 {check_name}")
 
                 # 类型安全检查:确保 check_name 是字符串
                 if not isinstance(check_name, str):
                     logger.warning(f"check_name 类型异常: {type(check_name).__name__}, 值: {check_name}, 将跳过此检查项")
                     continue
 
-                if check_name in BASIC_CHECK_ITEMS:
-                    logger.info(f"检查项 {check_name} 无参考来源,直接解析响应")
-                    check_issues = self.parse_ai_review_response(response, check_name)
+                if check_name  not in TRCH_CHECK_ITEMS:
+                    logger.debug(f"检查项 {check_name} 无参考来源,直接解析响应")
+                    check_issues = self.parse_ai_review_response(response, check_name,chapter_code)
 
                     # 将解析后的结果添加到review_lists中
                     for check_issue in check_issues:
@@ -265,8 +262,8 @@ class InterTool:
                             review_lists.append(check_issue)
 
                 else:
-                    logger.info(f"检查项 {check_name} 存在参考来源,开始处理参考来源")
-                    check_issues = self.parse_ai_review_response(response, check_name)
+                    logger.debug(f"检查项 {check_name} 存在参考来源,开始处理参考来源")
+                    check_issues = self.parse_ai_review_response(response, check_name,chapter_code)
                     processed_issues = []
 
                     for idx, item in enumerate(check_issues):
@@ -364,13 +361,14 @@ class InterTool:
 
         return issues
 
-    def parse_ai_review_response(self, response: str, check_name: str) -> List[Dict[str, Any]]:
+    def parse_ai_review_response(self, response: str, check_name: str,chapter_code:str) -> List[Dict[str, Any]]:
         """
         解析AI审查的JSON格式响应
 
         Args:
             response: AI审查响应内容
             check_name: 检查项名称(如"词句语法检查")
+            chapter_code: 章节编码
 
         Returns:
             List[Dict]: 解析后的审查结果列表,包含check_item、check_result、exist_issue、risk_info等字段
@@ -386,6 +384,7 @@ class InterTool:
             if any(keyword in response for keyword in ["无明显问题", "无问题", "符合要求","无风险"]):
                 return [{
                     "check_item": check_name,
+                    "chapter_code": chapter_code,
                     "check_result": "无明显问题",
                     "exist_issue": False,
                     "risk_info": {"risk_level": "low"}
@@ -397,16 +396,17 @@ class InterTool:
                 # 处理数组格式 - 保存调试数据
                 if isinstance(json_data, list):
                     for issue_data in json_data:
-                        review_lists.append(self._create_issue_item(issue_data, check_name))
+                        review_lists.append(self._create_issue_item(issue_data, check_name, chapter_code))
                 # 处理对象格式
                 elif isinstance(json_data, dict):
-                    review_lists.append(self._create_issue_item(json_data, check_name))
+                    review_lists.append(self._create_issue_item(json_data, check_name, chapter_code))
 
             # 3. 如果JSON解析失败,回退到文本解析
             if not review_lists:
                 risk_level = self._determine_risk_level(response)
                 review_lists.append({
                     "check_item": check_name,
+                    "chapter_code": chapter_code,
                     "check_result": response,
                     "exist_issue": True,
                     "risk_info": {"risk_level": risk_level}
@@ -416,6 +416,7 @@ class InterTool:
             logger.error(f"解析AI审查响应失败: {str(e)}")
             review_lists.append({
                 "check_item": check_name,
+                "chapter_code": chapter_code,
                 "check_result": response,
                 "exist_issue": True,
                 "risk_info": {"risk_level": "low"}
@@ -471,7 +472,7 @@ class InterTool:
 
         return None
 
-    def _create_issue_item(self, issue_data: dict, check_name: str) -> Dict[str, Any]:
+    def _create_issue_item(self, issue_data: dict, check_name: str, chapter_code: str = "") -> Dict[str, Any]:
         """创建单个审查问题项"""
         risk_level = self._determine_risk_level(issue_data.get("risk_level", ""))
 
@@ -481,6 +482,7 @@ class InterTool:
 
         return {
             "check_item": check_name,
+            "chapter_code": chapter_code,
             "check_result": issue_data,
             "exist_issue": exist_issue,
             "risk_info": {"risk_level": risk_level}

+ 9 - 3
core/construction_review/workflows/ai_review_workflow.py

@@ -146,16 +146,18 @@ class AIReviewWorkflow:
             StateGraph: 配置完成的LangGraph工作流图实例
 
         Note:
-            创建包含开始、初始化进度、AI审查、完成、错误处理和终止节点的完整工作流
+            创建包含开始、初始化进度、AI审查、保存结果、完成、错误处理和终止节点的完整工作流
             设置节点间的转换关系和条件边,支持错误处理流程和任务终止流程
+            工作流路径: start → initialize_progress → ai_review → save_results → complete → END
         """
         workflow = StateGraph(AIReviewState)
         workflow.add_node("start", self._start_node)
         workflow.add_node("initialize_progress", self._initialize_progress_node)
         workflow.add_node("ai_review", self._ai_review_node)
+        workflow.add_node("save_results", self._save_results_node)  # 添加保存结果节点
         workflow.add_node("complete", self._complete_node)
         workflow.add_node("error_handler", self._error_handler_node)
-        workflow.add_node("terminate", self._terminate_node)  # 新增终止节点
+        workflow.add_node("terminate", self._terminate_node)  # 终止节点
 
         workflow.set_entry_point("start")
         workflow.add_edge("start", "initialize_progress")
@@ -167,11 +169,13 @@ class AIReviewWorkflow:
             self._should_terminate_or_error,
             {
                 "terminate": "terminate",  # 终止路径
-                "success": "complete",  # 成功后直接完成
+                "success": "save_results",  # 成功后先保存结果
                 "error": "error_handler"  # 错误处理
             }
         )
 
+        # 添加保存结果到完成的边
+        workflow.add_edge("save_results", "complete")
         workflow.add_edge("complete", END)
         workflow.add_edge("error_handler", END)
         workflow.add_edge("terminate", END)
@@ -804,12 +808,14 @@ class AIReviewCoreFun:
                     # 审查完成后立即推送通知
                     if result.overall_risk != "error":
                         section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
+                        chapter_code = unit_content.get('chapter_classification', '')
                         logger.info(f"section_label:  {section_label}")
                         # 格式化issues以获取问题数量
                         issues = self.inter_tool._format_review_results_to_issues(
                             state["callback_task_id"],
                             unit_index,
                             f"第{unit_content.get('page', '')}页:{section_label}",
+                            chapter_code,
                             unit_content,
                             result.basic_compliance,
                             result.technical_compliance

+ 1 - 1
foundation/observability/logger/loggering.py

@@ -31,7 +31,7 @@ class CompatibleLogger(logging.Logger):
                  log_format=None, datefmt=None):
         # 初始化父类
         super().__init__(name)
-        self.setLevel(logging.INFO)  # 设置logger自身为最低级别
+        self.setLevel(logging.DEBUG)  # 设置logger自身为最低级别
 
         # 存储配置
         self.log_dir = log_dir

+ 8 - 8
utils_test/RAG_Test/rag_pipeline_web/rag_pipeline_server.py

@@ -110,11 +110,11 @@ def rag_enhanced_check(query_content: str) -> dict:
         }
     }
 
-    # 🔍 保存关键节点结果(用于对比分析)
-    os.makedirs(os.path.join(project_root, "temp", "rag_pipeline_server"), exist_ok=True)
-    with open(os.path.join(project_root, "temp", "rag_pipeline_server", "bfp_result_lists.json"), "w", encoding='utf-8') as f:
-        json.dump(bfp_result_lists, f, ensure_ascii=False, indent=4)
-    logger.info("[RAG增强] ✅ 已保存 bfp_result_lists 到 temp/rag_pipeline_server/bfp_result_lists.json")
+    # # 🔍 保存关键节点结果(用于对比分析)
+    # os.makedirs(os.path.join(project_root, "temp", "rag_pipeline_server"), exist_ok=True)
+    # with open(os.path.join(project_root, "temp", "rag_pipeline_server", "bfp_result_lists.json"), "w", encoding='utf-8') as f:
+    #     json.dump(bfp_result_lists, f, ensure_ascii=False, indent=4)
+    # logger.info("[RAG增强] ✅ 已保存 bfp_result_lists 到 temp/rag_pipeline_server/bfp_result_lists.json")
 
     # 检查检索结果
     if not bfp_result_lists:
@@ -138,7 +138,7 @@ def rag_enhanced_check(query_content: str) -> dict:
         enhancement_result = enhance_with_parent_docs_grouped(
             milvus_manager,
             bfp_result_lists,
-            score_threshold=0.5,  # bfp_rerank_score 阈值
+            score_threshold=0.3,  # bfp_rerank_score 阈值
             max_parents_per_pair=3  # 每个查询对最多3个父文档
         )
         enhanced_results = enhancement_result['enhanced_results']
@@ -148,8 +148,8 @@ def rag_enhanced_check(query_content: str) -> dict:
         total_pairs = enhancement_result.get('total_pairs', 0)
 
         # 保存增强后的结果
-        with open(os.path.join(project_root, "temp", "rag_pipeline_server", "enhance_with_parent_docs_grouped.json"), "w", encoding='utf-8') as f:
-            json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
+        # with open(os.path.join(project_root, "temp", "rag_pipeline_server", "enhance_with_parent_docs_grouped.json"), "w", encoding='utf-8') as f:
+        #     json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
 
         logger.info(f"[RAG增强] 分组增强完成: {enhanced_pairs}/{total_pairs} 个查询对进行了增强")
         logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果,使用了 {len(parent_docs)} 个父文档")

+ 0 - 126
views/construction_review/review_results.py

@@ -30,132 +30,6 @@ class ReviewResultsResponse(BaseModel):
 
 
 review_results_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
-def generate_risk_stats():
-    """生成模拟风险统计"""
-    return {
-        "high": random.randint(1, 5),
-        "medium": random.randint(3, 8),
-        "low": random.randint(2, 6)
-    }
-
-def generate_dimension_scores():
-    """生成模拟四维评分"""
-    return {
-        "safety": random.randint(60, 95),
-        "quality": random.randint(55, 90),
-        "schedule": random.randint(70, 95),
-        "cost": random.randint(65, 90)
-    }
-
-def generate_summary_report(risk_stats):
-    """生成模拟总结报告"""
-    total_issues = sum(risk_stats.values())
-    if risk_stats["high"] > 0:
-        return f"该施工方案存在{risk_stats['high']}处高风险问题,需重点整改。建议在施工前完善相关技术细节,确保符合规范要求。"
-    elif total_issues > 5:
-        return f"该施工方案整体符合规范要求,但存在{total_issues}处中低风险问题,建议优化完善。"
-    else:
-        return "该施工方案整体符合规范要求,存在少量细节问题,可正常施工。"
-
-def generate_issues():
-    """生成模拟问题条文"""
-    issues = []
-
-    # 高风险问题示例
-    high_risk_issues = [
-        {
-            "page": 12,
-            "chapter": "1.1 路面材料要求",
-            "original_content": "采用沥青、混凝土作为路面施工材料,未明确标号及来源;施工段落仅标注主线段,未细化具体桩号范围"
-        },
-        {
-            "page": 45,
-            "chapter": "3.2 模板安装工艺",
-            "original_content": "模板未按设计要求进行预压,直接浇筑混凝土;预压观测记录采用文字描述,未体现观测点布置及沉降数据"
-        }
-    ]
-
-    # 中风险问题示例
-    medium_risk_issues = [
-        {
-            "page": 28,
-            "chapter": "2.3 施工机械配置",
-            "original_content": "施工机械清单未包含备用设备,未制定设备故障应急预案"
-        },
-        {
-            "page": 67,
-            "chapter": "4.1 质量保证措施",
-            "original_content": "质量检测频次未明确具体标准,检验方法描述不够详细"
-        }
-    ]
-
-    # 生成高风险问题
-    for i, issue_data in enumerate(high_risk_issues):
-        issue_id = f"ISSUE-HL-{datetime.now().strftime('%Y%m%d')}-{i+1:03d}"
-
-        reviews = [
-            {
-                "check_item": "强制性标准符合性检查",
-                "check_result": "不符合",
-                "risk_info": {"risk_level": "high"},
-                "suggestion": {
-                    "suggestion_type": "professional",
-                    "suggestion_content": "按相关规范要求,明确材料规格和施工参数,确保符合技术标准要求",
-                    "verification_standard": "整改后需提供技术规格书,由项目总工签字确认"
-                }
-            },
-            {
-                "check_item": "条文完整性检查",
-                "check_result": "不符合",
-                "risk_info": {"risk_level": "low"},
-                "suggestion": {
-                    "suggestion_type": "completeness",
-                    "suggestion_content": "补充详细的施工范围描述,与施工平面布置图桩号标注一致",
-                    "verification_standard": "参考施工方案编制导则相关条款"
-                }
-            }
-        ]
-
-        issues.append({
-            "issue_id": issue_id,
-            "metadata": issue_data,
-            "risk_summary": {
-                "max_risk_level": "high",
-                "risk_count": {"high": 1, "medium": 1, "low": 1},
-                "key_risk_reminder": "高风险点:技术参数缺失,需24小时内整改"
-            },
-            "review_lists": reviews
-        })
-
-    # 生成中风险问题
-    for i, issue_data in enumerate(medium_risk_issues):
-        issue_id = f"ISSUE-ML-{datetime.now().strftime('%Y%m%d')}-{i+1:03d}"
-
-        reviews = [
-            {
-                "check_item": "规范性检查",
-                "check_result": "不符合",
-                "risk_info": {"risk_level": "medium"},
-                "suggestion": {
-                    "suggestion_type": "normative",
-                    "suggestion_content": "完善应急预案制定,明确设备故障处理流程和备用资源配置",
-                    "verification_standard": "参考《施工组织设计规范》相关要求"
-                }
-            }
-        ]
-
-        issues.append({
-            "issue_id": issue_id,
-            "metadata": issue_data,
-            "risk_summary": {
-                "max_risk_level": "medium",
-                "risk_count": {"high": 0, "medium": 1, "low": 0},
-                "key_risk_reminder": "中风险点:管理措施不完善,需在施工前完善"
-            },
-            "review_lists": reviews
-        })
-
-    return issues
 
 @review_results_router.get("/review_results", response_model=ReviewResultsResponse)
 async def review_results(