""" Celery任务定义 只负责任务调度,具体业务逻辑由WorkflowManager处理 """ from celery import current_task from .celery_app import app from core.base.workflow_manager import WorkflowManager from foundation.observability.logger.loggering import review_logger, write_logger from foundation.observability.monitoring.time_statistics import track_execution_time @app.task(bind=True) def submit_task_processing_task(self, file_info: dict, _system_trace_id: str = None): """ 提交任务处理到Celery队列 这个任务只负责调用WorkflowManager,不包含业务逻辑 """ import traceback logger = review_logger # 使用审查模块专用logger # 恢复trace_id上下文 if _system_trace_id: from foundation.infrastructure.tracing import TraceContext TraceContext.set_trace_id(_system_trace_id) logger.info(f"Celery任务恢复") # 添加调试信息 logger.info("=== Celery任务接收调试 ===") logger.info(f"队列ID: {self.request.id}") logger.info(f"文件ID: {file_info.get('file_id')}") logger.info(f"回调任务ID: {file_info.get('callback_task_id')}") logger.info("=== 任务接收调用栈 ===") for line in traceback.format_stack(): logger.debug(f" {line.strip()}") logger.info("=== 调用栈结束 ===") try: # 更新任务状态 - 开始处理 self.update_state( state='current', meta={ 'current': 0, 'total': 100, 'status': '开始处理文档', 'file_id': file_info.get('file_id') } ) logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}") # 创建独立的WorkflowManager实例执行业务逻辑 workflow_manager = WorkflowManager( max_concurrent_docs=1, # Celery worker中单任务执行 max_concurrent_reviews=5 ) # 同步执行(Celery worker本身就是独立的进程) result = workflow_manager.submit_construction_review_task_processing_sync(file_info) # 更新任务状态 - 完成 self.update_state( state='current', meta={ 'current': 100, 'total': 100, 'status': '处理完成', 'file_id': file_info.get('file_id') } ) return { 'status': 'success', 'file_id': file_info.get('file_id'), 'callback_task_id': file_info.get('callback_task_id'), 'result': result } except Exception as e: # 记录错误并重试 logger.error(f"任务处理失败: {str(e)}") logger.exception("详细错误信息:") # 自动重试,延迟60秒,最多重试2次 self.retry(countdown=60, max_retries=2, exc=e) raise # ==================== 施工方案编写任务 ==================== @app.task(bind=True) def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str = None): """ 提交大纲生成任务到 Celery 队列 这个任务只负责调用 WorkflowManager,不包含业务逻辑 """ import traceback logger = write_logger # 使用编写模块专用logger # 恢复 trace_id 上下文 if _system_trace_id: from foundation.infrastructure.tracing import TraceContext TraceContext.set_trace_id(_system_trace_id) logger.info(f"大纲生成 Celery 任务恢复 trace_id: {_system_trace_id}") logger.info("=== 大纲生成 Celery 任务接收 ===") logger.info(f"队列ID: {self.request.id}") logger.info(f"用户ID: {task_info.get('user_id')}") logger.info(f"项目: {task_info.get('project_info', {}).get('project_name', 'unknown')}") try: # 更新任务状态 - 开始处理 self.update_state( state='current', meta={ 'current': 0, 'total': 100, 'status': '开始生成大纲', 'callback_task_id': task_info.get('callback_task_id'), 'project_name': task_info.get('project_info', {}).get('project_name', '') } ) logger.info(f"开始执行大纲生成业务逻辑") # 创建独立的 WorkflowManager 实例执行业务逻辑 workflow_manager = WorkflowManager( max_concurrent_docs=1, # Celery worker 中单任务执行 max_concurrent_reviews=5 ) # 同步执行 result = workflow_manager.submit_outline_generation_sync(task_info) # 更新任务状态 - 完成 self.update_state( state='current', meta={ 'current': 100, 'total': 100, 'status': '大纲生成完成', 'callback_task_id': result.get('callback_task_id'), 'overall_task_status': result.get('overall_task_status') } ) return { 'status': 'success', 'callback_task_id': result.get('callback_task_id'), 'overall_task_status': result.get('overall_task_status'), 'result': result } except Exception as e: logger.error(f"大纲生成任务失败: {str(e)}") logger.exception("详细错误信息:") # 自动重试,延迟60秒,最多重试2次 self.retry(countdown=60, max_retries=2, exc=e) raise