""" Celery Worker 生命周期管理。 """ import os import sys import time import threading import subprocess import platform from foundation.observability.logger.loggering import server_logger from foundation.infrastructure.messaging.celery_app import app as celery_app from server.redis_cleanup import cleanup_redis_tasks def _kill_windows_child_processes(parent_pid: int): """终止 Windows 上指定父进程的所有子进程。用 PowerShell Get-CimInstance 替代已被移除的 wmic。""" try: result = subprocess.run( ['powershell', '-NoProfile', '-Command', f'Get-CimInstance Win32_Process -Filter "ParentProcessId={parent_pid}" | Select-Object -ExpandProperty ProcessId'], capture_output=True, text=True, timeout=15, ) for line in result.stdout.strip().split('\n'): pid = line.strip() if pid and pid.isdigit(): try: subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5) server_logger.info(f"已终止子进程 PID={pid}") except Exception: pass except FileNotFoundError: # PowerShell 也不可用时的最后兜底 server_logger.warning("PowerShell不可用,无法终止子进程") except Exception as e: server_logger.warning(f"查找/终止子进程时出错: {e}") class CeleryWorkerManager: """Celery Worker 程序化管理器。""" def __init__(self, server_utils): self.worker = None self.is_running = False self.worker_thread = None self.shutdown_event = threading.Event() self.server_utils = server_utils self._worker_pid = None self._stop_lock = threading.Lock() def start_worker(self, **kwargs) -> bool: if self.is_running: server_logger.warning("Celery Worker已在运行") return True try: r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "启动前") def run_celery_worker(): try: from foundation.observability.logger.loggering import configure_logging_for_subprocess configure_logging_for_subprocess() self._worker_pid = os.getpid() worker_args = [ 'worker', '-c', '4', '-P', 'prefork', '-l', 'info', '--without-heartbeat', '--without-gossip', '--without-mingle', ] if sys.platform == 'win32': os.environ['FORKED_BY_MULTIPROCESSING'] = '1' celery_app.worker_main(worker_args) except KeyboardInterrupt: server_logger.info("收到停止信号,Celery Worker退出") except Exception as e: server_logger.error(f"Celery Worker运行时出错: {e}") finally: self.is_running = False self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True) self.worker_thread.start() self.is_running = True time.sleep(2) success = self.is_running and self.worker_thread.is_alive() if success: server_logger.info("Celery Worker启动成功") else: server_logger.error("Celery Worker启动失败") self.is_running = False return success except ImportError as e: server_logger.error(f"导入Celery失败: {e}") return False except Exception as e: server_logger.error(f"启动Celery Worker失败: {e}") return False def stop_worker(self, timeout: int = 5) -> bool: if not self.is_running: return True if not self._stop_lock.acquire(blocking=False): server_logger.info("Worker 停止已在执行中,跳过重复调用") return True try: server_logger.info("停止Celery Worker...") self._stop_worker_processes() if self.worker_thread and self.worker_thread.is_alive(): self.worker_thread.join(timeout=timeout) r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "停止时") self.is_running = False self.shutdown_event.clear() return True except Exception as e: server_logger.error(f"停止Celery Worker失败: {e}") return False finally: self._stop_lock.release() def stop_worker_immediately(self) -> bool: if not self.is_running: return True if not self._stop_lock.acquire(blocking=False): server_logger.info("Worker 停止已在执行中,跳过重复调用") return True try: server_logger.info("立即停止Celery Worker...") self._stop_worker_processes() r = self.server_utils.get_redis_connection() cleanup_redis_tasks(r, "立即停止时") self.is_running = False self.shutdown_event.clear() return True except Exception as e: server_logger.error(f"立即停止Celery Worker失败: {e}") self.is_running = False return False finally: self._stop_lock.release() def _stop_worker_processes(self): """终止 Celery worker 及其所有子进程。""" self.shutdown_event.set() # 先尝试优雅关闭 try: celery_app.control.shutdown() except Exception: pass time.sleep(0.5) # 强制终止本进程下的子进程 self._kill_child_processes() @staticmethod def _kill_child_processes(): """终止由当前进程 fork 出的所有 Celery 子进程。""" system = platform.system() current_pid = os.getpid() try: if system == "Windows": _kill_windows_child_processes(current_pid) else: try: subprocess.run(['pkill', '-P', str(current_pid), '-f', 'celery'], capture_output=True, timeout=5) except Exception: pass except Exception as e: server_logger.warning(f"终止子进程时出错: {e}") def get_status(self) -> dict: return { "is_running": self.is_running, "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False, } def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop_worker()