| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- """
- Redis 残留任务清理。合并了原来散落在 app.py 中 3 处的清理逻辑。
- """
- import time
- import subprocess
- import platform
- from foundation.observability.logger.loggering import server_logger
- def cleanup_redis_tasks(redis_client, phase: str = "启动前"):
- """清理 Redis 中的 Celery 残留任务。
- Args:
- redis_client: Redis 连接对象
- phase: "启动前" | "停止时" — 控制是否杀死现有 worker
- """
- try:
- server_logger.info(f"{phase}清理Redis中的Celery任务...")
- if phase == "启动前":
- _kill_existing_celery_workers()
- patterns = [
- 'task:*', 'celery-task-meta-*', 'current:*', '_kombu.binding.*',
- 'unacked*', 'celery@*', 'celery~*',
- ]
- all_keys = []
- for pattern in patterns:
- all_keys.extend(redis_client.keys(pattern))
- for key in all_keys:
- try:
- redis_client.delete(key)
- except Exception as e:
- server_logger.warning(f"{phase}清理 {key} 失败: {e}")
- for queue in ['celery', 'celery.pidbox', 'celeryev']:
- try:
- redis_client.delete(queue)
- except Exception as e:
- server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
- for queue in ['celery']:
- queue_len = redis_client.llen(queue)
- if queue_len > 0:
- server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
- redis_client.delete(queue)
- if all_keys:
- server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键")
- else:
- server_logger.info(f"{phase}未发现需要清理的残留任务")
- if phase == "启动前":
- time.sleep(0.5)
- except Exception as e:
- server_logger.error(f"{phase}清理Redis任务失败: {e}")
- def _kill_existing_celery_workers():
- """终止所有现有的 Celery Worker 进程。"""
- system = platform.system()
- try:
- if system == "Windows":
- result = subprocess.run(
- ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
- capture_output=True, text=True,
- )
- if 'celery' in result.stdout.lower():
- subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
- server_logger.info("已终止现有的Celery Worker进程")
- else:
- subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
- server_logger.info("已终止现有的Celery Worker进程")
- time.sleep(0.5)
- except Exception as e:
- server_logger.warning(f"终止现有Celery Worker失败: {e}")
|