redis_cleanup.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. """
  2. Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。
  3. """
  4. import time
  5. import subprocess
  6. import platform
  7. from foundation.observability.logger.loggering import server_logger
  8. def cleanup_redis_tasks(redis_client, phase: str = "启动前"):
  9. """清理 Redis 中的 Celery 残留任务。
  10. Args:
  11. redis_client: Redis 连接对象
  12. phase: "启动前" | "停止时" — 控制是否杀死现有 worker
  13. """
  14. try:
  15. server_logger.info(f"{phase}清理Redis中的Celery任务...")
  16. if phase == "启动前":
  17. _kill_existing_celery_workers()
  18. patterns = [
  19. 'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*',
  20. 'unacked*', 'celery@*', 'celery~*',
  21. ]
  22. all_keys = []
  23. for pattern in patterns:
  24. all_keys.extend(redis_client.keys(pattern))
  25. for key in all_keys:
  26. try:
  27. redis_client.delete(key)
  28. except Exception as e:
  29. server_logger.warning(f"{phase}清理 {key} 失败: {e}")
  30. for queue in ['celery', 'celery.pidbox', 'celeryev']:
  31. try:
  32. redis_client.delete(queue)
  33. except Exception as e:
  34. server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
  35. for queue in ['celery']:
  36. queue_len = redis_client.llen(queue)
  37. if queue_len > 0:
  38. server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
  39. redis_client.delete(queue)
  40. if all_keys:
  41. server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
  42. else:
  43. server_logger.info(f"{phase}未发现需要清理的残留任务")
  44. if phase == "启动前":
  45. time.sleep(0.5)
  46. except Exception as e:
  47. server_logger.error(f"{phase}清理Redis任务失败: {e}")
  48. def _kill_existing_celery_workers():
  49. """终止所有现有的 Celery Worker 进程。"""
  50. system = platform.system()
  51. try:
  52. if system == "Windows":
  53. result = subprocess.run(
  54. ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
  55. capture_output=True, text=True,
  56. )
  57. if 'celery' in result.stdout.lower():
  58. subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
  59. server_logger.info("已终止现有的Celery Worker进程")
  60. else:
  61. subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
  62. server_logger.info("已终止现有的Celery Worker进程")
  63. time.sleep(0.5)
  64. except Exception as e:
  65. server_logger.warning(f"终止现有Celery Worker失败: {e}")