WangXuMing 2 месяцев назад
Родитель
Сommit
6f343c6bfe
4 измененных файлов с 312 добавлено и 305 удалено
  1. 2 1
      .gitignore
  2. 1 1
      config/config.ini
  3. 297 289
      core/construction_review/workflows/ai_review_workflow.py
  4. 12 14
      temp/AI审查结果.json

+ 2 - 1
.gitignore

@@ -65,4 +65,5 @@ todo.md
 .design
 .claude
 .R&D
-temp/
+temp/
+*.json

+ 1 - 1
config/config.ini

@@ -29,7 +29,7 @@ QWEN_API_KEY=ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 [ai_review]
 # 调试模式配置
-MAX_REVIEW_UNITS=1
+MAX_REVIEW_UNITS=3
 REVIEW_MODE=random
 # REVIEW_MODE=all/random/first
 

+ 297 - 289
core/construction_review/workflows/ai_review_workflow.py

@@ -40,8 +40,9 @@
 
 import asyncio
 import json
-import random
 import time
+import random
+import uuid
 from dataclasses import dataclass, asdict
 from typing import Optional, Callable, Dict, Any, TypedDict, Annotated, List
 from langgraph.graph import StateGraph, END
@@ -271,6 +272,172 @@ class AIReviewWorkflow:
 
         return state
     
