test_scheduler_subscribe.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. """Regression tests for scheduler bus-subscription parameters (issue #4794)."""
  2. import asyncio
  3. from types import SimpleNamespace
  4. from unittest.mock import AsyncMock, patch
  5. import pytest
  6. from gpustack.scheduler.scheduler import Scheduler
  7. from gpustack.server.bus import EventType
  8. async def _stop_task(task: asyncio.Task) -> None:
  9. if not task.done():
  10. task.cancel()
  11. try:
  12. await task
  13. except (asyncio.CancelledError, Exception):
  14. pass
  15. @pytest.mark.asyncio
  16. async def test_scheduler_subscribes_only_to_created_and_skips_replay():
  17. cfg = SimpleNamespace(cache_dir=None)
  18. scheduler = Scheduler(cfg, check_interval=180)
  19. captured_kwargs = {}
  20. forever = asyncio.Event()
  21. async def fake_subscribe(*args, **kwargs):
  22. captured_kwargs.update(kwargs)
  23. await forever.wait()
  24. if False:
  25. yield # pragma: no cover
  26. with (
  27. patch(
  28. "gpustack.scheduler.scheduler.ModelInstance.subscribe",
  29. side_effect=fake_subscribe,
  30. ),
  31. patch.object(scheduler, "_schedule_cycle", AsyncMock()),
  32. patch.object(
  33. scheduler, "_enqueue_pending_instances", AsyncMock()
  34. ) as mock_enqueue,
  35. patch("gpustack.scheduler.scheduler.AsyncIOScheduler"),
  36. ):
  37. task = asyncio.create_task(scheduler.start())
  38. # Give start() a chance to set up the subscription.
  39. for _ in range(50):
  40. await asyncio.sleep(0)
  41. if captured_kwargs:
  42. break
  43. try:
  44. assert captured_kwargs.get("source") == "scheduler"
  45. assert captured_kwargs.get("event_types") == {EventType.CREATED}
  46. assert captured_kwargs.get("replay_existing") is False
  47. # Bootstrap scan must have run at least once before the live loop.
  48. assert mock_enqueue.await_count >= 1
  49. finally:
  50. forever.set()
  51. await _stop_task(task)
  52. @pytest.mark.asyncio
  53. async def test_enqueue_event_instance_evaluates_only_the_passed_instance():
  54. cfg = SimpleNamespace(cache_dir=None)
  55. scheduler = Scheduler(cfg, check_interval=180)
  56. instance = SimpleNamespace(id=7)
  57. with (
  58. patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
  59. patch.object(scheduler, "_should_schedule", return_value=True) as mock_should,
  60. ):
  61. await scheduler._enqueue_event_instance(instance)
  62. mock_should.assert_called_once_with(instance)
  63. mock_evaluate.assert_awaited_once_with(instance)
  64. # When _should_schedule returns False, no evaluation happens.
  65. with (
  66. patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
  67. patch.object(scheduler, "_should_schedule", return_value=False),
  68. ):
  69. await scheduler._enqueue_event_instance(instance)
  70. mock_evaluate.assert_not_awaited()
  71. # None and id-less payloads are no-ops.
  72. with (
  73. patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
  74. patch.object(scheduler, "_should_schedule", return_value=True) as mock_should,
  75. ):
  76. await scheduler._enqueue_event_instance(None)
  77. await scheduler._enqueue_event_instance(SimpleNamespace(id=None))
  78. mock_should.assert_not_called()
  79. mock_evaluate.assert_not_awaited()
  80. @pytest.mark.asyncio
  81. async def test_scheduler_handles_live_created_event_with_single_instance_path():
  82. cfg = SimpleNamespace(cache_dir=None)
  83. scheduler = Scheduler(cfg, check_interval=180)
  84. instance_payload = SimpleNamespace(id=42)
  85. created_event = SimpleNamespace(
  86. type=EventType.CREATED, data=instance_payload, id=42
  87. )
  88. forever = asyncio.Event()
  89. async def fake_subscribe(*args, **kwargs):
  90. yield created_event
  91. await forever.wait()
  92. with (
  93. patch(
  94. "gpustack.scheduler.scheduler.ModelInstance.subscribe",
  95. side_effect=fake_subscribe,
  96. ),
  97. patch.object(scheduler, "_schedule_cycle", AsyncMock()),
  98. patch.object(
  99. scheduler, "_enqueue_pending_instances", AsyncMock()
  100. ) as mock_full_scan,
  101. patch.object(
  102. scheduler, "_enqueue_event_instance", AsyncMock()
  103. ) as mock_event_path,
  104. ):
  105. with patch("gpustack.scheduler.scheduler.AsyncIOScheduler"):
  106. task = asyncio.create_task(scheduler.start())
  107. for _ in range(100):
  108. await asyncio.sleep(0)
  109. if mock_event_path.await_count >= 1:
  110. break
  111. try:
  112. # Bootstrap scan ran exactly once at startup.
  113. assert mock_full_scan.await_count == 1
  114. # Event path was invoked once with the instance from the event.
  115. assert mock_event_path.await_count == 1
  116. mock_event_path.assert_awaited_with(instance_payload)
  117. finally:
  118. forever.set()
  119. await _stop_task(task)