test_score_chain.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. from types import SimpleNamespace
  2. from typing import Dict, List
  3. from unittest.mock import AsyncMock, patch
  4. import pytest
  5. from gpustack import envs
  6. from gpustack.policies.base import (
  7. Allocatable,
  8. ModelInstanceScore,
  9. ModelInstanceScheduleCandidate,
  10. ModelInstanceScorer,
  11. ScheduleCandidatesScorer,
  12. )
  13. from gpustack.policies.scorers.model_file_locality_scorer import (
  14. ModelFileLocalityScorer,
  15. )
  16. from gpustack.policies.scorers.offload_layer_scorer import OffloadLayerScorer
  17. from gpustack.policies.scorers.placement_scorer import (
  18. PlacementScorer,
  19. ScaleTypeEnum,
  20. )
  21. from gpustack.policies.scorers.score_chain import (
  22. CandidateScoreChain,
  23. ModelInstanceScoreChain,
  24. )
  25. from gpustack.policies.scorers.status_scorer import StatusScorer
  26. from gpustack.schemas.model_files import ModelFileStateEnum
  27. from gpustack.schemas.models import (
  28. ComputedResourceClaim,
  29. ModelInstanceStateEnum,
  30. PlacementStrategyEnum,
  31. )
  32. from gpustack.schemas.workers import WorkerStateEnum
  33. from tests.fixtures.workers.fixtures import (
  34. linux_cpu_1,
  35. linux_cpu_2,
  36. linux_nvidia_19_4090_24gx2,
  37. linux_nvidia_2_4080_16gx2,
  38. )
  39. from tests.utils.model import new_model, new_model_instance
  40. class ListCandidateScorer(ScheduleCandidatesScorer):
  41. def __init__(self, scores: List[float]):
  42. self._scores = scores
  43. async def score(self, candidates: List[ModelInstanceScheduleCandidate]):
  44. for idx, candidate in enumerate(candidates):
  45. candidate.score = self._scores[idx] if idx < len(self._scores) else None
  46. return candidates
  47. class DummyInstanceScorer(ModelInstanceScorer):
  48. def __init__(self, scores: List[float], max_score: float):
  49. self._scores = scores
  50. self._max_score = max_score
  51. self.called = False
  52. async def score_instances(self, instances):
  53. self.called = True
  54. results = []
  55. for idx, instance in enumerate(instances):
  56. score = self._scores[idx] if idx < len(self._scores) else 0
  57. results.append(ModelInstanceScore(model_instance=instance, score=score))
  58. return results
  59. def _make_candidate(worker, ram: int = 1, vram=None, gpu_indexes=None):
  60. return ModelInstanceScheduleCandidate(
  61. worker=worker,
  62. gpu_indexes=gpu_indexes,
  63. computed_resource_claim=ComputedResourceClaim(ram=ram, vram=vram or {}),
  64. score=None,
  65. )
  66. def _build_candidates(worker1, worker2, ram: int = 1):
  67. return [
  68. _make_candidate(worker1, ram=ram),
  69. _make_candidate(worker2, ram=ram),
  70. ]
  71. def _build_gpu_candidates(worker1, worker2, vram: int = 50, ram: int = 0):
  72. return [
  73. _make_candidate(worker1, ram=ram, vram={0: vram}, gpu_indexes=[0]),
  74. _make_candidate(worker2, ram=ram, vram={0: vram}, gpu_indexes=[0]),
  75. ]
  76. def _mock_model_file(worker_id, state=ModelFileStateEnum.READY):
  77. return SimpleNamespace(worker_id=worker_id, state=state, resolved_paths=[])
  78. def _scores_by_instance(
  79. scores: List[ModelInstanceScore],
  80. ) -> Dict[int, float]:
  81. return {item.model_instance.id: item.score for item in scores}
  82. @pytest.mark.asyncio
  83. async def test_candidate_score_chain_sums_scores():
  84. candidates = [
  85. _make_candidate(linux_cpu_1()),
  86. _make_candidate(linux_cpu_2()),
  87. ]
  88. chain = CandidateScoreChain(
  89. scorers=[
  90. ListCandidateScorer([10, 20]),
  91. ListCandidateScorer([1.5, None]),
  92. ]
  93. )
  94. scored = await chain.score(candidates)
  95. assert scored[0].score == 11.5
  96. assert scored[1].score == 20.0
  97. @pytest.mark.asyncio
  98. async def test_candidate_score_chain_handles_all_none():
  99. candidates = [
  100. _make_candidate(linux_cpu_1()),
  101. _make_candidate(linux_cpu_2()),
  102. ]
  103. chain = CandidateScoreChain(
  104. scorers=[
  105. ListCandidateScorer([None, None]),
  106. ListCandidateScorer([None, None]),
  107. ]
  108. )
  109. scored = await chain.score(candidates)
  110. assert scored[0].score == 0.0
  111. assert scored[1].score == 0.0
  112. @pytest.mark.asyncio
  113. async def test_instance_score_chain_skips_zero_max_score():
  114. instances = [new_model_instance(1, "i1", 1, worker_id=1)]
  115. zero_scorer = DummyInstanceScorer([100], max_score=0)
  116. valid_scorer = DummyInstanceScorer(
  117. [5], max_score=envs.SCHEDULER_SCALE_DOWN_OFFLOAD_MAX_SCORE
  118. )
  119. chain = ModelInstanceScoreChain(
  120. scorers=[zero_scorer, valid_scorer],
  121. total_max_score=None,
  122. )
  123. scored = await chain.score(instances)
  124. assert zero_scorer.called is False
  125. assert valid_scorer.called is True
  126. assert scored[0].score == 5.0
  127. @pytest.mark.asyncio
  128. async def test_candidate_score_chain_spread_locality():
  129. worker1 = linux_nvidia_19_4090_24gx2()
  130. worker2 = linux_nvidia_2_4080_16gx2()
  131. model = new_model(
  132. 1,
  133. "test",
  134. placement_strategy=PlacementStrategyEnum.SPREAD,
  135. huggingface_repo_id="a/b",
  136. )
  137. model_instances = [
  138. new_model_instance(1, "m1", model.id, worker_id=worker1.id),
  139. new_model_instance(2, "m2", model.id, worker_id=worker2.id),
  140. ]
  141. placement_scorer = PlacementScorer(
  142. model,
  143. model_instances,
  144. max_score=envs.SCHEDULER_SCALE_UP_PLACEMENT_MAX_SCORE,
  145. )
  146. mock_session = AsyncMock()
  147. mock_async_session = AsyncMock()
  148. mock_async_session.__aenter__.return_value = mock_session
  149. with (
  150. patch(
  151. "gpustack.policies.scorers.model_file_locality_scorer.async_session",
  152. return_value=mock_async_session,
  153. ),
  154. patch(
  155. "gpustack.server.services.ModelFileService.get_by_source_index",
  156. new=AsyncMock(return_value=[_mock_model_file(worker2.id)]),
  157. ),
  158. patch(
  159. "gpustack.server.services.ModelFileService.get_by_resolved_path",
  160. new=AsyncMock(return_value=[]),
  161. ),
  162. ):
  163. locality_scorer = ModelFileLocalityScorer(
  164. model,
  165. draft_model_source=None,
  166. max_score=envs.SCHEDULER_SCALE_UP_LOCALITY_MAX_SCORE,
  167. )
  168. placement_scored = await placement_scorer.score(
  169. _build_gpu_candidates(worker1, worker2, vram=1)
  170. )
  171. chain_scored = await CandidateScoreChain(
  172. [placement_scorer, locality_scorer]
  173. ).score(_build_gpu_candidates(worker1, worker2, vram=1))
  174. placement_scores = {c.worker.id: c.score for c in placement_scored}
  175. chain_scores = {c.worker.id: c.score for c in chain_scored}
  176. assert placement_scores[worker1.id] == pytest.approx(91.5)
  177. assert placement_scores[worker2.id] == pytest.approx(91.5)
  178. assert chain_scores[worker1.id] == pytest.approx(91.5)
  179. assert chain_scores[worker2.id] == pytest.approx(96.5)
  180. placement_pick = max(placement_scored, key=lambda c: c.score).worker.id
  181. chain_pick = max(chain_scored, key=lambda c: c.score).worker.id
  182. assert placement_pick == worker1.id
  183. assert chain_pick == worker2.id
  184. @pytest.mark.asyncio
  185. async def test_candidate_score_chain_binpack_and_locality():
  186. worker1 = linux_nvidia_19_4090_24gx2()
  187. worker2 = linux_nvidia_2_4080_16gx2()
  188. model = new_model(
  189. 1,
  190. "test",
  191. placement_strategy=PlacementStrategyEnum.BINPACK,
  192. huggingface_repo_id="a/b",
  193. )
  194. placement_scorer = PlacementScorer(
  195. model, [], max_score=envs.SCHEDULER_SCALE_UP_PLACEMENT_MAX_SCORE
  196. )
  197. mock_session = AsyncMock()
  198. mock_async_session = AsyncMock()
  199. mock_async_session.__aenter__.return_value = mock_session
  200. def allocatable_side_effect(_, worker, gpu_type=None):
  201. return Allocatable(ram=0, vram={0: 100})
  202. with (
  203. patch(
  204. "gpustack.policies.scorers.placement_scorer.get_worker_allocatable_resource",
  205. side_effect=allocatable_side_effect,
  206. ),
  207. patch(
  208. "gpustack.policies.scorers.model_file_locality_scorer.async_session",
  209. return_value=mock_async_session,
  210. ),
  211. patch(
  212. "gpustack.server.services.ModelFileService.get_by_source_index",
  213. new=AsyncMock(return_value=[_mock_model_file(worker1.id)]),
  214. ),
  215. patch(
  216. "gpustack.server.services.ModelFileService.get_by_resolved_path",
  217. new=AsyncMock(return_value=[]),
  218. ),
  219. ):
  220. locality_scorer = ModelFileLocalityScorer(
  221. model,
  222. draft_model_source=None,
  223. max_score=envs.SCHEDULER_SCALE_UP_LOCALITY_MAX_SCORE,
  224. )
  225. placement_scored = await placement_scorer.score(
  226. _build_gpu_candidates(worker1, worker2, vram=50)
  227. )
  228. chain_scored = await CandidateScoreChain(
  229. [placement_scorer, locality_scorer]
  230. ).score(_build_gpu_candidates(worker1, worker2, vram=50))
  231. placement_scores = {c.worker.id: c.score for c in placement_scored}
  232. chain_scores = {c.worker.id: c.score for c in chain_scored}
  233. assert placement_scores[worker1.id] == pytest.approx(33.3333333)
  234. assert placement_scores[worker2.id] == pytest.approx(33.3333333)
  235. assert chain_scores[worker1.id] == pytest.approx(38.3333333)
  236. assert chain_scores[worker2.id] == pytest.approx(33.3333333)
  237. @pytest.mark.asyncio
  238. async def test_candidate_score_chain_binpack_locality_changes_pick():
  239. worker1 = linux_nvidia_19_4090_24gx2()
  240. worker2 = linux_nvidia_2_4080_16gx2()
  241. model = new_model(
  242. 1,
  243. "test",
  244. placement_strategy=PlacementStrategyEnum.BINPACK,
  245. huggingface_repo_id="a/b",
  246. )
  247. placement_scorer = PlacementScorer(
  248. model, [], max_score=envs.SCHEDULER_SCALE_UP_PLACEMENT_MAX_SCORE
  249. )
  250. mock_session = AsyncMock()
  251. mock_async_session = AsyncMock()
  252. mock_async_session.__aenter__.return_value = mock_session
  253. def allocatable_side_effect(_, worker, gpu_type=None):
  254. if worker.id == worker1.id:
  255. return Allocatable(ram=0, vram={0: 100})
  256. return Allocatable(ram=0, vram={0: 110})
  257. with (
  258. patch(
  259. "gpustack.policies.scorers.placement_scorer.get_worker_allocatable_resource",
  260. side_effect=allocatable_side_effect,
  261. ),
  262. patch(
  263. "gpustack.policies.scorers.model_file_locality_scorer.async_session",
  264. return_value=mock_async_session,
  265. ),
  266. patch(
  267. "gpustack.server.services.ModelFileService.get_by_source_index",
  268. new=AsyncMock(return_value=[_mock_model_file(worker2.id)]),
  269. ),
  270. patch(
  271. "gpustack.server.services.ModelFileService.get_by_resolved_path",
  272. new=AsyncMock(return_value=[]),
  273. ),
  274. ):
  275. locality_scorer = ModelFileLocalityScorer(
  276. model,
  277. draft_model_source=None,
  278. max_score=envs.SCHEDULER_SCALE_UP_LOCALITY_MAX_SCORE,
  279. )
  280. placement_scored = await placement_scorer.score(
  281. _build_gpu_candidates(worker1, worker2, vram=50)
  282. )
  283. chain_scored = await CandidateScoreChain(
  284. [placement_scorer, locality_scorer]
  285. ).score(_build_gpu_candidates(worker1, worker2, vram=50))
  286. placement_scores = {c.worker.id: c.score for c in placement_scored}
  287. chain_scores = {c.worker.id: c.score for c in chain_scored}
  288. assert placement_scores[worker1.id] == pytest.approx(33.3333333)
  289. assert placement_scores[worker2.id] == pytest.approx(30.3030303)
  290. assert chain_scores[worker1.id] == pytest.approx(33.3333333)
  291. assert chain_scores[worker2.id] == pytest.approx(35.3030303)
  292. placement_pick = max(placement_scored, key=lambda c: c.score).worker.id
  293. chain_pick = max(chain_scored, key=lambda c: c.score).worker.id
  294. assert placement_pick == worker1.id
  295. assert chain_pick == worker2.id
  296. @pytest.mark.asyncio
  297. async def test_instance_score_chain_scales_with_total_max_score():
  298. instances = [
  299. new_model_instance(1, "i1", 1, worker_id=1),
  300. new_model_instance(2, "i2", 1, worker_id=2),
  301. ]
  302. scorer_a = DummyInstanceScorer(
  303. [10, 5], max_score=envs.SCHEDULER_SCALE_DOWN_STATUS_MAX_SCORE
  304. )
  305. scorer_b = DummyInstanceScorer(
  306. [30, 0], max_score=envs.SCHEDULER_SCALE_DOWN_OFFLOAD_MAX_SCORE
  307. )
  308. chain = ModelInstanceScoreChain(
  309. scorers=[scorer_a, scorer_b],
  310. total_max_score=20,
  311. )
  312. scored = await chain.score(instances)
  313. assert scored[0].score == pytest.approx(7.2727273)
  314. assert scored[1].score == pytest.approx(0.9090909)
  315. @pytest.mark.asyncio
  316. async def test_instance_score_chain_with_real_scorers():
  317. worker1 = linux_nvidia_19_4090_24gx2()
  318. worker2 = linux_nvidia_2_4080_16gx2()
  319. worker1.state = WorkerStateEnum.READY
  320. worker2.state = WorkerStateEnum.NOT_READY
  321. model = new_model(
  322. 1,
  323. "test",
  324. placement_strategy=PlacementStrategyEnum.BINPACK,
  325. huggingface_repo_id="a/b",
  326. )
  327. instances = [
  328. new_model_instance(
  329. 1,
  330. "i1",
  331. model.id,
  332. worker_id=worker1.id,
  333. state=ModelInstanceStateEnum.RUNNING,
  334. computed_resource_claim=ComputedResourceClaim(
  335. ram=100,
  336. vram={},
  337. offload_layers=10,
  338. total_layers=10,
  339. ),
  340. ),
  341. new_model_instance(
  342. 2,
  343. "i2",
  344. model.id,
  345. worker_id=worker2.id,
  346. state=ModelInstanceStateEnum.RUNNING,
  347. computed_resource_claim=ComputedResourceClaim(
  348. ram=50,
  349. vram={},
  350. offload_layers=0,
  351. total_layers=10,
  352. ),
  353. ),
  354. ]
  355. mock_session = AsyncMock()
  356. mock_async_session = AsyncMock()
  357. mock_async_session.__aenter__.return_value = mock_session
  358. def allocatable_side_effect(_, worker, gpu_type=None):
  359. if worker.id == worker1.id:
  360. return Allocatable(ram=1000, vram={})
  361. return Allocatable(ram=2000, vram={})
  362. with (
  363. patch(
  364. "gpustack.policies.scorers.status_scorer.async_session",
  365. return_value=mock_async_session,
  366. ),
  367. patch(
  368. "gpustack.policies.scorers.placement_scorer.get_worker_allocatable_resource",
  369. side_effect=allocatable_side_effect,
  370. ),
  371. patch(
  372. "gpustack.policies.scorers.placement_scorer.async_session",
  373. return_value=mock_async_session,
  374. ),
  375. patch(
  376. "gpustack.policies.scorers.status_scorer.Worker.all",
  377. new=AsyncMock(return_value=[worker1, worker2]),
  378. ),
  379. patch(
  380. "gpustack.policies.scorers.placement_scorer.Worker.all",
  381. new=AsyncMock(return_value=[worker1, worker2]),
  382. ),
  383. ):
  384. status_scorer = StatusScorer(
  385. model, max_score=envs.SCHEDULER_SCALE_DOWN_STATUS_MAX_SCORE
  386. )
  387. offload_scorer = OffloadLayerScorer(
  388. model, max_score=envs.SCHEDULER_SCALE_DOWN_OFFLOAD_MAX_SCORE
  389. )
  390. placement_scorer = PlacementScorer(
  391. model,
  392. instances,
  393. scale_type=ScaleTypeEnum.SCALE_DOWN,
  394. max_score=envs.SCHEDULER_SCALE_DOWN_PLACEMENT_MAX_SCORE,
  395. )
  396. chain_scores = await ModelInstanceScoreChain(
  397. scorers=[status_scorer, offload_scorer, placement_scorer],
  398. total_max_score=None,
  399. ).score(instances)
  400. chain_map = _scores_by_instance(chain_scores)
  401. assert chain_map[instances[0].id] == pytest.approx(110.0909091)
  402. assert chain_map[instances[1].id] == pytest.approx(0.0243902439)