celery_app.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
import logging
import sys
from urllib.parse import quote

from celery import Celery

from foundation.infrastructure.config.config import config_handler
  6. logging.getLogger("pymilvus").setLevel(logging.ERROR)
  7. def _redis_url() -> str:
  8. host = config_handler.get("redis", "REDIS_HOST", "localhost")
  9. port = config_handler.get("redis", "REDIS_PORT", "6379")
  10. password = config_handler.get("redis", "REDIS_PASSWORD", "")
  11. db = config_handler.get("redis", "REDIS_DB", "0")
  12. if password:
  13. return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"
  14. return f"redis://{host}:{port}/{db}"
  15. redis_url = _redis_url()
  16. app = Celery(
  17. "construction_write_tasks",
  18. broker=redis_url,
  19. backend=redis_url,
  20. include=["foundation.infrastructure.messaging.tasks"],
  21. )
  22. app.conf.update(
  23. task_serializer="json",
  24. accept_content=["json"],
  25. result_serializer="json",
  26. timezone="Asia/Shanghai",
  27. enable_utc=True,
  28. worker_pool="solo" if sys.platform == "win32" else "prefork",
  29. worker_concurrency=int(config_handler.get("construction_write", "MAX_CELERY_TASKS", "2")),
  30. worker_prefetch_multiplier=1,
  31. task_acks_late=True,
  32. task_track_started=True,
  33. task_time_limit=3600,
  34. task_soft_time_limit=3540,
  35. worker_max_tasks_per_child=5,
  36. result_expires=3600,
  37. broker_connection_timeout=30,
  38. broker_connection_retry=True,
  39. broker_connection_retry_on_startup=True,
  40. broker_connection_max_retries=10,
  41. broker_heartbeat=60,
  42. broker_transport_options={"visibility_timeout": 3600, "socket_keepalive": True},
  43. task_default_queue="construction_write",
  44. task_routes={
  45. "foundation.infrastructure.messaging.tasks.submit_outline_generation_task": {
  46. "queue": "construction_write"
  47. }
  48. },
  49. broker_pool_limit=None,
  50. result_backend_pool_limit=None,
  51. )
  52. # 初始化 Celery trace 系统(自动管理 trace_id 在 Celery 信号中的传递与恢复)
  53. from foundation.infrastructure.tracing.celery_trace import init
  54. init()