+    async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
+        """
+        AI审查节点 
+
+        Args:
+            state: AI审查状态
+
+        Returns:
+            AIReviewState: 更新后的审查状态
+        """
+        try:
+            state["current_stage"] = "ai_review"
+
+            # 1. 准备审查单元数据
+            review_chunks, total_units, total_all_units = await self._prepare_review_units(state)
+
+            # 2. 发送开始审查进度
+            await self._send_start_review_progress(state, total_units)
+
+            # 3. 执行并发审查
+            successful_results = await self._execute_concurrent_reviews(review_chunks, total_units, state)
+
+            # 4. 汇总结果
+            summary = self._aggregate_results(successful_results)
+
+            review_results = {
+                'total_all_units': total_all_units,  # 原始总单元数
+                'total_reviewed_units': total_units,  # 实际审查的单元数
+                'successful_units': len(successful_results),
+                'failed_units': total_units - len(successful_results),
+                'review_mode': self.review_mode,
+                'max_review_units': self.max_review_units,
+                'review_results': successful_results,
+                'summary': summary
+            }
+
+            state["review_results"] = review_results
+            state["messages"].append(AIMessage(
+                content=f"AI审查完成,共处理{total_units}个单元,成功{len(successful_results)}个"
+            ))
+
+            return state
+
+        except Exception as e:
+            logger.error(f"AI审查失败: {str(e)}")
+            state["error_message"] = str(e)
+            state["messages"].append(AIMessage(content=f"AI审查失败: {str(e)}"))
+            return state
+
+    async def _complete_node(self, state: AIReviewState) -> AIReviewState:
+        """完成节点"""
+        logger.info(f"AI审查完成: {state['file_id']}")
+
+        state["current_stage"] = "complete"
+        state["status"] = "completed"
+
+        # 更新完成状态
+        if state["progress_manager"]:
+            await state["progress_manager"].update_stage_progress(
+                callback_task_id=state["callback_task_id"],
+                stage_name="AI审查",
+                current=90,
+                status="processing",
+                message="AI审查完成",
+                overall_task_status="processing",
+                event_type="processing"
+            )
+
+        state["messages"].append(AIMessage(content="AI审查工作流完成"))
+
+        return state
+
+    async def _error_handler_node(self, state: AIReviewState) -> AIReviewState:
+        """错误处理节点"""
+        logger.error(f"AI审查错误处理: {state['file_id']}, 错误: {state['error_message']}")
+
+        state["status"] = "failed"
+        state["current_stage"] = "error_handler"
+
+        # 更新错误状态
+        if state["progress_manager"]:
+            await state["progress_manager"].update_stage_progress(
+                callback_task_id=state["callback_task_id"],
+                stage_name="AI审查",
+                current=50,
+                status="failed",
+                message=f"AI审查失败: {state['error_message']}",
+                overall_task_status="failed",
+                event_type="error"
+            )
+
+        state["messages"].append(AIMessage(
+            content=f"错误处理: {state['error_message']}"
+        ))
+
+        return state
+
+    def _get_workflow_graph(self):
+        """获取工作流图(可视化输出)"""
+        grandalf_graph = self.graph.get_graph()
+        grandalf_graph.print_ascii()
+ 
+    async def _get_status(self) -> dict:
+        """获取工作流状态"""
+        if self.progress_manager:
+            return await self.progress_manager.get_progress(self.callback_task_id)
+        return {}
+
+
+
+    async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
+                                          total_units: int, state: AIReviewState) -> List[ReviewResult]:
+        """
+        执行并发审查
+
+        Args:
+            review_chunks: 审查单元列表
+            total_units: 总单元数
+            state: AI审查状态
+
+        Returns:
+            List[ReviewResult]: 审查结果列表
+        """
+        try:
+            # 实现异步并发
+            review_tasks = [
+                asyncio.create_task(self._review_single_unit(content, i, total_units, state))
+                for i, content in enumerate(review_chunks)
+            ]
+
+            # 等待所有审查完成
+            all_results = await asyncio.gather(*review_tasks)
+
+            # 处理进度更新和问题详情推送
+            for i, result in enumerate(all_results):
+                if result.overall_risk != "error":
+                    unit_content = review_chunks[i]
+                    section_label = unit_content.get('section_label', f'第{i + 1}部分')
+
+                    # 格式化issues以获取问题数量
+                    issues = self._format_review_results_to_issues(
+                        state["callback_task_id"],
+                        i,
+                        f"第{unit_content.get('page', '')}页:{section_label}",
+                        unit_content,
+                        result.basic_compliance,
+                        result.technical_compliance
+                    )
+
+                    issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
+                    current = int(((i + 1) / total_units) * 100)
+
+                    # 发送进度更新
+                    #await self._send_unit_complete_progress(state, i, total_units, section_label, issues_count)
+
+                    # 发送问题详情
+                    #await self._send_unit_review_details(state, i, total_units, section_label, issues, current)
+
+            # 过滤成功结果
+            successful_results = [result for result in all_results if result.overall_risk != "error"]
+            return successful_results
+
+        except Exception as e:
+            logger.error(f"执行并发审查失败: {str(e)}")
+            return []
+
     async def _prepare_review_units(self, state: AIReviewState) -> tuple:
         """
         准备审查单元数据
@@ -295,27 +462,6 @@ class AIReviewWorkflow:
             logger.error(f"准备审查单元失败: {str(e)}")
             raise
 
-    async def _send_start_review_progress(self, state: AIReviewState, total_units: int) -> None:
-        """
-        发送开始审查的进度更新
-
-        Args:
-            state: AI审查状态
-            total_units: 总审查单元数
-        """
-        try:
-            if state["progress_manager"]:
-                await state["progress_manager"].update_stage_progress(
-                    callback_task_id=state["callback_task_id"],
-                    stage_name="AI审查",
-                    current=0,
-                    status="processing",
-                    message=f"开始AI审查,共 {total_units} 个审查单元",
-                    event_type="processing"
-                )
-        except Exception as e:
-            logger.warning(f"发送开始进度更新失败: {str(e)}")
-
     async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
                                   total_units: int, state: AIReviewState) -> ReviewResult:
         """
@@ -365,6 +511,7 @@ class AIReviewWorkflow:
 
             # 等待所有审查完成
             review_results = await asyncio.gather(*review_tasks, return_exceptions=True)
+            await self._send_unit_review_details(state, i, total_units, section_label, issues, current)
 
             # 处理异常结果
             basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
