| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- """
- Celery Trace管理
- 负责在Celery队列任务中传递和恢复trace_id上下文
- """
- from celery.signals import task_prerun, task_postrun, task_failure
- from foundation.trace.trace_context import TraceContext
- from foundation.logger.loggering import server_logger as logger
- class CeleryTraceManager:
- """Celery trace上下文管理器"""
- @staticmethod
- def init_celery_signals():
- """初始化Celery信号,自动管理trace_id上下文"""
- @task_prerun.connect
- def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
- """
- 任务执行前的信号处理
- 从任务参数中提取trace_id并设置到TraceContext
- """
- try:
- # 从kwargs中提取trace_id参数
- trace_id = kwargs.pop('_system_trace_id', None) or kwargs.pop('callback_task_id', None)
- if trace_id:
- TraceContext.set_trace_id(trace_id)
- logger.info(f"Celery任务恢复trace_id: {trace_id}, 任务ID: {task_id}")
- else:
- # 如果没有找到trace_id,生成一个临时的
- fallback_trace = f"celery-{task_id[:8]}"
- TraceContext.set_trace_id(fallback_trace)
- logger.warning(f"Celery任务未找到trace_id,使用临时trace: {fallback_trace}")
- except Exception as e:
- logger.error(f"Celery任务trace_id恢复失败: {str(e)}")
- # 生成临时trace_id
- fallback_trace = f"celery-error-{task_id[:8]}"
- TraceContext.set_trace_id(fallback_trace)
- @task_postrun.connect
- def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
- """
- 任务执行后的信号处理
- 清理trace_id上下文
- """
- try:
- trace_id = TraceContext.get_trace_id()
- logger.info(f"Celery任务完成: {trace_id}, 任务ID: {task_id}")
- # 可选:清理trace_id
- # TraceContext.set_trace_id(None)
- except Exception as e:
- logger.error(f"Celery任务trace_id清理失败: {str(e)}")
- @task_failure.connect
- def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
- """
- 任务失败时的信号处理
- """
- try:
- trace_id = TraceContext.get_trace_id()
- logger.error(f"Celery任务失败: {trace_id}, 任务ID: {task_id}, 错误: {str(exception)}")
- except Exception as e:
- logger.error(f"Celery任务失败trace_id记录失败: {str(e)}, 任务ID: {task_id}")
- @staticmethod
- def submit_celery_task(task_func, *args, **kwargs):
- """
- 提交Celery任务时自动传递当前trace_id
- Args:
- task_func: Celery任务函数
- *args: 位置参数
- **kwargs: 关键字参数
- Returns:
- Celery任务结果
- """
- # 获取当前trace_id
- current_trace_id = TraceContext.get_trace_id()
- # 将trace_id添加到任务参数中
- if current_trace_id and current_trace_id != 'no-trace':
- kwargs['_system_trace_id'] = current_trace_id
- logger.info(f"提交Celery任务")
- # 提交任务
- return task_func.delay(*args, **kwargs)
- def add_trace_to_celery_task(celery_task_func):
- """
- 装饰器:为Celery任务函数自动添加trace_id支持
- Usage:
- @add_trace_to_celery_task
- @app.task(bind=True)
- def my_task(self, file_info: dict):
- # 任务逻辑
- pass
- """
- def decorator(*args, **kwargs):
- # 获取当前trace_id
- current_trace_id = TraceContext.get_trace_id()
- if current_trace_id and current_trace_id != 'no-trace':
- kwargs['_system_trace_id'] = current_trace_id
- return celery_task_func(*args, **kwargs)
- return decorator
- # 自动初始化Celery信号
- def init():
- """初始化Celery trace系统"""
- CeleryTraceManager.init_celery_signals()
- logger.info("Celery trace系统初始化完成")
|