""" Celery应用配置 负责任务队列管理,不涉及具体业务逻辑 """ import os from celery import Celery from .config import config_handler # 导入trace系统 from foundation.trace.celery_trace import init # 从配置文件获取Redis连接信息 redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') redis_db = config_handler.get('redis', 'REDIS_DB', '0') # 构建Redis连接URL if redis_password: redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" else: redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}" print(f"Connecting to Redis: {redis_url}") app = Celery( 'workflow_tasks', broker=redis_url, backend=redis_url, include=['foundation.base.tasks'] ) # 配置 app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='Asia/Shanghai', enable_utc=True, # Worker配置 worker_prefetch_multiplier=2, # 每个worker一次只取一个任务 task_acks_late=True, # 任务完成后再确认 # 并发控制 worker_concurrency=2, # 每个worker进程数(文档处理较重,不宜过多) worker_pool='solo', # 使用单线程模式(避免GIL问题) # 网络和连接配置 - 防止30分钟断连 broker_connection_timeout=30, # 连接超时30秒 broker_connection_retry=True, # 启用连接重试 broker_connection_retry_on_startup=True, # 启动时重试 broker_connection_max_retries=10, # 最大重试次数 broker_heartbeat=60, # 心跳间隔60秒(默认是30秒的2倍) broker_transport_options={ 'visibility_timeout': 3600, # 任务可见性超时 'socket_keepalive': True, # 启用socket keepalive }, # 任务配置 task_track_started=True, task_time_limit=600, # 10分钟超时(文档处理较慢) task_soft_time_limit=540, # 9分钟软超时 worker_max_tasks_per_child=5, # 每个worker进程最多处理5个任务后重启(防止内存泄漏) # 结果过期时间 result_expires=3600, # 1小时后过期 # 连接池配置 broker_pool_limit=None, # 无连接池限制 result_backend_pool_limit=None, # 无结果后端连接池限制 ) # 初始化Celery trace系统 init()