@@ -416,45 +563,26 @@ class AIReviewWorkflow:
                 overall_risk="error"
             )
 
-    async def _send_unit_complete_progress(self, state: AIReviewState, unit_index: int,
-                                           total_units: int, section_label: str,
-                                           issues_count: int) -> None:
+    async def _send_start_review_progress(self, state: AIReviewState, total_units: int) -> None:
         """
-        发送单元完成进度更新
+        发送开始审查的进度更新
 
         Args:
             state: AI审查状态
-            unit_index: 单元索引
-            total_units: 总单元数
-            section_label: 章节标签
-            issues_count: 问题数量
+            total_units: 总审查单元数
         """
         try:
-            current = int(((unit_index + 1) / total_units) * 100)
-
-            # 构建完成消息
-            if issues_count > 0:
-                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
-            else:
-                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
-
-            logger.info(f"单元审查完成,更新进度: {current}% {message}")
-
-            # 发送processing_flag事件
             if state["progress_manager"]:
-                logger.info(f"发送processing_flag事件: current={current}, message={message}")
                 await state["progress_manager"].update_stage_progress(
                     callback_task_id=state["callback_task_id"],
                     stage_name="AI审查",
-                    current=current,
+                    current=0,
                     status="processing",
-                    message=message,
-                    user_id=state.get("user_id", ""),
-                    overall_task_status="processing",
-                    event_type="processing_flag"
+                    message=f"开始AI审查,共 {total_units} 个审查单元",
+                    event_type="processing"
                 )
         except Exception as e:
