celery_manager.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """
  2. Celery Worker 生命周期管理。
  3. """
  4. import os
  5. import sys
  6. import time
  7. import threading
  8. import subprocess
  9. import platform
  10. from foundation.observability.logger.loggering import server_logger
  11. from foundation.infrastructure.messaging.celery_app import app as celery_app
  12. from server.redis_cleanup import cleanup_redis_tasks
  13. def _kill_windows_child_processes(parent_pid: int):
  14. """终止 Windows 上指定父进程的所有子进程。用 PowerShell Get-CimInstance 替代已被移除的 wmic。"""
  15. try:
  16. result = subprocess.run(
  17. ['powershell', '-NoProfile', '-Command',
  18. f'Get-CimInstance Win32_Process -Filter "ParentProcessId={parent_pid}" | Select-Object -ExpandProperty ProcessId'],
  19. capture_output=True, text=True, timeout=15,
  20. )
  21. for line in result.stdout.strip().split('\n'):
  22. pid = line.strip()
  23. if pid and pid.isdigit():
  24. try:
  25. subprocess.run(['taskkill', '/F', '/PID', pid],
  26. capture_output=True, timeout=5)
  27. server_logger.info(f"已终止子进程 PID={pid}")
  28. except Exception:
  29. pass
  30. except FileNotFoundError:
  31. # PowerShell 也不可用时的最后兜底
  32. server_logger.warning("PowerShell不可用,无法终止子进程")
  33. except Exception as e:
  34. server_logger.warning(f"查找/终止子进程时出错: {e}")
  35. class CeleryWorkerManager:
  36. """Celery Worker 程序化管理器。"""
  37. def __init__(self, server_utils):
  38. self.worker = None
  39. self.is_running = False
  40. self.worker_thread = None
  41. self.shutdown_event = threading.Event()
  42. self.server_utils = server_utils
  43. self._worker_pid = None
  44. self._stop_lock = threading.Lock()
  45. def start_worker(self, **kwargs) -> bool:
  46. if self.is_running:
  47. server_logger.warning("Celery Worker已在运行")
  48. return True
  49. try:
  50. r = self.server_utils.get_redis_connection()
  51. cleanup_redis_tasks(r, "启动前")
  52. def run_celery_worker():
  53. try:
  54. from foundation.observability.logger.loggering import configure_logging_for_subprocess
  55. configure_logging_for_subprocess()
  56. self._worker_pid = os.getpid()
  57. worker_args = [
  58. 'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
  59. '--without-heartbeat', '--without-gossip', '--without-mingle',
  60. ]
  61. if sys.platform == 'win32':
  62. os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
  63. celery_app.worker_main(worker_args)
  64. except KeyboardInterrupt:
  65. server_logger.info("收到停止信号,Celery Worker退出")
  66. except Exception as e:
  67. server_logger.error(f"Celery Worker运行时出错: {e}")
  68. finally:
  69. self.is_running = False
  70. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  71. self.worker_thread.start()
  72. self.is_running = True
  73. time.sleep(2)
  74. success = self.is_running and self.worker_thread.is_alive()
  75. if success:
  76. server_logger.info("Celery Worker启动成功")
  77. else:
  78. server_logger.error("Celery Worker启动失败")
  79. self.is_running = False
  80. return success
  81. except ImportError as e:
  82. server_logger.error(f"导入Celery失败: {e}")
  83. return False
  84. except Exception as e:
  85. server_logger.error(f"启动Celery Worker失败: {e}")
  86. return False
  87. def stop_worker(self, timeout: int = 5) -> bool:
  88. if not self.is_running:
  89. return True
  90. if not self._stop_lock.acquire(blocking=False):
  91. server_logger.info("Worker 停止已在执行中,跳过重复调用")
  92. return True
  93. try:
  94. server_logger.info("停止Celery Worker...")
  95. self._stop_worker_processes()
  96. if self.worker_thread and self.worker_thread.is_alive():
  97. self.worker_thread.join(timeout=timeout)
  98. r = self.server_utils.get_redis_connection()
  99. cleanup_redis_tasks(r, "停止时")
  100. self.is_running = False
  101. self.shutdown_event.clear()
  102. return True
  103. except Exception as e:
  104. server_logger.error(f"停止Celery Worker失败: {e}")
  105. return False
  106. finally:
  107. self._stop_lock.release()
  108. def stop_worker_immediately(self) -> bool:
  109. if not self.is_running:
  110. return True
  111. if not self._stop_lock.acquire(blocking=False):
  112. server_logger.info("Worker 停止已在执行中,跳过重复调用")
  113. return True
  114. try:
  115. server_logger.info("立即停止Celery Worker...")
  116. self._stop_worker_processes()
  117. r = self.server_utils.get_redis_connection()
  118. cleanup_redis_tasks(r, "立即停止时")
  119. self.is_running = False
  120. self.shutdown_event.clear()
  121. return True
  122. except Exception as e:
  123. server_logger.error(f"立即停止Celery Worker失败: {e}")
  124. self.is_running = False
  125. return False
  126. finally:
  127. self._stop_lock.release()
  128. def _stop_worker_processes(self):
  129. """终止 Celery worker 及其所有子进程。"""
  130. self.shutdown_event.set()
  131. # 先尝试优雅关闭
  132. try:
  133. celery_app.control.shutdown()
  134. except Exception:
  135. pass
  136. time.sleep(0.5)
  137. # 强制终止本进程下的子进程
  138. self._kill_child_processes()
  139. @staticmethod
  140. def _kill_child_processes():
  141. """终止由当前进程 fork 出的所有 Celery 子进程。"""
  142. system = platform.system()
  143. current_pid = os.getpid()
  144. try:
  145. if system == "Windows":
  146. _kill_windows_child_processes(current_pid)
  147. else:
  148. try:
  149. subprocess.run(['pkill', '-P', str(current_pid), '-f', 'celery'],
  150. capture_output=True, timeout=5)
  151. except Exception:
  152. pass
  153. except Exception as e:
  154. server_logger.warning(f"终止子进程时出错: {e}")
  155. def get_status(self) -> dict:
  156. return {
  157. "is_running": self.is_running,
  158. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  159. }
  160. def __enter__(self):
  161. return self
  162. def __exit__(self, exc_type, exc_val, exc_tb):
  163. self.stop_worker()