| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- import logging
- import sys
- from urllib.parse import quote
- from celery import Celery
- from foundation.infrastructure.config.config import config_handler
- logging.getLogger("pymilvus").setLevel(logging.ERROR)
- def _redis_url() -> str:
- host = config_handler.get("redis", "REDIS_HOST", "localhost")
- port = config_handler.get("redis", "REDIS_PORT", "6379")
- password = config_handler.get("redis", "REDIS_PASSWORD", "")
- db = config_handler.get("redis", "REDIS_DB", "0")
- if password:
- return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"
- return f"redis://{host}:{port}/{db}"
- redis_url = _redis_url()
- app = Celery(
- "construction_write_tasks",
- broker=redis_url,
- backend=redis_url,
- include=["foundation.infrastructure.messaging.tasks"],
- )
- app.conf.update(
- task_serializer="json",
- accept_content=["json"],
- result_serializer="json",
- timezone="Asia/Shanghai",
- enable_utc=True,
- worker_pool="solo" if sys.platform == "win32" else "prefork",
- worker_concurrency=int(config_handler.get("construction_write", "MAX_CELERY_TASKS", "2")),
- worker_prefetch_multiplier=1,
- task_acks_late=True,
- task_track_started=True,
- task_time_limit=3600,
- task_soft_time_limit=3540,
- worker_max_tasks_per_child=5,
- result_expires=3600,
- broker_connection_timeout=30,
- broker_connection_retry=True,
- broker_connection_retry_on_startup=True,
- broker_connection_max_retries=10,
- broker_heartbeat=60,
- broker_transport_options={"visibility_timeout": 3600, "socket_keepalive": True},
- task_default_queue="construction_write",
- task_routes={
- "foundation.infrastructure.messaging.tasks.submit_outline_generation_task": {
- "queue": "construction_write"
- }
- },
- broker_pool_limit=None,
- result_backend_pool_limit=None,
- )
- # 初始化 Celery trace 系统(自动管理 trace_id 在 Celery 信号中的传递与恢复)
- from foundation.infrastructure.tracing.celery_trace import init
- init()
|