| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- """
- Celery Worker 生命周期管理。
- """
- import os
- import sys
- import time
- import threading
- import subprocess
- import platform
- from foundation.observability.logger.loggering import server_logger
- from foundation.infrastructure.messaging.celery_app import app as celery_app
- from server.redis_cleanup import cleanup_redis_tasks
def _kill_windows_child_processes(parent_pid: int) -> None:
    """Force-kill every direct child process of *parent_pid* on Windows.

    Child PIDs are listed with PowerShell ``Get-CimInstance Win32_Process``
    (used instead of the removed ``wmic``), then each one is terminated with
    ``taskkill /F``. This is best-effort: every failure is logged, none is
    raised.

    Args:
        parent_pid: PID whose direct children should be terminated.
    """
    try:
        result = subprocess.run(
            ['powershell', '-NoProfile', '-Command',
             f'Get-CimInstance Win32_Process -Filter "ParentProcessId={parent_pid}" | Select-Object -ExpandProperty ProcessId'],
            capture_output=True, text=True, timeout=15,
        )
    except FileNotFoundError:
        # Last-resort fallback: PowerShell itself is unavailable.
        server_logger.warning("PowerShell不可用,无法终止子进程")
        return
    except Exception as e:
        server_logger.warning(f"查找/终止子进程时出错: {e}")
        return

    for line in result.stdout.splitlines():
        pid = line.strip()
        if not pid.isdigit():
            # Skips blank lines and any non-PID noise in the PowerShell output.
            continue
        try:
            killed = subprocess.run(['taskkill', '/F', '/PID', pid],
                                    capture_output=True, timeout=5)
        except Exception as e:
            server_logger.warning(f"终止子进程 PID={pid} 失败: {e}")
            continue
        # Only report success when taskkill actually succeeded (e.g. it returns
        # non-zero when the process has already exited).
        if killed.returncode == 0:
            server_logger.info(f"已终止子进程 PID={pid}")
        else:
            server_logger.warning(
                f"终止子进程 PID={pid} 失败, taskkill 返回码 {killed.returncode}"
            )