-            logger.warning(f"发送单元完成进度更新失败: {str(e)}")
+            logger.warning(f"发送开始进度更新失败: {str(e)}")
 
     async def _send_unit_review_details(self, state: AIReviewState, unit_index: int,
                                         total_units: int, section_label: str,
@@ -471,9 +599,14 @@ class AIReviewWorkflow:
             current: 当前进度
         """
         try:
-            if issues and state["progress_manager"]:
+            if isinstance(issues, list) and issues and state["progress_manager"]:
                 stage_name = f"AI审查:{section_label}"
-                issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
+                issues_count = sum(
+                    len(issue.get(issue_id, {}).get("review_lists", []))
+                    for issue in issues
+                    for issue_id in issue.keys()
+                )
+                await self._send_unit_complete_progress(state, unit_index, total_units, section_label, issues_count)
 
                 asyncio.create_task(
                     state["progress_manager"].update_stage_progress(
@@ -489,6 +622,7 @@ class AIReviewWorkflow:
                     )
                 )
 
+
                 # 清空当前issues
                 await state["progress_manager"].update_stage_progress(
                     callback_task_id=state["callback_task_id"],
@@ -497,243 +631,45 @@ class AIReviewWorkflow:
         except Exception as e:
             logger.warning(f"发送单元审查详情失败: {str(e)}")
 
-    async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
-                                          total_units: int, state: AIReviewState) -> List[ReviewResult]:
+    async def _send_unit_complete_progress(self, state: AIReviewState, unit_index: int,
+                                           total_units: int, section_label: str,
+                                           issues_count: int) -> None:
         """
-        执行并发审查
+        发送单元完成进度更新
 
         Args:
-            review_chunks: 审查单元列表
-            total_units: 总单元数
             state: AI审查状态
-
-        Returns:
-            List[ReviewResult]: 审查结果列表
+            unit_index: 单元索引
+            total_units: 总单元数
+            section_label: 章节标签
+            issues_count: 问题数量
         """
         try:
-            # 实现异步并发
-            review_tasks = [
-                asyncio.create_task(self._review_single_unit(content, i, total_units, state))
-                for i, content in enumerate(review_chunks)
-            ]
+            current = int(((unit_index + 1) / total_units) * 100)
 
-            # 等待所有审查完成
-            all_results = await asyncio.gather(*review_tasks)
+            # 构建完成消息
+            if issues_count > 0:
+                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+            else:
+                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
 
-            # 处理进度更新和问题详情推送
-            for i, result in enumerate(all_results):
-                if result.overall_risk != "error":
-                    unit_content = review_chunks[i]
-                    section_label = unit_content.get('section_label', f'第{i + 1}部分')
-
-                    # 格式化issues以获取问题数量
-                    issues = self._format_review_results_to_issues(
-                        state["callback_task_id"],
-                        i,
-                        f"第{unit_content.get('page', '')}页:{section_label}",
-                        unit_content,
-                        result.basic_compliance,
-                        result.technical_compliance
-                    )
-
-                    issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
-                    current = int(((i + 1) / total_units) * 100)
-
-                    # 发送进度更新
-                    await self._send_unit_complete_progress(state, i, total_units, section_label, issues_count)
-
-                    # 发送问题详情
-                    await self._send_unit_review_details(state, i, total_units, section_label, issues, current)
-
-            # 过滤成功结果
-            successful_results = [result for result in all_results if result.overall_risk != "error"]
-            return successful_results
-
-        except Exception as e:
-            logger.error(f"执行并发审查失败: {str(e)}")
-            return []
-
-    async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
-        """
-        AI审查节点 - 重构后的简化版本
-
-        Args:
-            state: AI审查状态
-
-        Returns:
-            AIReviewState: 更新后的审查状态
-        """
-        try:
-            state["current_stage"] = "ai_review"
-
-            # 1. 准备审查单元数据
-            review_chunks, total_units, total_all_units = await self._prepare_review_units(state)
-
-            # 2. 发送开始审查进度
-            await self._send_start_review_progress(state, total_units)
-
-            # 3. 执行并发审查
-            successful_results = await self._execute_concurrent_reviews(review_chunks, total_units, state)
-
-            # 4. 汇总结果
-            summary = self._aggregate_results(successful_results)
-
-            review_results = {
-                'total_all_units': total_all_units,  # 原始总单元数
-                'total_reviewed_units': total_units,  # 实际审查的单元数
-                'successful_units': len(successful_results),
-                'failed_units': total_units - len(successful_results),
-                'review_mode': self.review_mode,
-                'max_review_units': self.max_review_units,
-                'review_results': successful_results,
-                'summary': summary
-            }
-
-            state["review_results"] = review_results
-            state["messages"].append(AIMessage(
-                content=f"AI审查完成,共处理{total_units}个单元,成功{len(successful_results)}个"
-            ))
-
-            return state
+            logger.info(f"单元审查完成,更新进度: {current}% {message}")
 
+            # 发送processing_flag事件
+            if state["progress_manager"]:
+                logger.info(f"发送processing_flag事件: current={current}, message={message}")
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name="AI审查",
+                    current=current,
+                    status="processing",
+                    message=message,
+                    user_id=state.get("user_id", ""),
+                    overall_task_status="processing",
+                    event_type="processing_flag"
+                )
         except Exception as e:
-            logger.error(f"AI审查失败: {str(e)}")
-            state["error_message"] = str(e)
-            state["messages"].append(AIMessage(content=f"AI审查失败: {str(e)}"))
-            return state
-
-    async def _complete_node(self, state: AIReviewState) -> AIReviewState:
-        """完成节点"""
-        logger.info(f"AI审查完成: {state['file_id']}")
-
-        state["current_stage"] = "complete"
-        state["status"] = "completed"
-
-        # 更新完成状态
-        if state["progress_manager"]:
-            await state["progress_manager"].update_stage_progress(
-                callback_task_id=state["callback_task_id"],
-                stage_name="AI审查",
-                current=90,
-                status="processing",
-                message="AI审查完成",
-                overall_task_status="processing",
-                event_type="processing"
-            )
-
-        state["messages"].append(AIMessage(content="AI审查工作流完成"))
-
-        return state
-
-    async def _error_handler_node(self, state: AIReviewState) -> AIReviewState:
-        """错误处理节点"""
-        logger.error(f"AI审查错误处理: {state['file_id']}, 错误: {state['error_message']}")
-
-        state["status"] = "failed"
-        state["current_stage"] = "error_handler"
-
-        # 更新错误状态
-        if state["progress_manager"]:
-            await state["progress_manager"].update_stage_progress(
-                callback_task_id=state["callback_task_id"],
-                stage_name="AI审查",
-                current=50,
-                status="failed",
-                message=f"AI审查失败: {state['error_message']}",
-                overall_task_status="failed",
-                event_type="error"
-            )
-
-        state["messages"].append(AIMessage(
-            content=f"错误处理: {state['error_message']}"
-        ))
-
-        return state
-
-
-    def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
-        """
-        计算总体风险等级
-
-        Args:
-            basic_result: 基础合规性审查结果
-            technical_result: 技术性审查结果
-            rag_result: RAG增强审查结果
-
-        Returns:
-            str: 风险等级 ("low", "medium", "high")
-        """
-        try:
-            # 基于各种审查结果计算风险等级
-            # 风险等级计算逻辑:基础和技术审查都达到90分以上为低风险,70分以上为中风险,否则为高风险
-            basic_score = basic_result.get('overall_score', 0)
-            technical_score = technical_result.get('overall_score', 0)
-
-            if basic_score >= 90 and technical_score >= 90:
-                return "low"
-            elif basic_score >= 70 and technical_score >= 70:
-                return "medium"
-            else:
-                return "high"
-        except (KeyError, TypeError, ValueError) as e:
-            logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
-            return DEFAULT_RISK_LEVEL
-
-    def _aggregate_results(self, successful_results: List[ReviewResult]) -> Dict[str, Any]:
-        """
-        汇总审查结果
-
-        Args:
-            successful_results: 成功的审查结果列表
-
-        Returns:
-            Dict[str, Any]: 汇总后的统计信息,包括风险统计、平均得分等
-        """
-        try:
-            if not successful_results:
-                return {}
-
-            # 计算风险等级统计
-            risk_stats = {"low": 0, "medium": 0, "high": 0, "error": 0}
-            for result in successful_results:
-                risk_stats[result.overall_risk] += 1
-
-            # 计算平均分
-            total_basic_score = sum(r.basic_compliance.get('overall_score', 0) for r in successful_results)
-            total_technical_score = sum(r.technical_compliance.get('overall_score', 0) for r in successful_results)
-
-            avg_basic_score = total_basic_score / len(successful_results)
-            avg_technical_score = total_technical_score / len(successful_results)
-
-            return {
-                'risk_stats': risk_stats,
-                'avg_basic_score': avg_basic_score,
-                'avg_technical_score': avg_technical_score,
-                'total_reviewed': len(successful_results)
-            }
-        except (ZeroDivisionError, KeyError, TypeError) as e:
-            logger.error(f"结果汇总失败: {str(e)}")
-            return {}
-
-
-    def _check_ai_review_result(self, state: AIReviewState) -> str:
-        """检查AI审查结果"""
-        if state.get("error_message"):
-            return "error"
-        return "success"
-
-    def _get_workflow_graph(self):
-        """获取工作流图(可视化输出)"""
-        grandalf_graph = self.graph.get_graph()
-        grandalf_graph.print_ascii()
- 
-
-    async def _get_status(self) -> dict:
-        """获取工作流状态"""
-        if self.progress_manager:
-            return await self.progress_manager.get_progress(self.callback_task_id)
-        return {}
-
+            logger.warning(f"发送单元完成进度更新失败: {str(e)}")
 
     def _format_review_results_to_issues(self,callback_task_id: str, unit_index: int, review_location_label: str,
                                     unit_content: Dict[str, Any], basic_result: Dict[str, Any],
@@ -799,23 +735,24 @@ class AIReviewWorkflow:
 
         # 如果有审查结果,创建issue
         if review_lists:
+            issue_id = f"{callback_task_id}-{max_risk_level  or '0'}-{unit_index}"
             issue = {
-                "issue_id": f"{callback_task_id}-{max_risk_level}-{unit_index}",
-                "metadata": { 
-                    "review_location_label": review_location_label,
-                    "original_content": unit_content.get('content', '')
-                },
-                "risk_summary": {
-                    "max_risk_level": max_risk_level,
-                    "risk_count": risk_count
-                },
-                "review_lists": review_lists
+                issue_id: {  
+                    "metadata": {
+                        "review_location_label": review_location_label,
+                        "original_content": unit_content.get('content', '')
+                    },
+                    "risk_summary": {
+                        "max_risk_level": max_risk_level or '0',
+                        "risk_count": risk_count or 0
+                    },
+                    "review_lists": review_lists
+                }
             }
             issues.append(issue)
 
         return issues
 
-
     def _parse_ai_review_response(self,response: str, check_name: str) -> List[Dict[str, Any]]:
         """
         解析AI审查的Markdown格式响应
@@ -877,4 +814,75 @@ class AIReviewWorkflow:
                 "risk_info": {"risk_level": "low"}
             })
 
