| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- """Regression tests for scheduler bus-subscription parameters (issue #4794)."""
- import asyncio
- from types import SimpleNamespace
- from unittest.mock import AsyncMock, patch
- import pytest
- from gpustack.scheduler.scheduler import Scheduler
- from gpustack.server.bus import EventType
- async def _stop_task(task: asyncio.Task) -> None:
- if not task.done():
- task.cancel()
- try:
- await task
- except (asyncio.CancelledError, Exception):
- pass
- @pytest.mark.asyncio
- async def test_scheduler_subscribes_only_to_created_and_skips_replay():
- cfg = SimpleNamespace(cache_dir=None)
- scheduler = Scheduler(cfg, check_interval=180)
- captured_kwargs = {}
- forever = asyncio.Event()
- async def fake_subscribe(*args, **kwargs):
- captured_kwargs.update(kwargs)
- await forever.wait()
- if False:
- yield # pragma: no cover
- with (
- patch(
- "gpustack.scheduler.scheduler.ModelInstance.subscribe",
- side_effect=fake_subscribe,
- ),
- patch.object(scheduler, "_schedule_cycle", AsyncMock()),
- patch.object(
- scheduler, "_enqueue_pending_instances", AsyncMock()
- ) as mock_enqueue,
- patch("gpustack.scheduler.scheduler.AsyncIOScheduler"),
- ):
- task = asyncio.create_task(scheduler.start())
- # Give start() a chance to set up the subscription.
- for _ in range(50):
- await asyncio.sleep(0)
- if captured_kwargs:
- break
- try:
- assert captured_kwargs.get("source") == "scheduler"
- assert captured_kwargs.get("event_types") == {EventType.CREATED}
- assert captured_kwargs.get("replay_existing") is False
- # Bootstrap scan must have run at least once before the live loop.
- assert mock_enqueue.await_count >= 1
- finally:
- forever.set()
- await _stop_task(task)
- @pytest.mark.asyncio
- async def test_enqueue_event_instance_evaluates_only_the_passed_instance():
- cfg = SimpleNamespace(cache_dir=None)
- scheduler = Scheduler(cfg, check_interval=180)
- instance = SimpleNamespace(id=7)
- with (
- patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
- patch.object(scheduler, "_should_schedule", return_value=True) as mock_should,
- ):
- await scheduler._enqueue_event_instance(instance)
- mock_should.assert_called_once_with(instance)
- mock_evaluate.assert_awaited_once_with(instance)
- # When _should_schedule returns False, no evaluation happens.
- with (
- patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
- patch.object(scheduler, "_should_schedule", return_value=False),
- ):
- await scheduler._enqueue_event_instance(instance)
- mock_evaluate.assert_not_awaited()
- # None and id-less payloads are no-ops.
- with (
- patch.object(scheduler, "_evaluate", AsyncMock()) as mock_evaluate,
- patch.object(scheduler, "_should_schedule", return_value=True) as mock_should,
- ):
- await scheduler._enqueue_event_instance(None)
- await scheduler._enqueue_event_instance(SimpleNamespace(id=None))
- mock_should.assert_not_called()
- mock_evaluate.assert_not_awaited()
- @pytest.mark.asyncio
- async def test_scheduler_handles_live_created_event_with_single_instance_path():
- cfg = SimpleNamespace(cache_dir=None)
- scheduler = Scheduler(cfg, check_interval=180)
- instance_payload = SimpleNamespace(id=42)
- created_event = SimpleNamespace(
- type=EventType.CREATED, data=instance_payload, id=42
- )
- forever = asyncio.Event()
- async def fake_subscribe(*args, **kwargs):
- yield created_event
- await forever.wait()
- with (
- patch(
- "gpustack.scheduler.scheduler.ModelInstance.subscribe",
- side_effect=fake_subscribe,
- ),
- patch.object(scheduler, "_schedule_cycle", AsyncMock()),
- patch.object(
- scheduler, "_enqueue_pending_instances", AsyncMock()
- ) as mock_full_scan,
- patch.object(
- scheduler, "_enqueue_event_instance", AsyncMock()
- ) as mock_event_path,
- ):
- with patch("gpustack.scheduler.scheduler.AsyncIOScheduler"):
- task = asyncio.create_task(scheduler.start())
- for _ in range(100):
- await asyncio.sleep(0)
- if mock_event_path.await_count >= 1:
- break
- try:
- # Bootstrap scan ran exactly once at startup.
- assert mock_full_scan.await_count == 1
- # Event path was invoked once with the instance from the event.
- assert mock_event_path.await_count == 1
- mock_event_path.assert_awaited_with(instance_payload)
- finally:
- forever.set()
- await _stop_task(task)
|