| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- """
- Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。
- """
- import time
- import subprocess
- import platform
- from foundation.observability.logger.loggering import server_logger
- def cleanup_redis_tasks(redis_client, phase: str = "启动前"):
- """清理 Redis 中的 Celery 残留任务。
- Args:
- redis_client: Redis 连接对象
- phase: "启动前" | "停止时" — 控制是否杀死现有 worker
- """
- try:
- server_logger.info(f"{phase}清理Redis中的Celery任务...")
- if phase == "启动前":
- _kill_existing_celery_workers()
- patterns = [
- 'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*',
- 'unacked*', 'celery@*', 'celery~*',
- ]
- all_keys = []
- for pattern in patterns:
- all_keys.extend(redis_client.keys(pattern))
- for key in all_keys:
- try:
- redis_client.delete(key)
- except Exception as e:
- server_logger.warning(f"{phase}清理 {key} 失败: {e}")
- for queue in ['celery', 'celery.pidbox', 'celeryev']:
- try:
- redis_client.delete(queue)
- except Exception as e:
- server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
- for queue in ['celery']:
- queue_len = redis_client.llen(queue)
- if queue_len > 0:
- server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
- redis_client.delete(queue)
- if all_keys:
- server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
- else:
- server_logger.info(f"{phase}未发现需要清理的残留任务")
- if phase == "启动前":
- time.sleep(0.5)
- except Exception as e:
- server_logger.error(f"{phase}清理Redis任务失败: {e}")
- def _kill_existing_celery_workers():
- """终止所有现有的 Celery Worker 进程。"""
- system = platform.system()
- try:
- if system == "Windows":
- # PowerShell Get-CimInstance 查找命令行包含 celery 的 python 进程并终止
- result = subprocess.run(
- ['powershell', '-NoProfile', '-Command',
- 'Get-CimInstance Win32_Process -Filter "name=\'python.exe\'" | Where-Object {$_.CommandLine -like \'*celery*\'} | Select-Object -ExpandProperty ProcessId'],
- capture_output=True, text=True, timeout=15,
- )
- for line in result.stdout.strip().split('\n'):
- pid = line.strip()
- if pid and pid.isdigit():
- subprocess.run(['taskkill', '/F', '/PID', pid],
- capture_output=True, timeout=5)
- server_logger.info(f"已终止Celery Worker进程 PID={pid}")
- else:
- subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
- server_logger.info("已终止现有的Celery Worker进程")
- time.sleep(0.5)
- except Exception as e:
- server_logger.warning(f"终止现有Celery Worker失败: {e}")
|