Browse Source

fix(workflow_manager): 添加模块级单例修复 outline_generator 章节级取消失效

历史上 7 个 view + Celery tasks.py 各自 new WorkflowManager(),导致同进程
内 active_chains/active_outline_tasks 等内存字典被切成多份;同时
outline_generator.py 写了 `from ... import workflow_manager`(小写)但模块未
定义此名字,每次大纲章节级取消检查都抛 ImportError,取消功能完全失效。

- core/base/workflow_manager.py 末尾追加模块级单例
- 11 处 view/task 实例化收敛为 import 单例
- ai_review_workflow.py / ai_review_core_fun.py 保持延迟 import 避免循环
WangXuMing 1 month ago
parent
commit
5d47d7236a

+ 18 - 1
core/base/workflow_manager.py

@@ -1575,4 +1575,21 @@ class WorkflowManager:
 
         except Exception as e:
             logger.error(f"获取大纲任务信息失败: {str(e)}", exc_info=True)
-            return None
+            return None
+
+
+# ============================================================================
+# 模块级单例 - 同进程内所有调用方共享同一个 WorkflowManager
+#
+# 修复以下隐患:
+# 1. outline_generator.py 引用了 `from ... import workflow_manager`(小写实例)
+#    历史上没有定义此名字,导致 ImportError 与 “检查终止信号失败” ERROR 频刷
+# 2. 各 view / Celery task 各自 new WorkflowManager(),导致 active_chains 等
+#    内存状态彼此隔离,跨模块的任务追踪/取消依赖 Redis 兜底,可靠性下降
+#
+# 注意:
+# - 主进程 (FastAPI) 与 Celery worker 是不同进程,本就不共享内存;
+#   单例语义只覆盖同进程内的引用收敛。
+# - 实例化只是初始化字典/Semaphore/状态,不会触发 IO 或循环导入。
+# ============================================================================
+workflow_manager = WorkflowManager(max_concurrent_docs=3, max_concurrent_reviews=5)

+ 5 - 3
core/construction_review/workflows/ai_review_workflow.py

@@ -97,9 +97,10 @@ class AIReviewWorkflow:
 
         self.max_concurrent = 20 # 规范性与时效性审查最大并发数
 
-        # 延迟导入 WorkflowManager(避免循环导入)
-        from core.base.workflow_manager import WorkflowManager
-        self.workflow_manager = WorkflowManager()
+        # 延迟导入 workflow_manager 单例(避免循环导入:
+        # core.base.workflow_manager 顶部已经 import 了本模块)
+        from core.base.workflow_manager import workflow_manager
+        self.workflow_manager = workflow_manager
 
         self.graph = self._build_workflow()
 
@@ -312,6 +313,7 @@ class AIReviewWorkflow:
             # 2. 解析审查项配置
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
+                'grammar_check': 'grammar_check',
                 'semantic_logic_check': 'check_semantic_logic',
                 'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_reviewer',  # 统一入口

+ 25 - 35
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -97,9 +97,10 @@ class AIReviewCoreFun:
         
         self.max_concurrent = 20
 
-        # 延迟导入 WorkflowManager(避免循环导入)
-        from core.base.workflow_manager import WorkflowManager
-        self.workflow_manager = WorkflowManager()
+        # 延迟导入 workflow_manager 单例(避免循环导入:
+        # core.base.workflow_manager 顶部已经 import 了本模块所属包)
+        from core.base.workflow_manager import workflow_manager
+        self.workflow_manager = workflow_manager
 
     async def _process_chapter_item(
         self,
@@ -269,7 +270,8 @@ class AIReviewCoreFun:
             logger.debug("开始执行RAG检索增强")
             rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
 
-        if ('reference_basis_reviewer' in func_names or 'timeliness_basis_reviewer' in func_names) and not is_complete_field:
+        if ('reference_basis_reviewer' in func_names or 'timeliness_reviewer' in func_names or
+            'timeliness_basis_reviewer' in func_names or 'timeliness_content_reviewer' in func_names) and not is_complete_field:
             logger.debug("开始执行编制依据/时效性预处理")
             # 预处理编制依据/时效性审查所需内容
             basis_content = await directory_extraction.extract_basis_with_langchain_qwen(
@@ -392,6 +394,19 @@ class AIReviewCoreFun:
                 is_sse_push=True
             )
 
+        elif func_name == "grammar_check" and not is_complete_field:
+            raw_result = await method(trace_id, review_content, state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
+            )
+
         elif func_name == "check_semantic_logic" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
@@ -503,11 +518,12 @@ class AIReviewCoreFun:
 
 
 
-        # reference_basis_reviewer:编制依据审查(逐块处理
+        # reference_basis_reviewer:规范性审查(逐块处理,支持basis和其他章节
         elif func_name == "reference_basis_reviewer" and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
-                "basis_items": basis_content,  # 提取的 BasisItems 对象
+                "basis_items": basis_content,  # 提取的 BasisItems 对象(basis章节使用)
+                "chapter_code": chapter_code,  # 章节代码
                 "max_concurrent": self.max_concurrent
             }
             raw_result = await method(
@@ -527,37 +543,11 @@ class AIReviewCoreFun:
                 is_sse_push=True
             )
 
