浏览代码

v0.0.4-debug-功能优化
- 优化模型超时实际为120s
- 降低并发审查块为2
- 增加审查项配置功能

WangXuMing 2 月之前
父节点
当前提交
7861584a47

+ 82 - 25
core/construction_review/component/ai_review_engine.py

@@ -271,28 +271,50 @@ class AIReviewEngine(BaseReviewer):
         async def check_with_semaphore(check_func, **kwargs):
             async with self.semaphore:
                 return await check_func(**kwargs)
+            
+        basic_tasks = []
 
-        basic_tasks = [
-            check_with_semaphore(self.check_grammar, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
-            check_with_semaphore(self.check_semantic_logic, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
-            #check_with_semaphore(self.check_completeness, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name)
-        ]
+        if 'sensitive_word_check'  in self.task_info.get_review_config_list():
+            basic_tasks.append(
+                check_with_semaphore(self.check_grammar, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
 
+            )
+        if 'semantic_logic_check' in self.task_info.get_review_config_list():
+            basic_tasks.append(
+                check_with_semaphore(self.check_semantic_logic, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
 
-        #grammar_result, semantic_result, completeness_result = await asyncio.gather(*basic_tasks, return_exceptions=True)
-        grammar_result, semantic_result = await asyncio.gather(*basic_tasks, return_exceptions=True)
-        # 使用公共方法处理结果
-        grammar_result = self._process_review_result(grammar_result)
-        semantic_result = self._process_review_result(semantic_result)
-        #completeness_result = self._process_review_result(completeness_result)
+            )
+
+        # 一次性执行所有任务,避免重复协程调用
+        if not basic_tasks:
+            return {
+                "grammar_check": self._process_review_result(None),
+                "semantic_check": self._process_review_result(None),
+            }
+
+        # 执行任务(只执行一次)
+        results = await asyncio.gather(*basic_tasks, return_exceptions=True)
+
+        # 根据配置项分配结果
+        grammar_result = self._process_review_result(None)
+        semantic_result = self._process_review_result(None)
+
+        result_index = 0
+
+        if 'sensitive_word_check' in self.task_info.get_review_config_list():
+            if result_index < len(results):
+                grammar_result = self._process_review_result(results[result_index])
+            result_index += 1
+
+        if 'semantic_logic_check' in self.task_info.get_review_config_list():
+            if result_index < len(results):
+                semantic_result = self._process_review_result(results[result_index])
+            result_index += 1
 
         return {
             'grammar_check': grammar_result,
             'semantic_check': semantic_result,
-            #'completeness_check': completeness_result,
-            'overall_score': "None"
         }
-
     async def technical_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
                                       review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
         """
@@ -320,22 +342,57 @@ class AIReviewEngine(BaseReviewer):
             async with self.semaphore:
                 return await check_func(**kwargs)
 
-        # 并发执行技术性检查任务,使用新的非参数和参数合规性检查
-        technical_tasks = [
-            check_with_semaphore(self.check_non_parameter_compliance, trace_id_idx=trace_id_idx, review_content=review_content, review_references=review_references, reference_source=reference_source, review_location_label=review_location_label, state=state, stage_name=stage_name),
-            check_with_semaphore(self.check_parameter_compliance, trace_id_idx=trace_id_idx, review_content=review_content, review_references=review_references, reference_source=reference_source, review_location_label=review_location_label, state=state, stage_name=stage_name)
-        ]
+        # 根据配置动态创建技术性检查任务
+        technical_tasks = []
+        task_mapping = []  # 任务名称映射
+
+        if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+            task_mapping.append('non_parameter_compliance')
+            technical_tasks.append(
+                check_with_semaphore(self.check_non_parameter_compliance, trace_id_idx=trace_id_idx,
+                                   review_content=review_content, review_references=review_references,
+                                   reference_source=reference_source, review_location_label=review_location_label,
+                                   state=state, stage_name=stage_name)
+            )
+
+        if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+            task_mapping.append('parameter_compliance')
+            technical_tasks.append(
+                check_with_semaphore(self.check_parameter_compliance, trace_id_idx=trace_id_idx,
+                                   review_content=review_content, review_references=review_references,
+                                   reference_source=reference_source, review_location_label=review_location_label,
+                                   state=state, stage_name=stage_name)
+            )
+
+        # 一次性执行所有任务,避免重复协程调用
+        if not technical_tasks:
+            return {
+                "non_parameter_compliance": self._process_review_result(None),
+                "parameter_compliance": self._process_review_result(None),
+            }
+
+        # 执行任务(只执行一次)
+        results = await asyncio.gather(*technical_tasks, return_exceptions=True)
+
+        # 根据配置项分配结果
+        non_parameter_result = self._process_review_result(None)
+        parameter_result = self._process_review_result(None)
+
+        result_index = 0
 
-        non_parameter_result, parameter_result = await asyncio.gather(*technical_tasks, return_exceptions=True)
+        if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+            if result_index < len(results):
+                non_parameter_result = self._process_review_result(results[result_index])
+            result_index += 1
 
-        # 使用公共方法处理结果
-        non_parameter_result = self._process_review_result(non_parameter_result)
-        parameter_result = self._process_review_result(parameter_result)
+        if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+            if result_index < len(results):
+                parameter_result = self._process_review_result(results[result_index])
+            result_index += 1
 
         return {
             'non_parameter_compliance': non_parameter_result,
-            'parameter_compliance': parameter_result,
-            'overall_score': self._calculate_technical_score(non_parameter_result, parameter_result)
+            'parameter_compliance': parameter_result
         }
 
     def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:

+ 2 - 2
core/construction_review/workflows/ai_review_workflow.py

@@ -645,8 +645,8 @@ class AIReviewCoreFun:
         """
         
         try:
-            # 简化方案:并发执行,每个单元完成时立即推送消息
-            semaphore = asyncio.Semaphore(3)  # 允许3个并发审查
+
+            semaphore = asyncio.Semaphore(2)  # 并发审查数
 
             async def process_unit_and_notify(unit_index, unit_content):
                 """处理单个单元,完成后立即推送通知"""

+ 1 - 1
foundation/ai/agent/generate/model_generate.py

@@ -115,4 +115,4 @@ class GenerateModelClient:
             logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
             raise
 
-generate_model_client = GenerateModelClient()
+generate_model_client = GenerateModelClient(default_timeout=120, max_retries=3, backoff_factor=1.0)

+ 20 - 9
foundation/ai/agent/workflow/test_workflow_graph.py

@@ -30,8 +30,8 @@ class TestWorkflowGraph:
         self.workflow_node = TestWorkflowNode()
         self.checkpoint_saver = MemorySaver()
         self.app = self.init_workflow_graph()
-        # 将生成的图片保存到文件
-        self.write_graph()
+        # 将生成的图片保存到文件(移除自动生成,避免启动失败)
+        # self.write_graph()
 
 
 
@@ -177,15 +177,26 @@ class TestWorkflowGraph:
 
 
 
-    def write_graph(self):
+    def write_graph(self, max_retries=3, retry_delay=2.0):
         """
-            将图写入文件
+            将图写入文件(包含错误处理和重试机制)
         """
-        # 
-        graph_png = self.app.get_graph().draw_mermaid_png()
-        with open("build_graph_app.png", "wb") as f:
-            f.write(graph_png)
-        server_logger.info(f"【图工作流写入文件完成】")
+        try:
+            # 使用增加的重试设置
+            graph_png = self.app.get_graph().draw_mermaid_png(
+                max_retries=max_retries,
+                retry_delay=retry_delay
+            )
+            with open("build_graph_app.png", "wb") as f:
+                f.write(graph_png)
+            server_logger.info(f"【图工作流写入文件完成】")
+            return True
+        except Exception as e:
+            server_logger.warning(f"【图工作流生成失败】{str(e)}")
+            server_logger.info("【图工作流】可以使用其他方式生成图表,如:")
+            server_logger.info("1. 使用本地图表渲染器:draw_mermaid_png(draw_method=MermaidDrawMethod.PYPPETEER)")
+            server_logger.info("2. 检查网络连接后重试")
+            return False
 
 
 # 实例化