# workload_cleaner.py
  1. import logging
  2. from typing import Callable
  3. from gpustack_runtime.deployer import (
  4. list_workloads,
  5. WorkloadStatusStateEnum,
  6. delete_workload,
  7. )
  8. from gpustack import envs
  9. from gpustack.client.generated_clientset import ClientSet
  10. from gpustack.utils import network
  11. from gpustack.utils.datetimex import parse_iso8601_to_utc
  12. from gpustack.utils.runtime import is_benchmark_workload
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  14. class WorkloadCleaner:
  15. @property
  16. def _worker_id(self) -> int:
  17. return self._worker_id_getter()
  18. @property
  19. def _clientset(self) -> ClientSet:
  20. return self._clientset_getter()
  21. _clientset_getter: Callable[[], ClientSet]
  22. _worker_id_getter: Callable[[], int]
  23. def __init__(
  24. self,
  25. worker_id_getter: Callable[[], int],
  26. clientset_getter: Callable[[], ClientSet],
  27. ):
  28. self._worker_id_getter = worker_id_getter
  29. self._clientset_getter = clientset_getter
  30. def cleanup_orphan_workloads(self):
  31. current_instance_names = set()
  32. model_instances_page = self._clientset.model_instances.list()
  33. if model_instances_page.items:
  34. for model_instance in model_instances_page.items:
  35. deployment_metadata = model_instance.get_deployment_metadata(
  36. self._worker_id,
  37. )
  38. if deployment_metadata:
  39. current_instance_names.add(deployment_metadata.name)
  40. current_benchmark_names = set()
  41. benchmarks_page = self._clientset.benchmarks.list()
  42. if benchmarks_page.items:
  43. for benchmark in benchmarks_page.items:
  44. deployment_metadata = benchmark.get_deployment_metadata()
  45. if deployment_metadata:
  46. current_benchmark_names.add(deployment_metadata.name)
  47. workloads = list_workloads()
  48. for w in workloads:
  49. create_at = parse_iso8601_to_utc(w.created_at)
  50. should_clean_orphan = False
  51. if is_benchmark_workload(w):
  52. should_clean_orphan, _ = network.is_offline(
  53. create_at,
  54. envs.WORKER_ORPHAN_BENCHMARK_WORKLOAD_CLEANUP_GRACE_PERIOD,
  55. )
  56. # Clean up benchmark workloads that are:
  57. # 1. In FAILED or INACTIVE state (regardless of whether they're in current_benchmark_names)
  58. # 2. Not in current_benchmark_names and past grace period
  59. if should_clean_orphan and (
  60. w.state
  61. in [
  62. WorkloadStatusStateEnum.FAILED,
  63. WorkloadStatusStateEnum.INACTIVE,
  64. ]
  65. or w.name not in current_benchmark_names
  66. ):
  67. delete_workload(w.name)
  68. logger.info(
  69. f"Deleted orphan benchmark workload {w.name}, created at {w.created_at}."
  70. )
  71. else:
  72. should_clean_orphan, _ = network.is_offline(
  73. create_at, envs.WORKER_ORPHAN_WORKLOAD_CLEANUP_GRACE_PERIOD
  74. )
  75. if w.name not in current_instance_names and should_clean_orphan:
  76. delete_workload(w.name)
  77. logger.info(
  78. f"Deleted orphan workload {w.name}, created at {w.created_at}."
  79. )