celery_trace.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """
  2. Celery Trace管理
  3. 负责在Celery队列任务中传递和恢复trace_id上下文
  4. """
  5. from celery.signals import task_prerun, task_postrun, task_failure
  6. from foundation.trace.trace_context import TraceContext
  7. from foundation.logger.loggering import server_logger as logger
  8. class CeleryTraceManager:
  9. """Celery trace上下文管理器"""
  10. @staticmethod
  11. def init_celery_signals():
  12. """初始化Celery信号,自动管理trace_id上下文"""
  13. @task_prerun.connect
  14. def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
  15. """
  16. 任务执行前的信号处理
  17. 从任务参数中提取trace_id并设置到TraceContext
  18. """
  19. try:
  20. # 从kwargs中提取trace_id参数
  21. trace_id = kwargs.pop('_system_trace_id', None) or kwargs.pop('callback_task_id', None)
  22. if trace_id:
  23. TraceContext.set_trace_id(trace_id)
  24. logger.info(f"Celery任务恢复trace_id: {trace_id}, 任务ID: {task_id}")
  25. else:
  26. # 如果没有找到trace_id,生成一个临时的
  27. fallback_trace = f"celery-{task_id[:8]}"
  28. TraceContext.set_trace_id(fallback_trace)
  29. logger.warning(f"Celery任务未找到trace_id,使用临时trace: {fallback_trace}")
  30. except Exception as e:
  31. logger.error(f"Celery任务trace_id恢复失败: {str(e)}")
  32. # 生成临时trace_id
  33. fallback_trace = f"celery-error-{task_id[:8]}"
  34. TraceContext.set_trace_id(fallback_trace)
  35. @task_postrun.connect
  36. def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
  37. """
  38. 任务执行后的信号处理
  39. 清理trace_id上下文
  40. """
  41. try:
  42. trace_id = TraceContext.get_trace_id()
  43. logger.info(f"Celery任务完成: {trace_id}, 任务ID: {task_id}")
  44. # 可选:清理trace_id
  45. # TraceContext.set_trace_id(None)
  46. except Exception as e:
  47. logger.error(f"Celery任务trace_id清理失败: {str(e)}")
  48. @task_failure.connect
  49. def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
  50. """
  51. 任务失败时的信号处理
  52. """
  53. try:
  54. trace_id = TraceContext.get_trace_id()
  55. logger.error(f"Celery任务失败: {trace_id}, 任务ID: {task_id}, 错误: {str(exception)}")
  56. except Exception as e:
  57. logger.error(f"Celery任务失败trace_id记录失败: {str(e)}, 任务ID: {task_id}")
  58. @staticmethod
  59. def submit_celery_task(task_func, *args, **kwargs):
  60. """
  61. 提交Celery任务时自动传递当前trace_id
  62. Args:
  63. task_func: Celery任务函数
  64. *args: 位置参数
  65. **kwargs: 关键字参数
  66. Returns:
  67. Celery任务结果
  68. """
  69. # 获取当前trace_id
  70. current_trace_id = TraceContext.get_trace_id()
  71. # 将trace_id添加到任务参数中
  72. if current_trace_id and current_trace_id != 'no-trace':
  73. kwargs['_system_trace_id'] = current_trace_id
  74. logger.info(f"提交Celery任务")
  75. # 提交任务
  76. return task_func.delay(*args, **kwargs)
  77. def add_trace_to_celery_task(celery_task_func):
  78. """
  79. 装饰器:为Celery任务函数自动添加trace_id支持
  80. Usage:
  81. @add_trace_to_celery_task
  82. @app.task(bind=True)
  83. def my_task(self, file_info: dict):
  84. # 任务逻辑
  85. pass
  86. """
  87. def decorator(*args, **kwargs):
  88. # 获取当前trace_id
  89. current_trace_id = TraceContext.get_trace_id()
  90. if current_trace_id and current_trace_id != 'no-trace':
  91. kwargs['_system_trace_id'] = current_trace_id
  92. return celery_task_func(*args, **kwargs)
  93. return decorator
  94. # 自动初始化Celery信号
  95. def init():
  96. """初始化Celery trace系统"""
  97. CeleryTraceManager.init_celery_signals()
  98. logger.info("Celery trace系统初始化完成")