# celery_manager.py (4.1 KB)
  1. """
  2. Celery Worker 生命周期管理。
  3. """
  4. import os
  5. import sys
  6. import time
  7. import threading
  8. from foundation.observability.logger.loggering import server_logger
  9. from foundation.infrastructure.messaging.celery_app import app as celery_app
  10. from server.redis_cleanup import cleanup_redis_tasks
  11. class CeleryWorkerManager:
  12. """Celery Worker 程序化管理器。"""
  13. def __init__(self, server_utils):
  14. self.worker = None
  15. self.is_running = False
  16. self.worker_thread = None
  17. self.shutdown_event = threading.Event()
  18. self.server_utils = server_utils
  19. def start_worker(self, **kwargs) -> bool:
  20. if self.is_running:
  21. server_logger.warning("Celery Worker已在运行")
  22. return True
  23. try:
  24. r = self.server_utils.get_redis_connection()
  25. cleanup_redis_tasks(r, "启动前")
  26. def run_celery_worker():
  27. try:
  28. from foundation.observability.logger.loggering import configure_logging_for_subprocess
  29. configure_logging_for_subprocess()
  30. worker_args = [
  31. 'worker', '-c', '4', '-P', 'prefork', '-l', 'info',
  32. '--without-heartbeat', '--without-gossip', '--without-mingle',
  33. ]
  34. if sys.platform == 'win32':
  35. os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
  36. celery_app.worker_main(worker_args)
  37. except KeyboardInterrupt:
  38. server_logger.info("收到停止信号,Celery Worker退出")
  39. except Exception as e:
  40. server_logger.error(f"Celery Worker运行时出错: {e}")
  41. finally:
  42. self.is_running = False
  43. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  44. self.worker_thread.start()
  45. self.is_running = True
  46. time.sleep(2)
  47. success = self.is_running and self.worker_thread.is_alive()
  48. if success:
  49. server_logger.info("Celery Worker启动成功")
  50. else:
  51. server_logger.error("Celery Worker启动失败")
  52. self.is_running = False
  53. return success
  54. except ImportError as e:
  55. server_logger.error(f"导入Celery失败: {e}")
  56. return False
  57. except Exception as e:
  58. server_logger.error(f"启动Celery Worker失败: {e}")
  59. return False
  60. def stop_worker(self, timeout: int = 5) -> bool:
  61. if not self.is_running:
  62. return True
  63. try:
  64. server_logger.info("停止Celery Worker...")
  65. self.shutdown_event.set()
  66. if self.worker_thread and self.worker_thread.is_alive():
  67. start_time = time.time()
  68. while self.is_running and (time.time() - start_time) < timeout:
  69. time.sleep(0.1)
  70. r = self.server_utils.get_redis_connection()
  71. cleanup_redis_tasks(r, "停止时")
  72. self.is_running = False
  73. self.shutdown_event.clear()
  74. return True
  75. except Exception as e:
  76. server_logger.error(f"停止Celery Worker失败: {e}")
  77. return False
  78. def stop_worker_immediately(self) -> bool:
  79. if not self.is_running:
  80. return True
  81. try:
  82. server_logger.info("立即停止Celery Worker...")
  83. self.shutdown_event.set()
  84. r = self.server_utils.get_redis_connection()
  85. cleanup_redis_tasks(r, "立即停止时")
  86. self.is_running = False
  87. self.shutdown_event.clear()
  88. return True
  89. except Exception as e:
  90. server_logger.error(f"立即停止Celery Worker失败: {e}")
  91. self.is_running = False
  92. return False
  93. def get_status(self) -> dict:
  94. return {
  95. "is_running": self.is_running,
  96. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  97. }
  98. def __enter__(self):
  99. return self
  100. def __exit__(self, exc_type, exc_val, exc_tb):
  101. self.stop_worker()