| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- """
- Celery应用配置
- 负责任务队列管理,不涉及具体业务逻辑
- """
- import os
- from celery import Celery
- from .config import config_handler
- # 从配置文件获取Redis连接信息
- redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
- redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
- redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
- # 构建Redis连接URL
- if redis_password:
- redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/0"
- else:
- redis_url = f"redis://{redis_host}:{redis_port}/0"
- print(f"Connecting to Redis: {redis_url}")
- app = Celery(
- 'workflow_tasks',
- broker=redis_url,
- backend=redis_url,
- include=['foundation.base.tasks']
- )
- # 配置
- app.conf.update(
- task_serializer='json',
- accept_content=['json'],
- result_serializer='json',
- timezone='Asia/Shanghai',
- enable_utc=True,
- # Worker配置
- worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
- task_acks_late=True, # 任务完成后再确认
- # 并发控制
- worker_concurrency=2, # 每个worker进程数(文档处理较重,不宜过多)
- worker_pool='solo', # 使用单线程模式(避免GIL问题)
- # 任务配置
- task_track_started=True,
- task_time_limit=600, # 10分钟超时(文档处理较慢)
- task_soft_time_limit=540, # 9分钟软超时
- worker_max_tasks_per_child=5, # 每个worker进程最多处理5个任务后重启(防止内存泄漏)
- # 结果过期时间
- result_expires=3600, # 1小时后过期
- )
|