""" Celery Worker 生命周期管理。 """ import os import sys import time import threading from foundation.observability.logger.loggering import server_logger from foundation.infrastructure.messaging.celery_app import app as celery_app from server.redis_cleanup import cleanup_redis_tasks class CeleryWorkerManager: """Celery Worker 程序化管理器。""" def __init__(self, server_utils): self.worker = None self.is_running = False self.worker_thread = None self.shutdown_event = threading.Event() self.server_utils = server_utils def start_worker(self, **kwargs) -> bool: if self.is_running: server_logger.warning("Celery Worker已在运行") return True try: r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "启动前") def run_celery_worker(): try: from foundation.observability.logger.loggering import configure_logging_for_subprocess configure_logging_for_subprocess() worker_args = [ 'worker', '-c', '4', '-P', 'prefork', '-l', 'info', '--without-heartbeat', '--without-gossip', '--without-mingle', ] if sys.platform == 'win32': os.environ['FORKED_BY_MULTIPROCESSING'] = '1' celery_app.worker_main(worker_args) except KeyboardInterrupt: server_logger.info("收到停止信号,Celery Worker退出") except Exception as e: server_logger.error(f"Celery Worker运行时出错: {e}") finally: self.is_running = False self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True) self.worker_thread.start() self.is_running = True time.sleep(2) success = self.is_running and self.worker_thread.is_alive() if success: server_logger.info("Celery Worker启动成功") else: server_logger.error("Celery Worker启动失败") self.is_running = False return success except ImportError as e: server_logger.error(f"导入Celery失败: {e}") return False except Exception as e: server_logger.error(f"启动Celery Worker失败: {e}") return False def stop_worker(self, timeout: int = 5) -> bool: if not self.is_running: return True try: server_logger.info("停止Celery Worker...") self.shutdown_event.set() if self.worker_thread and self.worker_thread.is_alive(): start_time = time.time() while self.is_running and (time.time() - start_time) < timeout: time.sleep(0.1) r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "停止时") self.is_running = False self.shutdown_event.clear() return True except Exception as e: server_logger.error(f"停止Celery Worker失败: {e}") return False def stop_worker_immediately(self) -> bool: if not self.is_running: return True try: server_logger.info("立即停止Celery Worker...") self.shutdown_event.set() r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "立即停止时") self.is_running = False self.shutdown_event.clear() return True except Exception as e: server_logger.error(f"立即停止Celery Worker失败: {e}") self.is_running = False return False def get_status(self) -> dict: return { "is_running": self.is_running, "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False, } def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop_worker()