""" Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。 """ import time import subprocess import platform from foundation.observability.logger.loggering import server_logger def cleanup_redis_tasks(redis_client, phase: str = "启动前"): """清理 Redis 中的 Celery 残留任务。 Args: redis_client: Redis 连接对象 phase: "启动前" | "停止时" — 控制是否杀死现有 worker """ try: server_logger.info(f"{phase}清理Redis中的Celery任务...") if phase == "启动前": _kill_existing_celery_workers() patterns = [ 'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*', 'unacked*', 'celery@*', 'celery~*', ] all_keys = [] for pattern in patterns: all_keys.extend(redis_client.keys(pattern)) for key in all_keys: try: redis_client.delete(key) except Exception as e: server_logger.warning(f"{phase}清理 {key} 失败: {e}") for queue in ['celery', 'celery.pidbox', 'celeryev']: try: redis_client.delete(queue) except Exception as e: server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}") for queue in ['celery']: queue_len = redis_client.llen(queue) if queue_len > 0: server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空") redis_client.delete(queue) if all_keys: server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键") else: server_logger.info(f"{phase}未发现需要清理的残留任务") if phase == "启动前": time.sleep(0.5) except Exception as e: server_logger.error(f"{phase}清理Redis任务失败: {e}") def _kill_existing_celery_workers(): """终止所有现有的 Celery Worker 进程。""" system = platform.system() try: if system == "Windows": # PowerShell Get-CimInstance 查找命令行包含 celery 的 python 进程并终止 result = subprocess.run( ['powershell', '-NoProfile', '-Command', 'Get-CimInstance Win32_Process -Filter "name=\'python.exe\'" | Where-Object {$_.CommandLine -like \'*celery*\'} | Select-Object -ExpandProperty ProcessId'], capture_output=True, text=True, timeout=15, ) for line in result.stdout.strip().split('\n'): pid = line.strip() if pid and pid.isdigit(): subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5) server_logger.info(f"已终止Celery Worker进程 PID={pid}") else: subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True) server_logger.info("已终止现有的Celery Worker进程") time.sleep(0.5) except Exception as e: server_logger.warning(f"终止现有Celery Worker失败: {e}")