Przeglądaj źródła

feat: 增强 Celery 任务管理与服务优雅关闭

优化 Celery Worker 生命周期管理,增强 Redis 残留任务清理,改进 Windows 信号处理。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 1 tydzień temu
rodzic
commit
0324bb5a63
4 zmienionych plików z 104 dodań i 17 usunięć
  1. 73 5
      server/celery_manager.py
  2. 10 5
      server/redis_cleanup.py
  3. 0 2
      server/runner.py
  4. 21 5
      server/signals.py

+ 73 - 5
server/celery_manager.py

@@ -5,11 +5,37 @@ import os
 import sys
 import time
 import threading
+import subprocess
+import platform
 from foundation.observability.logger.loggering import server_logger
 from foundation.infrastructure.messaging.celery_app import app as celery_app
 from server.redis_cleanup import cleanup_redis_tasks
 
 
+def _kill_windows_child_processes(parent_pid: int):
+    """终止 Windows 上指定父进程的所有子进程。用 PowerShell Get-CimInstance 替代已被移除的 wmic。"""
+    try:
+        result = subprocess.run(
+            ['powershell', '-NoProfile', '-Command',
+             f'Get-CimInstance Win32_Process -Filter "ParentProcessId={parent_pid}" | Select-Object -ExpandProperty ProcessId'],
+            capture_output=True, text=True, timeout=15,
+        )
+        for line in result.stdout.strip().split('\n'):
+            pid = line.strip()
+            if pid and pid.isdigit():
+                try:
+                    subprocess.run(['taskkill', '/F', '/PID', pid],
+                                   capture_output=True, timeout=5)
+                    server_logger.info(f"已终止子进程 PID={pid}")
+                except Exception:
+                    pass
+    except FileNotFoundError:
+        # PowerShell 也不可用时的最后兜底
+        server_logger.warning("PowerShell不可用,无法终止子进程")
+    except Exception as e:
+        server_logger.warning(f"查找/终止子进程时出错: {e}")
+
+
 class CeleryWorkerManager:
     """Celery Worker 程序化管理器。"""
 
@@ -19,6 +45,8 @@ class CeleryWorkerManager:
         self.worker_thread = None
         self.shutdown_event = threading.Event()
         self.server_utils = server_utils
+        self._worker_pid = None
+        self._stop_lock = threading.Lock()
 
     def start_worker(self, **kwargs) -> bool:
         if self.is_running:
@@ -34,6 +62,7 @@ class CeleryWorkerManager:
                     from foundation.observability.logger.loggering import configure_logging_for_subprocess
                     configure_logging_for_subprocess()
 
