| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import asyncio
- from collections import defaultdict
- import logging
- from gpustack import envs
- from gpustack.schemas.models import ModelInstance
- from gpustack.schemas.workers import Worker, WorkerStateEnum
- from gpustack.server.db import async_session
- from gpustack.server.services import ModelInstanceService
- from gpustack.utils.model_instance_workers import get_model_instance_worker_match
- from gpustack.utils.network import is_offline
- logger = logging.getLogger(__name__)
- class WorkerInstanceCleaner:
- """
- Periodically check offline workers and delete model instances.
- """
- def __init__(self, interval=30):
- """
- :param interval: loop interval in seconds
- """
- self._interval = interval
- async def start(self):
- while True:
- await asyncio.sleep(self._interval)
- try:
- await self._cleanup_offline_worker_instances()
- except Exception as e:
- logger.error(f"Failed to cleanup worker instances: {e}")
- async def _cleanup_offline_worker_instances(self):
- """
- Delete model instances assigned to offline workers.
- """
- async with async_session() as session:
- workers = await Worker.all(session)
- if not workers:
- return
- offline_workers = {}
- for worker in workers:
- if worker.state == WorkerStateEnum.NOT_READY and (
- not worker.maintenance or not worker.maintenance.enabled
- ):
- offline, last_heartbeat_str = is_offline(
- worker.heartbeat_time,
- envs.MODEL_INSTANCE_RESCHEDULE_GRACE_PERIOD,
- )
- if offline:
- offline_workers[worker.name] = {
- "id": worker.id,
- "cluster_id": worker.cluster_id,
- "last_heartbeat_str": last_heartbeat_str,
- }
- if not offline_workers:
- return
- cluster_ids = {
- w["cluster_id"] for w in offline_workers.values() if w["cluster_id"]
- }
- if cluster_ids:
- instances = await ModelInstance.all_by_fields(
- session,
- extra_conditions=[ModelInstance.cluster_id.in_(cluster_ids)],
- )
- else:
- instances = await ModelInstance.all(session)
- if not instances:
- return
- instances_to_delete = []
- impacted_instances_by_worker = defaultdict(list)
- for instance in instances:
- impacted_worker_names = []
- for worker_name, worker_info in offline_workers.items():
- match = get_model_instance_worker_match(
- instance,
- worker_name=worker_name,
- worker_id=worker_info["id"],
- )
- if match.matched:
- impacted_worker_names.append(worker_name)
- if not impacted_worker_names:
- continue
- instances_to_delete.append(instance)
- for worker_name in impacted_worker_names:
- impacted_instances_by_worker[worker_name].append(instance.name)
- if not instances_to_delete:
- return
- await ModelInstanceService(session).batch_delete(instances_to_delete)
- reschedule_minutes = envs.MODEL_INSTANCE_RESCHEDULE_GRACE_PERIOD / 60
- for worker_name, instance_names in impacted_instances_by_worker.items():
- last_heartbeat_str = offline_workers[worker_name]["last_heartbeat_str"]
- logger.info(
- f"Worker {worker_name} is in NOT_READY state for more than "
- f"{reschedule_minutes:.1f} minutes (last heartbeat at {last_heartbeat_str}) "
- "and is not in maintenance mode. "
- f"The following instances {', '.join(instance_names)} have been deleted and will be automatically rescheduled on other available nodes."
- )
|