"""Celery application for the ``construction_write`` task queue.

Builds the Redis URL from configuration, creates the Celery ``app`` (Redis is
used as both broker and result backend), applies worker/broker settings, and
finally initializes Celery trace propagation.
"""

import logging
import sys
from urllib.parse import quote

from celery import Celery

from foundation.infrastructure.config.config import config_handler

# Only let ERROR-and-above records from the "pymilvus" logger through.
logging.getLogger("pymilvus").setLevel(logging.ERROR)

# Hard per-task runtime limit; the soft limit fires 60 seconds earlier.
_TASK_TIME_LIMIT_SECONDS = 3600
_TASK_SOFT_TIME_LIMIT_SECONDS = _TASK_TIME_LIMIT_SECONDS - 60

# With task_acks_late=True a message stays unacknowledged for the whole task
# run. On the Redis transport an unacknowledged message is redelivered once
# the visibility timeout elapses, so the timeout must be strictly longer than
# the hard time limit; otherwise a task running close to the limit can be
# handed to a second worker while the first one is still executing it.
_VISIBILITY_TIMEOUT_SECONDS = _TASK_TIME_LIMIT_SECONDS + 600


def _redis_url() -> str:
    """Return the Redis connection URL assembled from the ``redis`` config section.

    Reads ``REDIS_HOST``, ``REDIS_PORT``, ``REDIS_PASSWORD`` and ``REDIS_DB``
    (defaults: ``localhost``, ``6379``, empty, ``0``). When a password is set
    it is percent-encoded with no "safe" characters so that symbols such as
    ``@``, ``:`` or ``/`` cannot break the URL structure.
    """
    host = config_handler.get("redis", "REDIS_HOST", "localhost")
    port = config_handler.get("redis", "REDIS_PORT", "6379")
    password = config_handler.get("redis", "REDIS_PASSWORD", "")
    db = config_handler.get("redis", "REDIS_DB", "0")

    if password:
        return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"
    return f"redis://{host}:{port}/{db}"


redis_url = _redis_url()

# The same Redis instance serves as message broker and result backend.
app = Celery(
    "construction_write_tasks",
    broker=redis_url,
    backend=redis_url,
    include=["foundation.infrastructure.messaging.tasks"],
)

app.conf.update(
    # Serialization: JSON only, in both directions.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    # Worker behaviour: solo pool on Windows, prefork everywhere else.
    worker_pool="solo" if sys.platform == "win32" else "prefork",
    worker_concurrency=int(
        config_handler.get("construction_write", "MAX_CELERY_TASKS", "2")
    ),
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_track_started=True,
    task_time_limit=_TASK_TIME_LIMIT_SECONDS,
    task_soft_time_limit=_TASK_SOFT_TIME_LIMIT_SECONDS,
    worker_max_tasks_per_child=5,
    result_expires=3600,
    # Broker connection handling.
    broker_connection_timeout=30,
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
    # NOTE(review): heartbeats are an AMQP feature; this is likely ignored by
    # the Redis transport -- confirm before relying on it.
    broker_heartbeat=60,
    broker_transport_options={
        "visibility_timeout": _VISIBILITY_TIMEOUT_SECONDS,
        "socket_keepalive": True,
    },
    # Routing: everything goes to the single "construction_write" queue.
    task_default_queue="construction_write",
    task_routes={
        "foundation.infrastructure.messaging.tasks.submit_outline_generation_task": {
            "queue": "construction_write"
        }
    },
    broker_pool_limit=None,
    # NOTE(review): "result_backend_pool_limit" does not look like a
    # documented Celery setting -- confirm it has any effect.
    result_backend_pool_limit=None,
)

# Initialize the Celery trace system (automatically manages propagating and
# restoring trace_id across Celery signals).
# NOTE(review): kept after the app configuration, as in the original;
# presumably the ordering matters (e.g. import cycle) -- confirm before
# moving this import to the top of the file.
from foundation.infrastructure.tracing.celery_trace import init  # noqa: E402

init()