Просмотр исходного кода

v0.0.4-功能优化
- 适配 prep_basis_reviewer.py 响应字段
- 调整审查顺序,将大纲审查、编制依据审查放在核心审查之后

WangXuMing 3 недель назад
Родитель
Commit
9ee37877f7

BIN
build_graph_app.png


+ 1 - 1
core/construction_review/component/doc_worker/config/config.yaml

@@ -7,7 +7,7 @@ text_splitting:
   # 最大分块字符数
   max_chunk_size: 3000
   # 最小分块字符数
-  min_chunk_size: 100
+  min_chunk_size: 50
   # 模糊匹配阈值(0-1)
   fuzzy_threshold: 0.80
 

+ 17 - 126
core/construction_review/component/reviewers/prep_basis_reviewer.py

@@ -6,21 +6,7 @@ import time
 from typing import Any, Dict, List, Optional
 import asyncio
 
-# 获取当前脚本的绝对路径
-current_file = os.path.abspath(__file__)
-# 获取脚本所在目录(reviewers)
-current_dir = os.path.dirname(current_file)
-# 获取上一级目录(component)
-parent_dir_1 = os.path.dirname(current_dir)
-# 获取上二级目录(construction_review)
-parent_dir_2 = os.path.dirname(parent_dir_1)
-# 获取上三级目录(core)
-parent_dir_3 = os.path.dirname(parent_dir_2)
-# 获取项目根目录(LQAgentPlatform,即 foundation 所在的目录)
-root_dir = os.path.dirname(parent_dir_3)
-
-# 将项目根目录添加到 sys.path - 必须在导入之前
-sys.path.append(root_dir)
+
 
 # 导入必要的依赖
 try:
@@ -50,64 +36,7 @@ class TextProcessor:
         pattern = re.compile(r'《[^》]+》(?:([^)]+))?')
         return pattern.findall(text)
 
-    @staticmethod
-    def extract_json_array(text: str) -> List[Dict[str, Any]]:
-        """提取JSON数组"""
-        if not text:
-            return []
-
-        # 检查是否是"无明显问题"
-        if "无明显问题" in text or "无明显" in text:
-            return []
-
-        try:
-            obj = json.loads(text.strip())
-            if isinstance(obj, list):
-                return [x for x in obj if isinstance(x, dict)]
-        except Exception:
-            pass
-
-        matches = re.findall(r"\[[\s\S]*?\]", text, flags=re.S)
-        for s in reversed(matches):
-            try:
-                obj = json.loads(s)
-                if isinstance(obj, list):
-                    return [x for x in obj if isinstance(x, dict)]
-            except Exception:
-                continue
-        return []
-
-
-class ResultFormatter:
-    """结果格式化工具类"""
-
-    @staticmethod
-    def normalize_item(obj: Dict[str, Any]) -> Dict[str, Any]:
-        """标准化单个结果项"""
-        name = obj.get("name", "")
-        is_standard = obj.get("is_standard", False)
-        status = obj.get("status", "")
-        meg = obj.get("meg", "")  # 注意:这里应该是meg字段
-
-        if isinstance(is_standard, str):
-            is_standard = is_standard.strip().lower() in ("true", "1", "yes")
-        else:
-            is_standard = bool(is_standard)
-
-        return {
-            "name": str(name or "").strip(),
-            "is_standard": is_standard,
-            "status": str(status or "").strip(),
-            "meg": str(meg or "").strip(),
-        }
 
-    @staticmethod
-    def normalize_output(arr: List[Dict[str, Any]], expected_len: int) -> List[Dict[str, Any]]:
-        """标准化输出结果"""
-        out = [ResultFormatter.normalize_item(x) for x in arr[:expected_len]]
-        while len(out) < expected_len:
-            out.append({"name": "", "is_standard": False, "status": "", "meg": ""})
-        return out
 
 
 class StandardizedResponseProcessor:
@@ -425,8 +354,7 @@ class BasisReviewService:
         self.search_engine = BasisSearchEngine()
         self.llm_client = LLMReviewClient()
         self.text_processor = TextProcessor()
-        self.result_formatter = ResultFormatter()
-        self.response_processor = StandardizedResponseProcessor()  # 新增标准化处理器
+        self.response_processor = StandardizedResponseProcessor()  # 标准化处理器
         # 确保使用最新的prompt_loader实例
         from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
         fresh_prompt_loader = PromptLoader()
@@ -484,50 +412,8 @@ class BasisReviewService:
                 trace_id = f"prep_basis_batch_{int(time.time())}"
                 llm_out = await self.llm_client.review_basis(prompt_template, user_content, trace_id)
 