-        # timeliness_basis_reviewer:时效性审查(逐块处理
-        elif func_name == "timeliness_basis_reviewer" and not is_complete_field:
+        # timeliness_reviewer:统一的时效性审查入口(支持basis和content两种来源
+        elif func_name in ("timeliness_basis_reviewer", "timeliness_content_reviewer", "timeliness_reviewer") and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
-                "basis_items": basis_content,  # 提取的 BasisItems 对象
-                "max_concurrent": self.max_concurrent
-            }
-            raw_result = await method(
-                review_data=review_data,
-                trace_id=trace_id,
-                state=state,
-                stage_name=stage_name
-            )
-            # 基础审查方法,放入 basic_compliance
-            return UnitReviewResult(
-                unit_index=chunk_index,
-                unit_content=chunk,
-                basic_compliance={func_name: raw_result},
-                technical_compliance={},
-                rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result),
-                is_sse_push=True
-            )
-
-        # timeliness_content_reviewer:三级分类内容时效性审查(逐块处理)
-        elif func_name == "timeliness_content_reviewer" and not is_complete_field:
-            # 从chunk中获取三级分类详情
-            tertiary_details = chunk.get("tertiary_classification_details", [])
-            review_data = {
-                "tertiary_classification_details": tertiary_details,  # 三级分类详情
-                "max_concurrent": 4
+                "basis_items": basis_content,  # 提取的 BasisItems 对象(可能为None)
             }
             raw_result = await method(
                 review_data=review_data,

+ 4 - 13
foundation/infrastructure/messaging/tasks.py

@@ -5,7 +5,7 @@ Celery任务定义
 
 from celery import current_task
 from .celery_app import app
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from foundation.observability.logger.loggering import review_logger, write_logger
 from foundation.observability.monitoring.time_statistics import track_execution_time
 
@@ -49,12 +49,8 @@ def submit_task_processing_task(self, file_info: dict, _system_trace_id: str = N
 
         logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}")
 
-        # 创建独立的WorkflowManager实例执行业务逻辑
-        workflow_manager = WorkflowManager(
-            max_concurrent_docs=1,  # Celery worker中单任务执行
-            max_concurrent_reviews=5
-        )
-
+        # 复用进程内的 WorkflowManager 单例(Celery worker 与主进程是不同进程,
+        # 单例在 worker 进程内独立创建一次,并发参数沿用模块级默认值)
         # 同步执行(Celery worker本身就是独立的进程)
 
         result = workflow_manager.submit_construction_review_task_processing_sync(file_info)
@@ -126,12 +122,7 @@ def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str
 
         logger.info(f"开始执行大纲生成业务逻辑")
 
-        # 创建独立的 WorkflowManager 实例执行业务逻辑
-        workflow_manager = WorkflowManager(
-            max_concurrent_docs=1,  # Celery worker 中单任务执行
-            max_concurrent_reviews=5
-        )
-
+        # 复用进程内的 WorkflowManager 单例
         # 同步执行
         result = workflow_manager.submit_outline_generation_sync(task_info)
 

+ 1 - 6
views/construction_review/file_upload.py

@@ -16,7 +16,7 @@ from typing import List
 from foundation.utils import md5
 from foundation.infrastructure.config import config_handler
 from .schemas.error_schemas import FileUploadErrors
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from foundation.observability.logger.loggering import review_logger as logger
 from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
 from foundation.infrastructure.tracing import TraceContext, auto_trace
@@ -171,11 +171,6 @@ def _convert_via_libreoffice(docx_content: bytes, filename: str) -> tuple[bytes,
 
 file_upload_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
 uploaded_files = {}
-# 初始化工作流管理器
-workflow_manager = WorkflowManager(
-    max_concurrent_docs=3,
-    max_concurrent_reviews=5
-)
 # 使用workflow_manager的duplicatechecker实例,确保一致性
 duplicatechecker = workflow_manager.redis_duplicate_checker
 

+ 5 - 9
views/construction_review/launch_review.py

@@ -17,18 +17,13 @@ from core.base.redis_duplicate_checker import RedisDuplicateChecker
 from foundation.observability.logger.loggering import review_logger as logger
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 from foundation.utils.redis_utils import get_file_info,store_file_info
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from core.base.progress_manager import ProgressManager
 from views.construction_review.file_upload import validate_upload_parameters
 from .schemas.error_schemas import LaunchReviewErrors
 
 launch_review_router = APIRouter(prefix="/sgsc", tags=["前端接口"])
 duplicatechecker = RedisDuplicateChecker()
-# 初始化工作流管理器
-workflow_manager = WorkflowManager(
-    max_concurrent_docs=3,
-    max_concurrent_reviews=5
-)
 # 初始化进度管理器
 progress_manager = ProgressManager()
 
@@ -88,12 +83,13 @@ def validate_review_config(review_config: List[str]) -> None:
 
     # 支持的审查项枚举值
     supported_review_items = {
-        'sensitive_word_check',       # 词句语法检查
+        'sensitive_word_check',       # 敏感词审查
+        'grammar_check',              # 词句语法检查
         'semantic_logic_check',       # 语义逻辑审查
         'completeness_check',         # 条文完整性审查
         'timeliness_check',           # 时效性审查
         'reference_check',            # 规范性审查
-        'sensitive_check',      # 敏感词审
+        'sensitive_check',            # 敏感信息检
         'non_parameter_compliance_check',  # 非参数合规性检查功能
         'parameter_compliance_check', # 参数合规性检查功能
     }
@@ -165,7 +161,7 @@ def validate_review_item_config(review_item_config: List[str]) -> None:
             "acceptance", "other"
         },
         "review_dimensions": {
-            "sensitive_word_check", "semantic_logic_check", "completeness_check",
+            "sensitive_word_check", "grammar_check", "semantic_logic_check", "completeness_check",
             "timeliness_check", "reference_check", "sensitive_check",
             "non_parameter_compliance_check", "parameter_compliance_check"
         }

+ 1 - 4
views/construction_review/task_control.py

@@ -7,13 +7,10 @@ from typing import List, Optional, Dict, Any
 from pydantic import BaseModel, Field
 from fastapi import APIRouter, HTTPException, Query
 from foundation.observability.logger.loggering import review_logger as logger
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 
 task_control_router = APIRouter(prefix="/sgsc", tags=["任务控制"])
 
-# 初始化工作流管理器
-workflow_manager = WorkflowManager()
-
 
 class TerminateTaskRequest(BaseModel):
     """终止任务请求模型"""

+ 1 - 2
views/construction_write/content_completion.py

@@ -18,13 +18,12 @@ from fastapi.responses import StreamingResponse
 from foundation.observability.logger.loggering import write_logger as logger
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 from foundation.infrastructure.config.config import config_handler
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from redis.asyncio import Redis as AsyncRedis
 
 # ==================== 1. 配置与路径初始化 ====================
 
 content_completion_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
-workflow_manager = WorkflowManager(max_concurrent_docs=3, max_concurrent_reviews=5)
 
 # ==================== 2. 全局资源池 (速度优化核心) ====================
 

+ 1 - 7
views/construction_write/outline_views.py

@@ -22,7 +22,7 @@ from fastapi.responses import StreamingResponse
 from foundation.observability.logger.loggering import write_logger as logger
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 from foundation.infrastructure.config.config import config_handler
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from core.base.sse_manager import unified_sse_manager
 from core.base.progress_manager import ProgressManager
 from redis.asyncio import Redis as AsyncRedis
@@ -30,12 +30,6 @@ from redis.asyncio import Redis as AsyncRedis
 # 创建路由
 outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
 
-# 初始化工作流管理器
-workflow_manager = WorkflowManager(
-    max_concurrent_docs=3,
-    max_concurrent_reviews=5
-)
-
 # 初始化进度管理器
 progress_manager = ProgressManager()
 

+ 1 - 7
views/construction_write/regenerate_views.py

@@ -13,7 +13,7 @@ from fastapi import APIRouter, HTTPException
 from fastapi.responses import StreamingResponse
 from foundation.observability.logger.loggering import write_logger as logger
 from foundation.infrastructure.tracing import TraceContext, auto_trace
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from core.base.sse_manager import unified_sse_manager
 from core.base.progress_manager import ProgressManager
 from redis.asyncio import Redis as AsyncRedis
@@ -22,12 +22,6 @@ from redis.asyncio import Redis as AsyncRedis
 # 创建路由
 regenerate_outline_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
 
-# 初始化工作流管理器
-workflow_manager = WorkflowManager(
-    max_concurrent_docs=3,
-    max_concurrent_reviews=5
-)
-
 # 初始化进度管理器
 progress_manager = ProgressManager()
 

+ 1 - 7
views/construction_write/task_cancel_views.py

@@ -11,18 +11,12 @@ from pydantic import BaseModel, Field
 from fastapi import APIRouter, HTTPException
 from foundation.observability.logger.loggering import write_logger as logger
 from foundation.infrastructure.tracing import TraceContext, auto_trace
-from core.base.workflow_manager import WorkflowManager
+from core.base.workflow_manager import workflow_manager
 from core.base.sse_manager import unified_sse_manager
 
 # 创建路由
 task_cancel_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
 
-# 初始化工作流管理器
-workflow_manager = WorkflowManager(
-    max_concurrent_docs=3,
-    max_concurrent_reviews=5
-)
-
 
 class TaskCancelRequest(BaseModel):
     """任务取消请求模型