| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- """
- Celery任务定义
- 只负责任务调度,具体业务逻辑由WorkflowManager处理
- """
- from celery import current_task
- from .celery_app import app
- from core.base.workflow_manager import WorkflowManager
- from foundation.observability.logger.loggering import review_logger, write_logger
- from foundation.observability.monitoring.time_statistics import track_execution_time
- @app.task(bind=True)
- def submit_task_processing_task(self, file_info: dict, _system_trace_id: str = None):
- """
- 提交任务处理到Celery队列
- 这个任务只负责调用WorkflowManager,不包含业务逻辑
- """
- import traceback
- logger = review_logger # 使用审查模块专用logger
- # 恢复trace_id上下文
- if _system_trace_id:
- from foundation.infrastructure.tracing import TraceContext
- TraceContext.set_trace_id(_system_trace_id)
- logger.info(f"Celery任务恢复")
- # 添加调试信息
- logger.info("=== Celery任务接收调试 ===")
- logger.info(f"队列ID: {self.request.id}")
- logger.info(f"文件ID: {file_info.get('file_id')}")
- logger.info(f"回调任务ID: {file_info.get('callback_task_id')}")
- logger.info("=== 任务接收调用栈 ===")
- for line in traceback.format_stack():
- logger.debug(f" {line.strip()}")
- logger.info("=== 调用栈结束 ===")
- try:
- # 更新任务状态 - 开始处理
- self.update_state(
- state='current',
- meta={
- 'current': 0,
- 'total': 100,
- 'status': '开始处理文档',
- 'file_id': file_info.get('file_id')
- }
- )
- logger.info(f"开始执行业务逻辑,文件ID: {file_info.get('file_id')}")
- # 创建独立的WorkflowManager实例执行业务逻辑
- workflow_manager = WorkflowManager(
- max_concurrent_docs=1, # Celery worker中单任务执行
- max_concurrent_reviews=5
- )
- # 同步执行(Celery worker本身就是独立的进程)
- result = workflow_manager.submit_construction_review_task_processing_sync(file_info)
- # 更新任务状态 - 完成
- self.update_state(
- state='current',
- meta={
- 'current': 100,
- 'total': 100,
- 'status': '处理完成',
- 'file_id': file_info.get('file_id')
- }
- )
- return {
- 'status': 'success',
- 'file_id': file_info.get('file_id'),
- 'callback_task_id': file_info.get('callback_task_id'),
- 'result': result
- }
- except Exception as e:
- # 记录错误并重试
- logger.error(f"任务处理失败: {str(e)}")
- logger.exception("详细错误信息:")
- # 自动重试,延迟60秒,最多重试2次
- self.retry(countdown=60, max_retries=2, exc=e)
- raise
- # ==================== 施工方案编写任务 ====================
- @app.task(bind=True)
- def submit_outline_generation_task(self, task_info: dict, _system_trace_id: str = None):
- """
- 提交大纲生成任务到 Celery 队列
- 这个任务只负责调用 WorkflowManager,不包含业务逻辑
- """
- import traceback
- logger = write_logger # 使用编写模块专用logger
- # 恢复 trace_id 上下文
- if _system_trace_id:
- from foundation.infrastructure.tracing import TraceContext
- TraceContext.set_trace_id(_system_trace_id)
- logger.info(f"大纲生成 Celery 任务恢复 trace_id: {_system_trace_id}")
- logger.info("=== 大纲生成 Celery 任务接收 ===")
- logger.info(f"队列ID: {self.request.id}")
- logger.info(f"用户ID: {task_info.get('user_id')}")
- logger.info(f"项目: {task_info.get('project_info', {}).get('project_name', 'unknown')}")
- try:
- # 更新任务状态 - 开始处理
- self.update_state(
- state='current',
- meta={
- 'current': 0,
- 'total': 100,
- 'status': '开始生成大纲',
- 'callback_task_id': task_info.get('callback_task_id'),
- 'project_name': task_info.get('project_info', {}).get('project_name', '')
- }
- )
- logger.info(f"开始执行大纲生成业务逻辑")
- # 创建独立的 WorkflowManager 实例执行业务逻辑
- workflow_manager = WorkflowManager(
- max_concurrent_docs=1, # Celery worker 中单任务执行
- max_concurrent_reviews=5
- )
- # 同步执行
- result = workflow_manager.submit_outline_generation_sync(task_info)
- # 更新任务状态 - 完成
- self.update_state(
- state='current',
- meta={
- 'current': 100,
- 'total': 100,
- 'status': '大纲生成完成',
- 'callback_task_id': result.get('callback_task_id'),
- 'overall_task_status': result.get('overall_task_status')
- }
- )
- return {
- 'status': 'success',
- 'callback_task_id': result.get('callback_task_id'),
- 'overall_task_status': result.get('overall_task_status'),
- 'result': result
- }
- except Exception as e:
- logger.error(f"大纲生成任务失败: {str(e)}")
- logger.exception("详细错误信息:")
- # 自动重试,延迟60秒,最多重试2次
- self.retry(countdown=60, max_retries=2, exc=e)
- raise
|