| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- """
- Celery Worker 生命周期管理。
- """
- import os
- import sys
- import time
- import threading
- from foundation.observability.logger.loggering import server_logger
- from foundation.infrastructure.messaging.celery_app import app as celery_app
- from server.redis_cleanup import cleanup_redis_tasks
class CeleryWorkerManager:
    """Programmatic lifecycle manager for a Celery worker.

    The worker is launched through ``celery_app.worker_main`` on a daemon
    thread of the current process. Redis task state is cleaned up with
    ``cleanup_redis_tasks`` before the worker starts and whenever it is
    stopped. The manager also works as a context manager: leaving the
    ``with`` block calls ``stop_worker``.

    NOTE(review): nothing in this class observes ``shutdown_event`` or
    otherwise tells the Celery worker to exit, so the stop methods only
    update the manager's own bookkeeping and clean Redis; the worker thread
    itself may keep running. Confirm whether code outside this class
    consumes the event.
    """

    # Command-line arguments handed to ``celery_app.worker_main``.
    _WORKER_ARGS = (
        'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
        '--without-heartbeat', '--without-gossip', '--without-mingle',
    )

    # Seconds to wait after launching the thread before judging the start.
    _STARTUP_GRACE_SECONDS = 2

    def __init__(self, server_utils) -> None:
        """Create a manager in the stopped state.

        Args:
            server_utils: Object providing ``get_redis_connection()``; the
                returned connection is passed to ``cleanup_redis_tasks``.
        """
        self.worker = None  # Never assigned elsewhere in this class.
        self.is_running = False
        self.worker_thread = None
        self.shutdown_event = threading.Event()
        self.server_utils = server_utils

    def _run_celery_worker(self) -> None:
        """Thread target: run the Celery worker until ``worker_main`` returns."""
        try:
            # Imported here rather than at module level, as the original
            # thread body did, so an import failure is logged by this
            # handler instead of breaking the import of this module.
            from foundation.observability.logger.loggering import (
                configure_logging_for_subprocess,
            )
            configure_logging_for_subprocess()

            if sys.platform == 'win32':
                # Presumably the usual workaround that lets the prefork pool
                # run on Windows -- TODO confirm.
                os.environ['FORKED_BY_MULTIPROCESSING'] = '1'

            celery_app.worker_main(list(self._WORKER_ARGS))
        except KeyboardInterrupt:
            server_logger.info("收到停止信号,Celery Worker退出")
        except Exception as e:
            server_logger.error(f"Celery Worker运行时出错: {e}")
        finally:
            # Whatever the reason for leaving, the worker is no longer up.
            self.is_running = False

    def start_worker(self, **kwargs) -> bool:
        """Start the Celery worker on a background daemon thread.

        Redis task state is cleaned up first; if that cleanup raises, the
        worker is not started.

        Args:
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            True if the manager was already marked as running, or if the
            worker thread is still alive after the startup grace period;
            False if startup failed.
        """
        if self.is_running:
            server_logger.warning("Celery Worker已在运行")
            return True

        try:
            redis_conn = self.server_utils.get_redis_connection()
            cleanup_redis_tasks(redis_conn, "启动前")

            worker_thread = threading.Thread(
                target=self._run_celery_worker, daemon=True
            )
            self.worker_thread = worker_thread

            # Raise the flag BEFORE the thread starts. The thread clears it
            # in its ``finally``; setting it afterwards could overwrite that
            # reset when the worker dies immediately.
            self.is_running = True
            try:
                worker_thread.start()
            except Exception:
                self.is_running = False
                raise

            # Give the worker a moment to fail fast before reporting success.
            time.sleep(self._STARTUP_GRACE_SECONDS)

            success = self.is_running and worker_thread.is_alive()
            if success:
                server_logger.info("Celery Worker启动成功")
            else:
                server_logger.error("Celery Worker启动失败")
                self.is_running = False
            return success
        except ImportError as e:
            server_logger.error(f"导入Celery失败: {e}")
            return False
        except Exception as e:
            server_logger.error(f"启动Celery Worker失败: {e}")
            return False

    def stop_worker(self, timeout: int = 5) -> bool:
        """Mark the worker as stopped, waiting up to ``timeout`` seconds.

        Sets ``shutdown_event``, waits for the worker thread to finish,
        cleans up Redis task state and clears the running flag. The event is
        always cleared again before returning, on success or failure.

        Args:
            timeout: Maximum number of seconds to wait for the worker thread.

        Returns:
            True if the manager was not running or the stop sequence
            completed (even when the thread outlived the timeout); False if
            an exception occurred, in which case ``is_running`` is left
            unchanged.
        """
        if not self.is_running:
            return True

        try:
            server_logger.info("停止Celery Worker...")
            self.shutdown_event.set()

            worker_thread = self.worker_thread
            if (
                worker_thread is not None
                and worker_thread is not threading.current_thread()
                and worker_thread.is_alive()
            ):
                # join() returns as soon as the thread ends or the timeout
                # elapses, without polling the wall clock.
                worker_thread.join(timeout)
                if worker_thread.is_alive():
                    server_logger.warning(
                        f"Celery Worker线程在{timeout}秒内未退出"
                    )

            redis_conn = self.server_utils.get_redis_connection()
            cleanup_redis_tasks(redis_conn, "停止时")
            self.is_running = False
            return True
        except Exception as e:
            server_logger.error(f"停止Celery Worker失败: {e}")
            return False
        finally:
            # Do not leave the event latched when the stop attempt fails.
            self.shutdown_event.clear()

    def stop_worker_immediately(self) -> bool:
        """Mark the worker as stopped without waiting for its thread.

        Sets ``shutdown_event`` and cleans up Redis task state. The running
        flag and the event are always reset, whether or not cleanup succeeds.

        Returns:
            True if the manager was not running or cleanup succeeded; False
            if an exception occurred.
        """
        if not self.is_running:
            return True

        try:
            server_logger.info("立即停止Celery Worker...")
            self.shutdown_event.set()
            redis_conn = self.server_utils.get_redis_connection()
            cleanup_redis_tasks(redis_conn, "立即停止时")
            return True
        except Exception as e:
            server_logger.error(f"立即停止Celery Worker失败: {e}")
            return False
        finally:
            # Reset both pieces of state on every path so a failed cleanup
            # cannot leave the event set.
            self.is_running = False
            self.shutdown_event.clear()

    def get_status(self) -> dict:
        """Return the running flag and whether the worker thread is alive."""
        worker_thread = self.worker_thread
        thread_alive = worker_thread.is_alive() if worker_thread else False
        return {
            "is_running": self.is_running,
            "thread_alive": thread_alive,
        }

    def __enter__(self):
        """Return the manager itself; the worker is not started here."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Stop the worker on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.stop_worker()
|