-        return review_lists
+        return review_lists
+    
+
+    def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
+        """
+        计算总体风险等级
+
+        Args:
+            basic_result: 基础合规性审查结果
+            technical_result: 技术性审查结果
+            rag_result: RAG增强审查结果
+
+        Returns:
+            str: 风险等级 ("low", "medium", "high")
+        """
+        try:
+            # 基于各种审查结果计算风险等级
+            # 风险等级计算逻辑:基础和技术审查都达到90分以上为低风险,70分以上为中风险,否则为高风险
+            basic_score = basic_result.get('overall_score', 0)
+            technical_score = technical_result.get('overall_score', 0)
+
+            if basic_score >= 90 and technical_score >= 90:
+                return "low"
+            elif basic_score >= 70 and technical_score >= 70:
+                return "medium"
+            else:
+                return "high"
+        except (KeyError, TypeError, ValueError) as e:
+            logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
+            return DEFAULT_RISK_LEVEL
+
+    def _aggregate_results(self, successful_results: List[ReviewResult]) -> Dict[str, Any]:
+        """
+        汇总审查结果
+
+        Args:
+            successful_results: 成功的审查结果列表
+
+        Returns:
+            Dict[str, Any]: 汇总后的统计信息,包括风险统计、平均得分等
+        """
+        try:
+            if not successful_results:
+                return {}
+
+            # 计算风险等级统计
+            risk_stats = {"low": 0, "medium": 0, "high": 0, "error": 0}
+            for result in successful_results:
+                risk_stats[result.overall_risk] += 1
+
+            # 计算平均分
+            total_basic_score = sum(r.basic_compliance.get('overall_score', 0) for r in successful_results)
+            total_technical_score = sum(r.technical_compliance.get('overall_score', 0) for r in successful_results)
+
+            avg_basic_score = total_basic_score / len(successful_results)
+            avg_technical_score = total_technical_score / len(successful_results)
+
+            return {
+                'risk_stats': risk_stats,
+                'avg_basic_score': avg_basic_score,
+                'avg_technical_score': avg_technical_score,
+                'total_reviewed': len(successful_results)
+            }
+        except (ZeroDivisionError, KeyError, TypeError) as e:
+            logger.error(f"结果汇总失败: {str(e)}")
+            return {}
+
+    def _check_ai_review_result(self, state: AIReviewState) -> str:
+        """检查AI审查结果"""
+        if state.get("error_message"):
+            return "error"
+        return "success"

Разница между файлами не показана из-за своего большого размера
+ 12 - 14
temp/AI审查结果.json


Некоторые файлы не были показаны из-за большого количества измененных файлов