class CeleryWorkerManager:
    """Programmatic lifecycle manager for a Celery worker.

    The worker is run through ``celery_app.worker_main`` on a daemon thread of
    the current process, so its prefork pool processes are forked from this
    process; ``_kill_child_processes`` relies on that when force-stopping.
    """

    def __init__(self, server_utils):
        """Create an idle manager.

        Args:
            server_utils: object exposing ``get_redis_connection()``; the
                returned client is passed to ``cleanup_redis_tasks``.
        """
        import socket  # local import, matching the file's lazy-import style

        self.worker = None
        self.is_running = False
        self.worker_thread = None
        self.shutdown_event = threading.Event()
        self.server_utils = server_utils
        self._worker_pid = None
        self._stop_lock = threading.Lock()
        # Explicit worker node name, so shutdown requests can target only this
        # worker instead of broadcasting to every worker on the broker.
        self._node_name = f"celery@{socket.gethostname()}"

    def _build_worker_args(self) -> list:
        """Return the argv passed to ``celery_app.worker_main``."""
        return [
            'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
            '--without-heartbeat', '--without-gossip', '--without-mingle',
            '-n', self._node_name,
        ]

    def _run_celery_worker(self, worker_args) -> None:
        """Thread target: run the worker until it exits, then clear ``is_running``."""
        try:
            from foundation.observability.logger.loggering import configure_logging_for_subprocess
            configure_logging_for_subprocess()
            # Runs on a thread, so this is the PID of the current process.
            self._worker_pid = os.getpid()
            if sys.platform == 'win32':
                os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
            celery_app.worker_main(worker_args)
        except KeyboardInterrupt:
            server_logger.info("收到停止信号,Celery Worker退出")
        except Exception as e:
            server_logger.error(f"Celery Worker运行时出错: {e}")
        finally:
            self.is_running = False

    def start_worker(self, **kwargs) -> bool:
        """Start the Celery worker on a background daemon thread.

        Clears Redis task state via ``cleanup_redis_tasks``, starts the worker
        thread, waits 2 seconds, and reports whether the thread is still alive.

        Args:
            **kwargs: accepted for backward compatibility; currently ignored.

        Returns:
            True if the worker is running (or already was), False otherwise.
        """
        if self.is_running:
            server_logger.warning("Celery Worker已在运行")
            return True
        try:
            r = self.server_utils.get_redis_connection()
            cleanup_redis_tasks(r, "启动前")
            # Set the flag *before* starting the thread: if the worker dies
            # immediately, the thread's ``finally`` reset must not be
            # overwritten afterwards.
            self.is_running = True
            self.worker_thread = threading.Thread(
                target=self._run_celery_worker,
                args=(self._build_worker_args(),),
                daemon=True,
            )
            self.worker_thread.start()
            time.sleep(2)
            success = self.is_running and self.worker_thread.is_alive()
            if success:
                server_logger.info("Celery Worker启动成功")
            else:
                server_logger.error("Celery Worker启动失败")
                self.is_running = False
            return success
        except ImportError as e:
            self.is_running = False
            server_logger.error(f"导入Celery失败: {e}")
            return False
        except Exception as e:
            self.is_running = False
            server_logger.error(f"启动Celery Worker失败: {e}")
            return False

    def stop_worker(self, timeout: int = 5) -> bool:
        """Stop the worker, waiting up to *timeout* seconds for its thread.

        Returns:
            True on success or when there is nothing to stop; False if an
            error occurred (the manager is reset to idle either way).
        """
        return self._stop(
            start_msg="停止Celery Worker...",
            cleanup_stage="停止时",
            error_msg="停止Celery Worker失败",
            join_timeout=timeout,
        )

    def stop_worker_immediately(self) -> bool:
        """Stop the worker without waiting for its thread to finish.

        Returns:
            True on success or when there is nothing to stop; False if an
            error occurred (the manager is reset to idle either way).
        """
        return self._stop(
            start_msg="立即停止Celery Worker...",
            cleanup_stage="立即停止时",
            error_msg="立即停止Celery Worker失败",
            join_timeout=None,
        )

    def _stop(self, start_msg, cleanup_stage, error_msg, join_timeout) -> bool:
        """Shared stop path: signal/kill the worker, optionally join, clean Redis.

        Concurrent callers are de-duplicated via a non-blocking lock. State is
        reset in ``finally`` so that a failure *after* the processes have been
        killed (e.g. Redis unavailable during cleanup) does not leave the
        manager claiming the worker is still running, which would block any
        later ``start_worker``.
        """
        if not self.is_running:
            return True
        if not self._stop_lock.acquire(blocking=False):
            server_logger.info("Worker 停止已在执行中,跳过重复调用")
            return True
        try:
            server_logger.info(start_msg)
            self._stop_worker_processes()
            if join_timeout is not None:
                self._join_worker_thread(join_timeout)
            r = self.server_utils.get_redis_connection()
            cleanup_redis_tasks(r, cleanup_stage)
            return True
        except Exception as e:
            server_logger.error(f"{error_msg}: {e}")
            return False
        finally:
            self.is_running = False
            self.shutdown_event.clear()
            self._stop_lock.release()

    def _join_worker_thread(self, timeout) -> None:
        """Wait up to *timeout* seconds for the worker thread; warn if it lingers."""
        thread = self.worker_thread
        if thread and thread.is_alive():
            thread.join(timeout=timeout)
            if thread.is_alive():
                server_logger.warning(f"Celery Worker线程在{timeout}秒内未退出")

    def _stop_worker_processes(self) -> None:
        """Terminate this manager's Celery worker and its child processes."""
        self.shutdown_event.set()
        # Try a graceful shutdown first. It is addressed to this worker's node
        # only; a bare ``control.shutdown()`` would broadcast to every worker.
        try:
            celery_app.control.shutdown(destination=[self._node_name])
        except Exception as e:
            server_logger.warning(f"发送Celery关闭指令失败: {e}")
        time.sleep(0.5)
        # Then force-kill child processes of this process.
        self._kill_child_processes()

    @staticmethod
    def _kill_child_processes() -> None:
        """Force-kill Celery child processes forked from the current process."""
        current_pid = os.getpid()
        try:
            if platform.system() == "Windows":
                _kill_windows_child_processes(current_pid)
                return
            # NOTE(review): ``-f celery`` matches the full command line; forked
            # pool children only contain "celery" there if the process title
            # was changed or the parent's command line already contains it.
            # Confirm this matches in your deployment.
            subprocess.run(['pkill', '-P', str(current_pid), '-f', 'celery'],
                           capture_output=True, timeout=5)
        except FileNotFoundError:
            server_logger.warning("pkill不可用,无法终止子进程")
        except Exception as e:
            server_logger.warning(f"终止子进程时出错: {e}")

    def get_status(self) -> dict:
        """Return the running flag and whether the worker thread is alive."""
        return {
            "is_running": self.is_running,
            "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the worker (gracefully) when leaving a ``with`` block."""
        self.stop_worker()
|