-                # 处理结果
-                arr = self.text_processor.extract_json_array(llm_out)
-                result = self.result_formatter.normalize_output(arr, expected_len=len(basis_items))
-
-                # 回填名称
-                for i, name in enumerate(basis_items):
-                    result[i]["name"] = name
-
-                return result
-
-            except Exception as e:
-                logger.error(f" 批次处理失败: {e}")
-                return [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
-                        for name in basis_items]
-
-    async def review_batch_standardized(self,
-        basis_items: List[str],
-        collection_name: str = "already_basis",
-        filters: Optional[Dict[str, Any]] = None,
-        min_score: float = 0.3,
-        top_k_each: int = 3,
-    ) -> List[Dict[str, Any]]:
-        """
-        异步批次审查(返回outline_reviewer.py标准格式)
-
-        Returns:
-            List[Dict]: 标准格式审查结果,包含check_item, check_result, exist_issue, risk_info
-        """
-        basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
-        if not basis_items:
-            return []
-
-        async with self._semaphore:
-            try:
-                # 构建提示词模板和用户内容(不使用搜索结果,简化处理)
-                prompt_template = self.message_builder.get_prompt_template()
-                combined_content = "\n".join([f"{i+1}. {item}" for i, item in enumerate(basis_items)])
-
-                user_content = f"请审查以下编制依据:\n{combined_content}"
-                trace_id = f"prep_basis_standardized_{int(time.time())}"
-                llm_out = await self.llm_client.review_basis(prompt_template, user_content, trace_id)
-
                 # 使用标准化处理器处理响应
-                standardized_result = self.response_processor.process_llm_response(llm_out, "编制依据规范性检查")
+                standardized_result = self.response_processor.process_llm_response(llm_out, "reference_check")
 
                 # 统计问题数量
                 issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
@@ -536,14 +422,15 @@ class BasisReviewService:
                 return standardized_result
 
             except Exception as e:
-                logger.error(f" 标准化批次处理失败: {e}")
+                logger.error(f" 批次处理失败: {e}")
                 return [{
-                    "check_item": "编制依据规范性检查",
+                    "check_item": "reference_check",
                     "check_result": {"error": str(e), "basis_items": basis_items},
                     "exist_issue": True,
                     "risk_info": {"risk_level": "high"}
                 }]
 
+    
     async def _async_search_basis(
         self,
         basis: str,
@@ -673,7 +560,7 @@ class BasisReviewService:
 
         # 处理异常结果并统计
         total_items = 0
-        standard_items = 0
+        issue_items = 0
         successful_batches = 0
 
         # 重新构建结果列表,过滤异常
@@ -682,8 +569,12 @@ class BasisReviewService:
             if isinstance(result, Exception):
                 logger.error(f" 批次 {i} 返回异常: {result}")
                 error_batch = batches[i] if i < len(batches) else []
-                error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次异常: {str(result)}"}
-                                for name in error_batch]
+                error_result = [{
+                    "check_item": "reference_check",
+                    "check_result": {"error": str(result), "basis_items": error_batch},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "high"}
+                }]
                 final_results.append(error_result)
             else:
                 final_results.append(result)
@@ -693,8 +584,8 @@ class BasisReviewService:
         for result in final_results:
             for item in result:
                 total_items += 1
-                if isinstance(item, dict) and item.get('is_standard', False):
-                    standard_items += 1
+                if isinstance(item, dict) and item.get('exist_issue', False):
+                    issue_items += 1
 
         logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
 
@@ -708,7 +599,7 @@ class BasisReviewService:
                     stage_name="编制依据审查",
                     current=15,
                     status="processing",
-                    message=f"编制依据审查完成,共{total_items}项,其中{standard_items}项为标准,耗时{elapsed_time:.2f}秒",
+                    message=f"编制依据审查完成,共{total_items}项,发现问题{issue_items}项,耗时{elapsed_time:.2f}秒",
                     overall_task_status="processing",
                     event_type="processing"
                 )
@@ -716,7 +607,7 @@ class BasisReviewService:
                 logger.error(f"SSE推送完成消息失败: {e}")
 
         logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
-        logger.info(f" 总编制依据: {total_items}, 标准项: {standard_items}, 成功批次: {successful_batches}/{total_batches}")
+        logger.info(f" 总编制依据: {total_items}, 问题项: {issue_items}, 成功批次: {successful_batches}/{total_batches}")
         return final_results
 
 

+ 8 - 6
core/construction_review/workflows/ai_review_workflow.py

@@ -282,13 +282,19 @@ class AIReviewWorkflow:
                     "messages": [AIMessage(content=f"没有可审查的单元,任务ID: {state['callback_task_id']}")]
                 }
 
-            # 2. 发送开始审查进度
+
+
+            # 2. 执行基础并发审查
+            logger.info(f"开始执行并发审查,任务ID: {state['callback_task_id']}")
+            successful_results = await self.core_fun._execute_concurrent_reviews(review_chunks, total_units, state)
+            logger.info(f"并发审查完成,成功结果: {len(successful_results)}, 任务ID: {state['callback_task_id']}")
+
             await self.core_fun._send_start_review_progress(state, total_units)
             completeness_check = "completeness_check" in  self.task_info.get_review_config_list()
             if not completeness_check:
                 logger.info(f"跳过执行大纲审查")
             else:
-                # 3. 执行大纲审查
+            # 3. 执行大纲审查
                 logger.info(f"开始执行大纲审查")
                 outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
                                                     state, state.get("stage_name", "大纲审查"))
@@ -323,10 +329,6 @@ class AIReviewWorkflow:
                 else:
                     logger.warning(f"未找到编制依据内容,跳过编制依据审查")
 
-            # 5. 执行并发审查
-            logger.info(f"开始执行并发审查,任务ID: {state['callback_task_id']}")
-            successful_results = await self.core_fun._execute_concurrent_reviews(review_chunks, total_units, state)
-            logger.info(f"并发审查完成,成功结果: {len(successful_results)}, 任务ID: {state['callback_task_id']}")
 
             # 6. 汇总结果
             summary = self.inter_tool._aggregate_results(successful_results)