worker_instance_cleaner.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import asyncio
  2. from collections import defaultdict
  3. import logging
  4. from gpustack import envs
  5. from gpustack.schemas.models import ModelInstance
  6. from gpustack.schemas.workers import Worker, WorkerStateEnum
  7. from gpustack.server.db import async_session
  8. from gpustack.server.services import ModelInstanceService
  9. from gpustack.utils.model_instance_workers import get_model_instance_worker_match
  10. from gpustack.utils.network import is_offline
  # Module-level logger, named after this module via ``__name__``.
  logger = logging.getLogger(name=__name__)
  class WorkerInstanceCleaner:
      """
      Periodically check offline workers and delete model instances.

      A worker counts as offline when all three hold: it is in ``NOT_READY``
      state, it does not have maintenance mode enabled, and
      ``is_offline(heartbeat_time, envs.MODEL_INSTANCE_RESCHEDULE_GRACE_PERIOD)``
      reports it offline. Model instances matched to such workers are
      batch-deleted through ``ModelInstanceService``.
      """

      def __init__(self, interval=30):
          """
          :param interval: loop interval in seconds
          """
          self._interval = interval

      async def start(self):
          """
          Run cleanup passes forever, one every ``interval`` seconds.

          The loop sleeps before each pass. A failing pass is logged together
          with its traceback, and the loop carries on with the next iteration.
          """
          while True:
              await asyncio.sleep(self._interval)
              try:
                  await self._cleanup_offline_worker_instances()
              except Exception as e:
                  # logger.exception logs at ERROR level like logger.error, but
                  # also records the traceback, which str(e) alone loses.
                  # asyncio.CancelledError derives from BaseException (3.8+),
                  # so task cancellation still propagates out of the loop.
                  logger.exception(f"Failed to cleanup worker instances: {e}")

      async def _cleanup_offline_worker_instances(self):
          """
          Delete model instances assigned to offline workers.

          One pass does the following:
          1. Find the offline workers.
          2. Load candidate instances.
          3. Match every instance against every offline worker.
          4. Batch-delete the matched instances.
          5. Log one summary line per impacted worker.
          The pass returns early as soon as there is nothing left to do.
          """
          async with async_session() as session:
              workers = await Worker.all(session)
              if not workers:
                  return

              offline_workers = self._find_offline_workers(workers)
              if not offline_workers:
                  return

              cluster_ids = self._instance_cluster_scope(offline_workers)
              if cluster_ids:
                  instances = await ModelInstance.all_by_fields(
                      session,
                      extra_conditions=[ModelInstance.cluster_id.in_(cluster_ids)],
                  )
              else:
                  instances = await ModelInstance.all(session)
              if not instances:
                  return

              instances_to_delete = []
              # worker id -> names of the instances deleted on its behalf.
              impacted_instances_by_worker = defaultdict(list)
              for instance in instances:
                  impacted_worker_ids = [
                      worker_id
                      for worker_id, worker_info in offline_workers.items()
                      if get_model_instance_worker_match(
                          instance,
                          worker_name=worker_info["name"],
                          worker_id=worker_id,
                      ).matched
                  ]
                  if not impacted_worker_ids:
                      continue
                  instances_to_delete.append(instance)
                  for worker_id in impacted_worker_ids:
                      impacted_instances_by_worker[worker_id].append(instance.name)

              if not instances_to_delete:
                  return

              await ModelInstanceService(session).batch_delete(instances_to_delete)
              self._log_deleted_instances(
                  offline_workers, impacted_instances_by_worker
              )

      @staticmethod
      def _find_offline_workers(workers):
          """
          Select the workers whose model instances should be cleaned up.

          :param workers: iterable of ``Worker`` rows.
          :return: dict mapping each selected worker's id to a dict with the
              keys ``name``, ``cluster_id`` and ``last_heartbeat_str``.
          """
          # Keyed by id rather than name, so that two workers sharing a name
          # (if the schema allows it) cannot silently overwrite each other.
          offline_workers = {}
          for worker in workers:
              if worker.state != WorkerStateEnum.NOT_READY:
                  continue
              # Workers in maintenance mode are deliberately left alone.
              if worker.maintenance and worker.maintenance.enabled:
                  continue
              offline, last_heartbeat_str = is_offline(
                  worker.heartbeat_time,
                  envs.MODEL_INSTANCE_RESCHEDULE_GRACE_PERIOD,
              )
              if offline:
                  offline_workers[worker.id] = {
                      "name": worker.name,
                      "cluster_id": worker.cluster_id,
                      "last_heartbeat_str": last_heartbeat_str,
                  }
          return offline_workers

      @staticmethod
      def _instance_cluster_scope(offline_workers):
          """
          Decide which clusters the instance query may be restricted to.

          :param offline_workers: mapping whose values carry a ``cluster_id``.
          :return: the set of cluster ids to filter on, or ``None`` to query
              all instances.
          """
          cluster_ids = set()
          for worker_info in offline_workers.values():
              if not worker_info["cluster_id"]:
                  # If even one offline worker has no cluster, filtering by
                  # the other workers' clusters would skip its instances, so
                  # fall back to scanning every instance.
                  return None
              cluster_ids.add(worker_info["cluster_id"])
          return cluster_ids or None

      @staticmethod
      def _log_deleted_instances(offline_workers, impacted_instances_by_worker):
          """
          Log one line per offline worker, listing the instances deleted for it.

          :param offline_workers: result of ``_find_offline_workers``.
          :param impacted_instances_by_worker: worker id -> deleted instance names.
          """
          reschedule_minutes = envs.MODEL_INSTANCE_RESCHEDULE_GRACE_PERIOD / 60
          for worker_id, instance_names in impacted_instances_by_worker.items():
              worker_name = offline_workers[worker_id]["name"]
              last_heartbeat_str = offline_workers[worker_id]["last_heartbeat_str"]
              logger.info(
                  f"Worker {worker_name} is in NOT_READY state for more than "
                  f"{reschedule_minutes:.1f} minutes (last heartbeat at {last_heartbeat_str}) "
                  "and is not in maintenance mode. "
                  f"The following instances {', '.join(instance_names)} have been deleted and will be automatically rescheduled on other available nodes."
              )