redis_cleanup.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """
  2. Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。
  3. """
  4. import time
  5. import subprocess
  6. import platform
  7. from foundation.observability.logger.loggering import server_logger
  8. def cleanup_redis_tasks(redis_client, phase: str = "启动前"):
  9. """清理 Redis 中的 Celery 残留任务。
  10. Args:
  11. redis_client: Redis 连接对象
  12. phase: "启动前" | "停止时" — 控制是否杀死现有 worker
  13. """
  14. try:
  15. server_logger.info(f"{phase}清理Redis中的Celery任务...")
  16. if phase == "启动前":
  17. _kill_existing_celery_workers()
  18. patterns = [
  19. 'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*',
  20. 'unacked*', 'celery@*', 'celery~*',
  21. ]
  22. all_keys = []
  23. for pattern in patterns:
  24. all_keys.extend(redis_client.keys(pattern))
  25. for key in all_keys:
  26. try:
  27. redis_client.delete(key)
  28. except Exception as e:
  29. server_logger.warning(f"{phase}清理 {key} 失败: {e}")
  30. for queue in ['celery', 'celery.pidbox', 'celeryev']:
  31. try:
  32. redis_client.delete(queue)
  33. except Exception as e:
  34. server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
  35. for queue in ['celery']:
  36. queue_len = redis_client.llen(queue)
  37. if queue_len > 0:
  38. server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
  39. redis_client.delete(queue)
  40. if all_keys:
  41. server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
  42. else:
  43. server_logger.info(f"{phase}未发现需要清理的残留任务")
  44. if phase == "启动前":
  45. time.sleep(0.5)
  46. except Exception as e:
  47. server_logger.error(f"{phase}清理Redis任务失败: {e}")
  48. def _kill_existing_celery_workers():
  49. """终止所有现有的 Celery Worker 进程。"""
  50. system = platform.system()
  51. try:
  52. if system == "Windows":
  53. # PowerShell Get-CimInstance 查找命令行包含 celery 的 python 进程并终止
  54. result = subprocess.run(
  55. ['powershell', '-NoProfile', '-Command',
  56. 'Get-CimInstance Win32_Process -Filter "name=\'python.exe\'" | Where-Object {$_.CommandLine -like \'*celery*\'} | Select-Object -ExpandProperty ProcessId'],
  57. capture_output=True, text=True, timeout=15,
  58. )
  59. for line in result.stdout.strip().split('\n'):
  60. pid = line.strip()
  61. if pid and pid.isdigit():
  62. subprocess.run(['taskkill', '/F', '/PID', pid],
  63. capture_output=True, timeout=5)
  64. server_logger.info(f"已终止Celery Worker进程 PID={pid}")
  65. else:
  66. subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
  67. server_logger.info("已终止现有的Celery Worker进程")
  68. time.sleep(0.5)
  69. except Exception as e:
  70. server_logger.warning(f"终止现有Celery Worker失败: {e}")