# celery_app.py
  1. """
  2. Celery应用配置
  3. 负责任务队列管理,不涉及具体业务逻辑
  4. """
  5. import os
  6. from celery import Celery
  7. from .config import config_handler
  8. # 从配置文件获取Redis连接信息
  9. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  10. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  11. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  12. # 构建Redis连接URL
  13. if redis_password:
  14. redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/0"
  15. else:
  16. redis_url = f"redis://{redis_host}:{redis_port}/0"
  17. print(f"Connecting to Redis: {redis_url}")
  18. app = Celery(
  19. 'workflow_tasks',
  20. broker=redis_url,
  21. backend=redis_url,
  22. include=['foundation.base.tasks']
  23. )
  24. # 配置
  25. app.conf.update(
  26. task_serializer='json',
  27. accept_content=['json'],
  28. result_serializer='json',
  29. timezone='Asia/Shanghai',
  30. enable_utc=True,
  31. # Worker配置
  32. worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
  33. task_acks_late=True, # 任务完成后再确认
  34. # 并发控制
  35. worker_concurrency=2, # 每个worker进程数(文档处理较重,不宜过多)
  36. worker_pool='solo', # 使用单线程模式(避免GIL问题)
  37. # 任务配置
  38. task_track_started=True,
  39. task_time_limit=600, # 10分钟超时(文档处理较慢)
  40. task_soft_time_limit=540, # 9分钟软超时
  41. worker_max_tasks_per_child=5, # 每个worker进程最多处理5个任务后重启(防止内存泄漏)
  42. # 结果过期时间
  43. result_expires=3600, # 1小时后过期
  44. )