""" Celery Trace管理 负责在Celery队列任务中传递和恢复trace_id上下文 """ from celery.signals import task_prerun, task_postrun, task_failure from .trace_context import TraceContext class CeleryTraceManager: """Celery trace上下文管理器""" @staticmethod def init_celery_signals(): """初始化Celery信号,自动管理trace_id上下文""" @task_prerun.connect def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds): """ 任务执行前的信号处理 从任务参数中提取trace_id并设置到TraceContext """ # 延迟导入避免循环依赖 from foundation.observability.logger.loggering import review_logger as logger try: # 从kwargs中提取trace_id参数 trace_id = kwargs.pop('_system_trace_id', None) or kwargs.pop('callback_task_id', None) if trace_id: TraceContext.set_trace_id(trace_id) logger.info(f"Celery任务恢复trace_id: {trace_id}, 任务ID: {task_id}") else: # 如果没有找到trace_id,生成一个临时的 fallback_trace = f"celery-{task_id[:8]}" TraceContext.set_trace_id(fallback_trace) logger.warning(f"Celery任务未找到trace_id,使用临时trace: {fallback_trace}") except Exception as e: logger.error(f"Celery任务trace_id恢复失败: {str(e)}") # 生成临时trace_id fallback_trace = f"celery-error-{task_id[:8]}" TraceContext.set_trace_id(fallback_trace) @task_postrun.connect def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds): """ 任务执行后的信号处理 清理trace_id上下文 """ # 延迟导入避免循环依赖 from foundation.observability.logger.loggering import review_logger as logger try: trace_id = TraceContext.get_trace_id() logger.info(f"Celery任务完成: {trace_id}, 任务ID: {task_id}") # 可选:清理trace_id # TraceContext.set_trace_id(None) except Exception as e: logger.error(f"Celery任务trace_id清理失败: {str(e)}") @task_failure.connect def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds): """ 任务失败时的信号处理 """ # 延迟导入避免循环依赖 from foundation.observability.logger.loggering import review_logger as logger try: trace_id = TraceContext.get_trace_id() logger.error(f"Celery任务失败: {trace_id}, 任务ID: {task_id}, 错误: {str(exception)}") except Exception as e: logger.error(f"Celery任务失败trace_id记录失败: {str(e)}, 任务ID: {task_id}") @staticmethod def submit_celery_task(task_func, *args, **kwargs): """ 提交Celery任务时自动传递当前trace_id Args: task_func: Celery任务函数 *args: 位置参数 **kwargs: 关键字参数 Returns: Celery任务结果 """ # 延迟导入避免循环依赖 from foundation.observability.logger.loggering import review_logger as logger # 获取当前trace_id current_trace_id = TraceContext.get_trace_id() # 将trace_id添加到任务参数中 if current_trace_id and current_trace_id != 'no-trace': kwargs['_system_trace_id'] = current_trace_id logger.info(f"提交Celery任务") # 提交任务 return task_func.delay(*args, **kwargs) def add_trace_to_celery_task(celery_task_func): """ 装饰器:为Celery任务函数自动添加trace_id支持 Usage: @add_trace_to_celery_task @app.task(bind=True) def my_task(self, file_info: dict): # 任务逻辑 pass """ def decorator(*args, **kwargs): # 获取当前trace_id current_trace_id = TraceContext.get_trace_id() if current_trace_id and current_trace_id != 'no-trace': kwargs['_system_trace_id'] = current_trace_id return celery_task_func(*args, **kwargs) return decorator # 自动初始化Celery信号 def init(): """初始化Celery trace系统""" # 延迟导入避免循环依赖 try: from foundation.observability.logger.loggering import review_logger as logger except ImportError: import logging logger = logging.getLogger(__name__) CeleryTraceManager.init_celery_signals() try: logger.info("Celery trace系统初始化完成") except: pass # 如果logger不可用,静默继续