celery_app.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. """
  2. Celery应用配置
  3. 负责任务队列管理,不涉及具体业务逻辑
  4. 平台适配说明:
  5. - Windows开发环境: 使用 solo 池(避免 BrokenPipeError)
  6. - Linux生产环境: 使用 prefork 池(多进程高性能)
  7. """
  8. import os
  9. import sys
  10. import logging
  11. # 抑制 pymilvus 的 AsyncMilvusClient 警告(在多进程环境中没有事件循环)
  12. logging.getLogger('pymilvus').setLevel(logging.ERROR)
  13. from celery import Celery
  14. from foundation.infrastructure.config.config import config_handler
  15. # 导入trace系统
  16. from foundation.infrastructure.tracing.celery_trace import init
  17. # 从配置文件获取Redis连接信息
  18. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  19. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  20. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  21. redis_db = config_handler.get('redis', 'REDIS_DB', '0')
  22. # 构建Redis连接URL
  23. if redis_password:
  24. redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
  25. else:
  26. redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
  27. print(f"Connecting to Redis: {redis_url}")
  28. app = Celery(
  29. 'workflow_tasks',
  30. broker=redis_url,
  31. backend=redis_url,
  32. include=['foundation.infrastructure.messaging.tasks']
  33. )
  34. # 配置
  35. app.conf.update(
  36. task_serializer='json',
  37. accept_content=['json'],
  38. result_serializer='json',
  39. timezone='Asia/Shanghai',
  40. enable_utc=True,
  41. # Worker配置
  42. worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
  43. task_acks_late=True, # 任务完成后再确认
  44. # 并发控制 - 根据平台自动适配
  45. # Windows开发环境: 使用 solo 池(单进程,避免 BrokenPipeError)
  46. # Linux生产环境: 使用 prefork 池(多进程高性能)
  47. worker_pool='solo' if sys.platform == 'win32' else 'prefork',
  48. worker_concurrency=1 if sys.platform == 'win32' else 4,
  49. # 网络和连接配置 - 防止30分钟断连
  50. broker_connection_timeout=30, # 连接超时30秒
  51. broker_connection_retry=True, # 启用连接重试
  52. broker_connection_retry_on_startup=True, # 启动时重试
  53. broker_connection_max_retries=10, # 最大重试次数
  54. broker_heartbeat=60, # 心跳间隔60秒(默认是30秒的2倍)
  55. broker_transport_options={
  56. 'visibility_timeout': 3600, # 任务可见性超时
  57. 'socket_keepalive': True, # 启用socket keepalive
  58. },
  59. # 任务配置
  60. task_track_started=True,
  61. task_time_limit=600, # 10分钟超时(文档处理较慢)
  62. task_soft_time_limit=540, # 9分钟软超时
  63. worker_max_tasks_per_child=5, # 每个worker进程最多处理5个任务后重启(防止内存泄漏)
  64. # 结果过期时间
  65. result_expires=3600, # 1小时后过期
  66. # 连接池配置
  67. broker_pool_limit=None, # 无连接池限制
  68. result_backend_pool_limit=None, # 无结果后端连接池限制
  69. )
  70. # 初始化Celery trace系统
  71. init()