vllm_resource_fit_selector.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. import json
  2. from collections import defaultdict
  3. import logging
  4. import re
  5. from typing import Dict, List, Optional, Tuple
  6. from gpustack.policies.base import (
  7. Allocatable,
  8. ModelInstanceScheduleCandidate,
  9. )
  10. from gpustack.policies.candidate_selectors.base_candidate_selector import (
  11. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
  12. EVENT_ACTION_AUTO_SINGLE_GPU,
  13. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
  14. EVENT_ACTION_DEFAULT,
  15. EVENT_ACTION_MANUAL_MULTI,
  16. RequestEstimateUsage,
  17. ScheduleCandidatesSelector,
  18. )
  19. from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
  20. from gpustack.policies.utils import (
  21. ListMessageBuilder,
  22. estimate_model_vram,
  23. group_workers_by_gpu_type,
  24. ram_not_enough,
  25. get_model_ram_claim,
  26. get_computed_ram_claim,
  27. sort_workers_by_gpu_count,
  28. )
  29. from gpustack.schemas.models import (
  30. CategoryEnum,
  31. ComputedResourceClaim,
  32. Model,
  33. ModelInstance,
  34. ModelInstanceSubordinateWorker,
  35. )
  36. from gpustack.schemas.workers import GPUDevicesStatus, Worker
  37. from gpustack.config import Config
  38. from gpustack.utils.command import (
  39. find_bool_parameter,
  40. find_parameter,
  41. find_int_parameter,
  42. )
  43. from gpustack.utils.unit import byte_to_gib
  44. logger = logging.getLogger(__name__)
  45. def parse_model_size_by_name(model_name: str) -> int:
  46. """
  47. Parse the model size from the model name.
  48. """
  49. match = re.search(r"(\d+(?:\.\d+)?)\s*[Bb]", model_name)
  50. if match:
  51. size_in_billions = float(match.group(1))
  52. return int(size_in_billions * 1e9)
  53. else:
  54. raise ValueError(f"Cannot parse model size from model name: {model_name}")
  55. class VLLMResourceFitSelector(ScheduleCandidatesSelector):
  56. def __init__(
  57. self,
  58. cfg: Config,
  59. model: Model,
  60. model_instances: List[ModelInstance],
  61. ):
  62. super().__init__(cfg, model, model_instances)
  63. self._vram_claim = 0
  64. self._ram_claim = 0
  65. self._largest_single_gpu_vram = 0
  66. self._largest_single_gpu_vram_utilization = 0
  67. self._largest_multi_gpu_vram = 0
  68. self._largest_multi_gpu_total = 0
  69. self._largest_multi_gpu_utilization_satisfied_count = 0
  70. self._messages = []
  71. self._event_collector = EventCollector(self._model, logger)
  72. self._workers_allocatable_resource: Dict[int, Allocatable] = {}
  73. self._worker_name_to_vram: Dict[str, Dict[int, int]] = {}
  74. self._unsatisfied_gpu_messages: Dict[str, List[int]] = {}
  75. world_size, strategies = (
  76. VLLMResourceFitSelector.get_world_size_from_backend_parameters(model)
  77. )
  78. self._set_gpu_count(world_size, strategies)
  79. async def _init_model_parameters(self, workers: List[Worker]):
  80. await super()._init_model_parameters(workers)
  81. self._validate_arguments()
  82. # GMU relies on architecture info in model parameters. Set it after model parameters are initialized.
  83. self._set_gpu_memory_utilization()
  84. def _should_check_vision_tp_divisibility(self) -> bool:
  85. if not self._model.backend_parameters:
  86. return True
  87. language_only = find_bool_parameter(
  88. self._model.backend_parameters, ["language-model-only"]
  89. )
  90. return not language_only
  91. @staticmethod
  92. def get_world_size_from_backend_parameters(
  93. model: Model,
  94. ) -> Tuple[Optional[int], Optional[List[str]]]:
  95. tp = find_int_parameter(
  96. model.backend_parameters, ["tensor-parallel-size", "tp"]
  97. )
  98. pp = find_int_parameter(
  99. model.backend_parameters, ["pipeline-parallel-size", "pp"]
  100. )
  101. dp = find_int_parameter(model.backend_parameters, ["data-parallel-size", "dp"])
  102. dpl = find_int_parameter(
  103. model.backend_parameters, ["--data-parallel-size-local", "dpl"]
  104. )
  105. if tp or pp or dp:
  106. world_size = 1
  107. strategies = []
  108. if tp:
  109. strategies.append("tp")
  110. world_size *= tp
  111. if pp:
  112. strategies.append("pp")
  113. world_size *= pp
  114. if dp:
  115. strategies.append("dp")
  116. world_size *= dp
  117. if dpl:
  118. # NB(thxCode): Indicate how many DP groups(each group owns `-dp` number devices) are there in one worker.
  119. world_size *= dpl
  120. return world_size, strategies
  121. return None, None
  122. def _set_gpu_memory_utilization(self):
  123. self._gpu_memory_utilization = 0.9
  124. model = self._model
  125. if self._disable_gpu_memory_utilization():
  126. # gpu memory utilization is not used for non-LLM models
  127. self._gpu_memory_utilization = 0
  128. self._gpu_memory_utilization_parameter_name = "gpu-memory-utilization"
  129. gmu = find_parameter(
  130. model.backend_parameters, [self._gpu_memory_utilization_parameter_name]
  131. )
  132. if gmu:
  133. self._gpu_memory_utilization = float(gmu)
  134. def _disable_gpu_memory_utilization(self) -> bool:
  135. """
  136. Determine whether GPU memory utilization should be disabled. vLLM does not use --gpu-memory-utilization for non-LLM models
  137. like embedding and reranker, except for some specific models like Qwen3-Embedding and Qwen3-Reranker.
  138. Rules:
  139. 1. For non-LLM models, GPU memory utilization is DISABLED (return True) unless they are in the exception list.
  140. 2. Otherwise, GPU memory utilization is ENABLED (return False).
  141. """
  142. if not self._model.categories:
  143. return False
  144. architectures = self._model_params.architectures or []
  145. # Non-LLM models that vLLM still uses GPU memory utilization
  146. NON_LLM_GMU_EXCEPTIONS = {
  147. "Qwen3ForCausalLM",
  148. "Qwen3ForSequenceClassification", # Qwen3-Embedding & Qwen3-Reranker
  149. "Qwen3VLForConditionalGeneration", # Qwen3-VL-Embedding & Qwen3-VL-Reranker
  150. }
  151. use_gmu_categories = [CategoryEnum.LLM, CategoryEnum.SPEECH_TO_TEXT]
  152. if any(cat in self._model.categories for cat in use_gmu_categories):
  153. return False
  154. # Disable for non-LLM models unless they are in the exception list
  155. return not any(arch in NON_LLM_GMU_EXCEPTIONS for arch in architectures)
  156. def _set_model_parameters(self):
  157. super()._set_model_parameters()
  158. # Get the architectures from hf-overrides. This helps make resource allocation
  159. # decisions for specific models like Qwen3-Embedding and Qwen3-Reranker.
  160. hf_overrides = find_parameter(self._model.backend_parameters, ["hf-overrides"])
  161. if hf_overrides:
  162. overrides_dict = json.loads(hf_overrides)
  163. if isinstance(overrides_dict, dict) and "architectures" in overrides_dict:
  164. self._model_params.architectures = overrides_dict["architectures"]
  165. self._num_attention_heads = self._model_params.num_attention_heads
  166. def _cal_effective_vram(self) -> float:
  167. if self._largest_multi_gpu_total == 0:
  168. return 0.0
  169. return (
  170. byte_to_gib(self._largest_multi_gpu_vram)
  171. * self._gpu_memory_utilization
  172. * self._largest_multi_gpu_utilization_satisfied_count
  173. / self._largest_multi_gpu_total
  174. )
  175. def _set_messages(self):
  176. if self._messages:
  177. return
  178. event_messages = {
  179. EVENT_ACTION_DEFAULT: "",
  180. EVENT_ACTION_MANUAL_MULTI: "",
  181. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU: "",
  182. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU: "",
  183. EVENT_ACTION_AUTO_SINGLE_GPU: "",
  184. }
  185. for event in self._event_collector.events:
  186. event_messages[event.action] = event.message
  187. messages = event_messages[EVENT_ACTION_DEFAULT] + "\n"
  188. for action in [
  189. EVENT_ACTION_MANUAL_MULTI,
  190. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
  191. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
  192. EVENT_ACTION_AUTO_SINGLE_GPU,
  193. ]:
  194. if event_messages[action]:
  195. messages += event_messages[action]
  196. break
  197. self._messages.append(messages)
  198. def _add_message(self, message: str):
  199. self._messages.append(message)
  200. def get_messages(self) -> List[str]:
  201. return self._messages
  202. def _get_worker_vram(self, worker: Worker) -> Dict[int, int]:
  203. if worker.name in self._worker_name_to_vram:
  204. return self._worker_name_to_vram[worker.name]
  205. if worker.status is None or not worker.status.gpu_devices:
  206. return {}
  207. vram_total_by_index = {}
  208. for gpu in worker.status.gpu_devices:
  209. total = gpu.memory.total if gpu.memory else 0
  210. vram_total_by_index[gpu.index] = total
  211. self._worker_name_to_vram[worker.name] = vram_total_by_index
  212. return vram_total_by_index
  213. async def select_candidates(
  214. self, workers: List[Worker]
  215. ) -> List[ModelInstanceScheduleCandidate]:
  216. """
  217. Get schedule candidates that fit the GPU resources requirement.
  218. """
  219. # Initialize model parameters.
  220. await self._init_model_parameters(workers)
  221. self._vram_claim = await estimate_model_vram(
  222. self._model, self._config.huggingface_token, workers
  223. )
  224. self._ram_claim = get_model_ram_claim(self._model)
  225. logger.info(
  226. f"Calculated resource claim for model {self._model.readable_source}, "
  227. f"VRAM claim: {self._vram_claim}, RAM claim: {self._ram_claim}"
  228. )
  229. default_msg_list = ListMessageBuilder(
  230. f"The model requires approximately {byte_to_gib(self._vram_claim)} GiB of VRAM"
  231. f"{f' and {byte_to_gib(self._ram_claim)} GiB of RAM' if self._ram_claim > 0 else ''}."
  232. )
  233. if self._gpu_memory_utilization != 0:
  234. default_msg_list.append(
  235. f"With --{self._gpu_memory_utilization_parameter_name}={self._gpu_memory_utilization}, "
  236. f"all GPUs combined need to provide at least {byte_to_gib(int(self._vram_claim / self._gpu_memory_utilization))} GiB of total VRAM "
  237. f"and each GPU needs {int(self._gpu_memory_utilization * 100)}% of allocatable VRAM."
  238. )
  239. self._event_collector.add(
  240. EventLevelEnum.INFO,
  241. EVENT_ACTION_DEFAULT,
  242. str(default_msg_list),
  243. )
  244. candidate_functions = [
  245. self.find_manual_gpu_selection_candidates,
  246. self.find_single_worker_single_gpu_full_offloading_candidates,
  247. self.find_single_worker_multi_gpu_full_offloading_candidates,
  248. self.find_multi_worker_multi_gpu_candidates,
  249. ]
  250. for candidate_func in candidate_functions:
  251. if self.should_skip_candidate_func(candidate_func):
  252. continue
  253. logger.debug(
  254. f"model {self._model.readable_source}, filter candidates with resource fit selector: {candidate_func.__name__}"
  255. )
  256. candidates = candidate_func(workers)
  257. if len(candidates) >= 1 and candidates[0].overcommit:
  258. # Manually selected candidate with overcommit. Also add the message.
  259. # It's useful for compatibility check.
  260. self._set_messages()
  261. if candidates:
  262. return candidates
  263. self._set_messages()
  264. return []
  265. def should_skip_candidate_func(self, candidate_func) -> bool:
  266. # Skip conditions for manual GPU selection.
  267. if (
  268. self._selected_gpu_workers
  269. and candidate_func != self.find_manual_gpu_selection_candidates
  270. ):
  271. return True
  272. # Skip conditions for distributed inference.
  273. if (
  274. not self._model.distributed_inference_across_workers
  275. and candidate_func == self.find_multi_worker_multi_gpu_candidates
  276. ):
  277. return True
  278. return False
  279. def find_manual_gpu_selection_candidates(
  280. self, workers: List[Worker]
  281. ) -> List[ModelInstanceScheduleCandidate]:
  282. request = RequestEstimateUsage(
  283. ram=self._ram_claim,
  284. vram=self._vram_claim,
  285. )
  286. return self._find_manual_gpu_selection_candidates(
  287. workers,
  288. {"*": self._gpu_memory_utilization},
  289. request,
  290. self._event_collector,
  291. )
  292. def find_single_worker_single_gpu_full_offloading_candidates(
  293. self, workers: List[Worker]
  294. ) -> List[ModelInstanceScheduleCandidate]:
  295. """
  296. Find single worker single gpu full offloading candidates for the model instance with workers.
  297. """
  298. if self._gpu_count is not None and self._gpu_count > 1:
  299. # Skip multi-GPU selection
  300. return []
  301. candidates = []
  302. workers_of_type = group_workers_by_gpu_type(workers)
  303. for gpu_type, workers_of_type in workers_of_type.items():
  304. for worker in workers_of_type:
  305. if not worker.status.gpu_devices:
  306. continue
  307. result = self._find_single_worker_single_gpu_full_offloading_candidates(
  308. worker,
  309. gpu_type,
  310. )
  311. if result:
  312. candidates.extend(result)
  313. return candidates
  314. def _find_single_worker_single_gpu_full_offloading_candidates(
  315. self,
  316. worker: Worker,
  317. gpu_type: Optional[str] = None,
  318. ) -> List[ModelInstanceScheduleCandidate]:
  319. """
  320. Find single worker single gpu full offloading candidates for the model instance with worker.
  321. requires: worker.status.gpu_devices is not None
  322. """
  323. candidates = []
  324. allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
  325. if ram_not_enough(self._ram_claim, allocatable):
  326. return []
  327. if not worker.status.gpu_devices:
  328. return []
  329. for _, gpu in enumerate(worker.status.gpu_devices):
  330. gpu_index = gpu.index
  331. allocatable_vram = allocatable.vram.get(gpu_index, 0)
  332. allocatable_gpu_memory_utilization = allocatable_vram / gpu.memory.total
  333. if allocatable_vram > self._largest_single_gpu_vram:
  334. self._largest_single_gpu_vram = allocatable_vram
  335. self._largest_single_gpu_vram_utilization = (
  336. allocatable_gpu_memory_utilization
  337. )
  338. if gpu.memory is None or gpu.memory.total == 0:
  339. continue
  340. exceeds_vram = (
  341. self._vram_claim > gpu.memory.total * self._gpu_memory_utilization
  342. if self._gpu_memory_utilization > 0 # LLMs
  343. else self._vram_claim > allocatable_vram # non LLMs
  344. )
  345. exceeds_memory_utilization = (
  346. self._gpu_memory_utilization > 0
  347. and allocatable_gpu_memory_utilization < self._gpu_memory_utilization
  348. )
  349. if exceeds_vram or exceeds_memory_utilization:
  350. continue
  351. vram_claim_bytes = (
  352. int(gpu.memory.total * self._gpu_memory_utilization)
  353. if self._gpu_memory_utilization > 0 # LLMs
  354. else int(self._vram_claim) # non LLMs
  355. )
  356. vram_claim = {gpu_index: vram_claim_bytes}
  357. candidates.append(
  358. ModelInstanceScheduleCandidate(
  359. worker=worker,
  360. gpu_indexes=[gpu_index],
  361. gpu_type=gpu.type,
  362. computed_resource_claim=ComputedResourceClaim(
  363. vram=vram_claim,
  364. ram=get_computed_ram_claim(self._model, vram_claim),
  365. estimated_vram=self._vram_claim,
  366. ),
  367. )
  368. )
  369. if not candidates or (len(candidates) == 1 and candidates[0].overcommit):
  370. event_msg = f"The current available GPU only has {byte_to_gib(self._largest_single_gpu_vram)} GiB allocatable VRAM."
  371. if self._gpu_memory_utilization != 0:
  372. event_msg = (
  373. event_msg.rstrip(".")
  374. + f" ({(self._largest_single_gpu_vram_utilization * 100):.2f}%)."
  375. )
  376. self._event_collector.add(
  377. EventLevelEnum.INFO,
  378. EVENT_ACTION_AUTO_SINGLE_GPU,
  379. str(ListMessageBuilder(event_msg)),
  380. )
  381. return candidates
  382. def find_single_worker_multi_gpu_full_offloading_candidates(
  383. self, workers: List[Worker]
  384. ) -> List[ModelInstanceScheduleCandidate]:
  385. if self._gpu_count == 1:
  386. return []
  387. candidates = []
  388. workers_of_type = group_workers_by_gpu_type(workers)
  389. for gpu_type, workers_of_type in workers_of_type.items():
  390. for worker in workers_of_type:
  391. if not worker.status.gpu_devices:
  392. continue
  393. result = self._find_single_worker_multi_gpu_full_offloading_candidates(
  394. worker, gpu_type
  395. )
  396. if result:
  397. candidates.extend(result)
  398. if not candidates:
  399. return []
  400. min_gpu_count = min(len(candidate.gpu_indexes) for candidate in candidates)
  401. final_candidates = [
  402. candidate
  403. for candidate in candidates
  404. if len(candidate.gpu_indexes) == min_gpu_count
  405. ]
  406. return final_candidates
  407. def _find_single_worker_multi_gpu_full_offloading_candidates( # noqa: C901
  408. self, worker: Worker, gpu_type: Optional[str] = None
  409. ) -> List[ModelInstanceScheduleCandidate]:
  410. """
  411. Find single worker multi gpu full offloading candidates for the model instance.
  412. requires: worker.status.gpu_devices is not None
  413. """
  414. total_gpu = len(worker.status.gpu_devices)
  415. if total_gpu < 2:
  416. return None
  417. allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
  418. if ram_not_enough(self._ram_claim, allocatable):
  419. return []
  420. gpu_list = []
  421. total_allocatable_vram = 0
  422. satisfied_gpu_count = 0
  423. for gpu in worker.status.gpu_devices:
  424. if gpu.memory is None or gpu.memory.total is None:
  425. continue
  426. allocatable_vram = allocatable.vram.get(gpu.index, 0)
  427. total_allocatable_vram += allocatable_vram
  428. if allocatable_vram / gpu.memory.total > self._gpu_memory_utilization:
  429. satisfied_gpu_count += 1
  430. gpu_list.append(gpu)
  431. if total_allocatable_vram > self._largest_multi_gpu_total:
  432. self._largest_multi_gpu_vram = total_allocatable_vram
  433. self._largest_multi_gpu_utilization_satisfied_count = satisfied_gpu_count
  434. self._largest_multi_gpu_total = len(worker.status.gpu_devices)
  435. # Sort by vram in descending order
  436. sorted_gpu_devices: GPUDevicesStatus = sorted(
  437. gpu_list,
  438. key=lambda gpu: allocatable.vram.get(gpu.index, 0),
  439. reverse=True,
  440. )
  441. vram_sum = 0
  442. gpu_sum = 0
  443. gpu_indexes = []
  444. vram_claim: Dict[int, int] = {}
  445. found_candidate = False
  446. for _, gpu in enumerate(sorted_gpu_devices):
  447. gpu_indexes.append(gpu.index)
  448. vram_claim[gpu.index] = (
  449. int(gpu.memory.total * self._gpu_memory_utilization)
  450. if self._gpu_memory_utilization > 0 # LLMs
  451. else allocatable.vram.get(gpu.index, 0) # non LLMs
  452. )
  453. gpu_sum += 1
  454. vram_sum += vram_claim[gpu.index]
  455. if not self._is_tp_size_divisible(gpu_sum):
  456. continue
  457. if self._gpu_count and gpu_sum >= self._gpu_count:
  458. if vram_sum >= self._vram_claim:
  459. found_candidate = True
  460. # if self._gpu_count is set, cannot return more than gpu_count
  461. break
  462. if (not self._gpu_count) and vram_sum >= self._vram_claim:
  463. found_candidate = True
  464. break
  465. if found_candidate:
  466. return [
  467. ModelInstanceScheduleCandidate(
  468. worker=worker,
  469. gpu_type=gpu_type,
  470. gpu_indexes=gpu_indexes,
  471. computed_resource_claim=ComputedResourceClaim(
  472. vram=vram_claim,
  473. ram=get_computed_ram_claim(self._model, vram_claim),
  474. estimated_vram=self._vram_claim,
  475. ),
  476. )
  477. ]
  478. event_msg_list = []
  479. if msg := self._check_tp_size_divisibility(
  480. self._largest_multi_gpu_utilization_satisfied_count
  481. ):
  482. event_msg_list.append(msg)
  483. event_msg = f"The largest available worker has {byte_to_gib(self._largest_multi_gpu_vram)} GiB allocatable VRAM."
  484. if self._gpu_memory_utilization != 0:
  485. event_msg = (
  486. event_msg.rstrip(".")
  487. + f", {self._largest_multi_gpu_utilization_satisfied_count}/{self._largest_multi_gpu_total} of GPUs meet the VRAM utilization ratio, providing {self._cal_effective_vram():.2f} GiB of allocatable VRAM."
  488. )
  489. event_msg_list.append(event_msg)
  490. self._event_collector.add(
  491. EventLevelEnum.INFO,
  492. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
  493. str(ListMessageBuilder(event_msg_list)),
  494. )
  495. return []
  496. def find_multi_worker_multi_gpu_candidates(
  497. self, workers: List[Worker]
  498. ) -> List[ModelInstanceScheduleCandidate]:
  499. candidates = []
  500. workers_of_type = group_workers_by_gpu_type(workers)
  501. for gpu_type, workers_of_type in workers_of_type.items():
  502. result = self.auto_select_multi_worker_multi_gpu_candidates(
  503. workers_of_type, gpu_type
  504. )
  505. if result:
  506. candidates.extend(result)
  507. return candidates
  508. def auto_select_multi_worker_multi_gpu_candidates( # noqa: C901
  509. self, workers: List[Worker], gpu_type: Optional[str] = None
  510. ) -> List[ModelInstanceScheduleCandidate]:
  511. """
  512. Auto select multi worker multi gpu candidates.
  513. Currently, a candidate should match the following conditions:
  514. 1. Workers in the candidate have the same number of GPUs.
  515. 2. All GPUs in the worker satisfy the gpu_memory_utilization requirement.
  516. 3. TP size can be divided by the number of attention heads.
  517. 4. The total VRAM claim is greater than the estimated VRAM claim.
  518. 5. If gpu_count is set via parallelism, the total GPU count should be equal to gpu_count.
  519. """
  520. if not workers or len(workers) < 2:
  521. return []
  522. sort_workers_by_gpu_count(workers)
  523. workers_by_gpu_count_dict = defaultdict(list)
  524. for worker in workers:
  525. if not worker.status or not worker.status.gpu_devices:
  526. continue
  527. workers_by_gpu_count_dict[len(worker.status.gpu_devices)].append(worker)
  528. # Store the optimal combination info to show
  529. workers_combination: List[Worker] = []
  530. largest_vram = 0
  531. worker_count = 0
  532. device_count_per_worker = 0
  533. # Loop through worker groups with the same number of GPUs.
  534. for gpu_count, worker_group in workers_by_gpu_count_dict.items():
  535. if len(worker_group) < 2:
  536. continue
  537. if not self._is_tp_size_divisible(gpu_count):
  538. continue
  539. selected_workers: List[Worker] = []
  540. gpu_sum = 0
  541. vram_sum = 0
  542. for worker in worker_group:
  543. allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
  544. if ram_not_enough(self._ram_claim, allocatable):
  545. # The RAM resource(for extended KV cache) is required per worker.
  546. # Skip the worker if it does not satisfy the RAM requirement.
  547. continue
  548. if any(
  549. gpu.memory is None
  550. or gpu.memory.total is None
  551. or (
  552. allocatable.vram.get(gpu.index, 0) / gpu.memory.total
  553. < self._gpu_memory_utilization
  554. )
  555. for gpu in worker.status.gpu_devices
  556. ):
  557. # Skip the worker if any GPU does not satisfy the gpu_memory_utilization requirement.
  558. continue
  559. selected_workers.append(worker)
  560. gpu_sum += gpu_count
  561. vram_sum += sum(
  562. int(gpu.memory.total * (self._gpu_memory_utilization or 1))
  563. for gpu in worker.status.gpu_devices
  564. )
  565. if self._gpu_count:
  566. # Parallelism is set. Proceed until we match the exact GPU count.
  567. if gpu_sum < self._gpu_count:
  568. continue
  569. elif gpu_sum > self._gpu_count:
  570. break
  571. if vram_sum >= self._vram_claim:
  572. return [
  573. _create_candidate(
  574. self._model,
  575. selected_workers,
  576. self._gpu_memory_utilization,
  577. estimated_vram=self._vram_claim,
  578. )
  579. ]
  580. if vram_sum > largest_vram:
  581. workers_combination = selected_workers
  582. largest_vram = vram_sum
  583. worker_count = len(worker_group)
  584. device_count_per_worker = gpu_count
  585. # Nothing can be return, construct scheduling message
  586. event_message = ListMessageBuilder([])
  587. if self._gpu_memory_utilization == 0:
  588. event_message.append(
  589. f"The largest available worker has {byte_to_gib(largest_vram)} GiB of VRAM."
  590. )
  591. elif workers_combination:
  592. worker_names = [worker.name for worker in workers_combination]
  593. worker_names_msg = (
  594. str(worker_names[:3]).rstrip("]")
  595. + f"...(more {len(worker_names) - 3})]"
  596. if len(worker_names) > 3
  597. else str(worker_names)
  598. )
  599. message = f"The optimal combination {worker_names_msg} provides {byte_to_gib(largest_vram)} GiB of allocatable VRAM."
  600. if worker_count - len(workers_combination) > 0:
  601. message += f" There are {worker_count - len(workers_combination)} {'workers' if worker_count - len(workers_combination) > 1 else 'worker'} that can provide {device_count_per_worker} {'GPUs' if device_count_per_worker > 1 else 'GPU'}, as the workers in the combination, but some GPUs among them fail to meet requirements."
  602. event_message.append(message)
  603. event_message.append(
  604. "Cannot find a suitable worker combination to run the model in distributed mode. "
  605. "If you are confident that the resources are sufficient, you may manually schedule the model by selecting the workers and GPUs."
  606. )
  607. self._event_collector.add(
  608. EventLevelEnum.INFO,
  609. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
  610. str(event_message),
  611. )
  612. return []
  613. def _validate_arguments(self):
  614. tp = find_int_parameter(
  615. self._model.backend_parameters, ["tensor-parallel-size", "tp"]
  616. )
  617. if msg := self._check_tp_size_divisibility(tp):
  618. raise ValueError(
  619. msg + " Consider adjusting your tensor-parallel-size value."
  620. )
  621. def _create_candidate(
  622. model: Model,
  623. selected_workers: List[Worker],
  624. gpu_memory_utilization: float = 0.9,
  625. estimated_vram: Optional[int] = None,
  626. ) -> ModelInstanceScheduleCandidate:
  627. """
  628. Create a candidate with all GPUs from the selected workers.
  629. """
  630. main_worker = selected_workers[0]
  631. vram_claim_main = {
  632. gpu.index: int(gpu.memory.total * gpu_memory_utilization)
  633. for gpu in main_worker.status.gpu_devices
  634. }
  635. gpu_type = main_worker.status.gpu_devices[0].type
  636. candidate = ModelInstanceScheduleCandidate(
  637. worker=main_worker,
  638. gpu_type=gpu_type,
  639. gpu_indexes=[gpu.index for gpu in main_worker.status.gpu_devices],
  640. computed_resource_claim=ComputedResourceClaim(
  641. vram=vram_claim_main,
  642. ram=get_computed_ram_claim(model, vram_claim_main),
  643. estimated_vram=estimated_vram,
  644. ),
  645. )
  646. candidate.subordinate_workers = []
  647. for worker in selected_workers[1:]:
  648. vram_claim_subworker = {
  649. gpu.index: int(gpu.memory.total * gpu_memory_utilization)
  650. for gpu in worker.status.gpu_devices
  651. }
  652. candidate.subordinate_workers.append(
  653. ModelInstanceSubordinateWorker(
  654. worker_id=worker.id,
  655. worker_name=worker.name,
  656. worker_ip=worker.ip,
  657. worker_ifname=worker.ifname,
  658. total_gpus=len(worker.status.gpu_devices),
  659. gpu_type=gpu_type,
  660. gpu_indexes=[gpu.index for gpu in worker.status.gpu_devices],
  661. computed_resource_claim=ComputedResourceClaim(
  662. vram=vram_claim_subworker,
  663. ram=get_computed_ram_claim(model, vram_claim_subworker),
  664. estimated_vram=estimated_vram,
  665. ),
  666. )
  667. )
  668. return candidate