+                    self._worker_pid = os.getpid()
                     worker_args = [
                         'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
                         '--without-heartbeat', '--without-gossip', '--without-mingle',
@@ -71,15 +100,16 @@ class CeleryWorkerManager:
     def stop_worker(self, timeout: int = 5) -> bool:
         if not self.is_running:
             return True
+        if not self._stop_lock.acquire(blocking=False):
+            server_logger.info("Worker 停止已在执行中,跳过重复调用")
+            return True
 
         try:
             server_logger.info("停止Celery Worker...")
-            self.shutdown_event.set()
+            self._stop_worker_processes()
 
             if self.worker_thread and self.worker_thread.is_alive():
-                start_time = time.time()
-                while self.is_running and (time.time() - start_time) < timeout:
-                    time.sleep(0.1)
+                self.worker_thread.join(timeout=timeout)
 
             r = self.server_utils.get_redis_connection()
             cleanup_redis_tasks(r, "停止时")
@@ -90,14 +120,19 @@ class CeleryWorkerManager:
         except Exception as e:
             server_logger.error(f"停止Celery Worker失败: {e}")
             return False
+        finally:
+            self._stop_lock.release()
 
     def stop_worker_immediately(self) -> bool:
         if not self.is_running:
             return True
+        if not self._stop_lock.acquire(blocking=False):
+            server_logger.info("Worker 停止已在执行中,跳过重复调用")
+            return True
 
         try:
             server_logger.info("立即停止Celery Worker...")
-            self.shutdown_event.set()
+            self._stop_worker_processes()
 
             r = self.server_utils.get_redis_connection()
             cleanup_redis_tasks(r, "立即停止时")
@@ -109,6 +144,39 @@ class CeleryWorkerManager:
             server_logger.error(f"立即停止Celery Worker失败: {e}")
             self.is_running = False
             return False
+        finally:
+            self._stop_lock.release()
+
+    def _stop_worker_processes(self):
+        """终止 Celery worker 及其所有子进程。"""
+        self.shutdown_event.set()
+
+        # 先尝试优雅关闭
+        try:
+            celery_app.control.shutdown()
+        except Exception:
+            pass
+        time.sleep(0.5)
+
+        # 强制终止本进程下的子进程
+        self._kill_child_processes()
+
+    @staticmethod
+    def _kill_child_processes():
+        """终止由当前进程 fork 出的所有 Celery 子进程。"""
+        system = platform.system()
+        current_pid = os.getpid()
+        try:
+            if system == "Windows":
+                _kill_windows_child_processes(current_pid)
+            else:
+                try:
+                    subprocess.run(['pkill', '-P', str(current_pid), '-f', 'celery'],
+                                   capture_output=True, timeout=5)
+                except Exception:
+                    pass
+        except Exception as e:
+            server_logger.warning(f"终止子进程时出错: {e}")
 
     def get_status(self) -> dict:
         return {

+ 10 - 5
server/redis_cleanup.py

@@ -63,13 +63,18 @@ def _kill_existing_celery_workers():
     system = platform.system()
     try:
         if system == "Windows":
+            # PowerShell Get-CimInstance 查找命令行包含 celery 的 python 进程并终止
             result = subprocess.run(
-                ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
-                capture_output=True, text=True,
+                ['powershell', '-NoProfile', '-Command',
+                 'Get-CimInstance Win32_Process -Filter "name=\'python.exe\'" | Where-Object {$_.CommandLine -like \'*celery*\'} | Select-Object -ExpandProperty ProcessId'],
+                capture_output=True, text=True, timeout=15,
             )
-            if 'celery' in result.stdout.lower():
-                subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
-                server_logger.info("已终止现有的Celery Worker进程")
+            for line in result.stdout.strip().split('\n'):
+                pid = line.strip()
+                if pid and pid.isdigit():
+                    subprocess.run(['taskkill', '/F', '/PID', pid],
+                                   capture_output=True, timeout=5)
+                    server_logger.info(f"已终止Celery Worker进程 PID={pid}")
         else:
             subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
             server_logger.info("已终止现有的Celery Worker进程")

+ 0 - 2
server/runner.py

@@ -1,7 +1,6 @@
 """
 服务器运行器:整合所有组件并启动 uvicorn。
 """
-import atexit
 import uvicorn
 from foundation.observability.logger.loggering import server_logger
 from server.signals import setup_signal_handlers
@@ -25,7 +24,6 @@ class ServerRunner:
 
         if with_celery:
             self.celery_manager.start_worker()
-            atexit.register(self.celery_manager.stop_worker_immediately)
             setup_signal_handlers(lambda: self.celery_manager.stop_worker_immediately())
 
         app = self.app_factory.create_app()

+ 21 - 5
server/signals.py

@@ -2,17 +2,30 @@
 操作系统信号和 Windows 控制台事件处理。
 """
 import sys
+import threading
 import signal
 from foundation.observability.logger.loggering import server_logger
 
+_shutdown_lock = threading.Lock()
+_shutdown_called = False
+
+
+def _run_shutdown_once(shutdown_callback):
+    """确保 shutdown 回调在整个进程生命周期内只执行一次。"""
+    global _shutdown_called
+    with _shutdown_lock:
+        if _shutdown_called:
+            return
+        _shutdown_called = True
+    shutdown_callback()
+
 
 def setup_signal_handlers(shutdown_callback):
-    """注册信号处理器,shutdown_callback 会在收到终止信号时调用。"""
+    """注册信号处理器,shutdown_callback 会在收到终止信号时调用(通过 _run_shutdown_once 确保只执行一次)。"""
 
     def handler(signum, frame):
         server_logger.info(f"收到信号 {signum},正在停止服务...")
-        shutdown_callback()
-        sys.exit(0)
+        _run_shutdown_once(shutdown_callback)
 
     try:
         signal.signal(signal.SIGINT, handler)
@@ -34,8 +47,11 @@ def _setup_windows_handler(shutdown_callback):
             CTRL_SHUTDOWN_EVENT = 6
             if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
                 server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
-                shutdown_callback()
-                sys.exit(0)
+                # 将耗时 shutdown 操作放到后台线程,避免阻塞 console handler 超时
+                threading.Thread(
+                    target=lambda: _run_shutdown_once(shutdown_callback),
+                    daemon=True,
+                ).start()
             return False
         win32api.SetConsoleCtrlHandler(win32_handler, True)
     except (ImportError, AttributeError) as e: