celery_app.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """
  2. Celery应用配置
  3. 负责任务队列管理,不涉及具体业务逻辑
  4. """
  5. import os
  6. from celery import Celery
  7. from .config import config_handler
  8. # 导入trace系统
  9. from foundation.trace.celery_trace import init
  10. # 从配置文件获取Redis连接信息
  11. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  12. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  13. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  14. # 构建Redis连接URL
  15. if redis_password:
  16. redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/0"
  17. else:
  18. redis_url = f"redis://{redis_host}:{redis_port}/0"
  19. print(f"Connecting to Redis: {redis_url}")
  20. app = Celery(
  21. 'workflow_tasks',
  22. broker=redis_url,
  23. backend=redis_url,
  24. include=['foundation.base.tasks']
  25. )
  26. # 配置
  27. app.conf.update(
  28. task_serializer='json',
  29. accept_content=['json'],
  30. result_serializer='json',
  31. timezone='Asia/Shanghai',
  32. enable_utc=True,
  33. # Worker配置
  34. worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
  35. task_acks_late=True, # 任务完成后再确认
  36. # 并发控制
  37. worker_concurrency=2, # 每个worker进程数(文档处理较重,不宜过多)
  38. worker_pool='solo', # 使用单线程模式(避免GIL问题)
  39. # 任务配置
  40. task_track_started=True,
  41. task_time_limit=600, # 10分钟超时(文档处理较慢)
  42. task_soft_time_limit=540, # 9分钟软超时
  43. worker_max_tasks_per_child=5, # 每个worker进程最多处理5个任务后重启(防止内存泄漏)
  44. # 结果过期时间
  45. result_expires=3600, # 1小时后过期
  46. )
  47. # 初始化Celery trace系统
  48. init()