# base_candidate_selector.py (41 KB)
  1. from abc import ABC, abstractmethod
  2. from dataclasses import dataclass, fields
  3. import enum
  4. import logging
  5. from typing import Any, Dict, List, Optional, Tuple
  6. from gpustack.config import Config
  7. from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
  8. from gpustack.schemas.models import (
  9. BackendEnum,
  10. CategoryEnum,
  11. ComputedResourceClaim,
  12. Model,
  13. ModelInstance,
  14. ModelInstanceSubordinateWorker,
  15. is_omni_model,
  16. )
  17. from gpustack.schemas.workers import Worker
  18. from gpustack.utils.hub import get_hf_text_config, get_max_model_len
  19. from gpustack.scheduler.calculator import get_pretrained_config_with_workers
  20. from gpustack.utils.gpu import (
  21. abbreviate_worker_gpu_indexes,
  22. group_gpu_ids_by_worker,
  23. group_gpu_indexes_by_gpu_type_and_worker,
  24. )
  25. from gpustack.policies.base import Allocatable, ModelInstanceScheduleCandidate
  26. from gpustack.policies.utils import (
  27. ListMessageBuilder,
  28. get_computed_ram_claim,
  29. get_model_num_attention_heads,
  30. get_model_vision_num_attention_heads,
  31. get_worker_allocatable_resource,
  32. get_worker_model_instances,
  33. sort_gpu_indexes_by_allocatable_rate,
  34. sort_selected_workers_by_gpu_type_and_resource,
  35. )
  36. from gpustack.utils.unit import byte_to_gib
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Action identifiers passed to the event collector when recording
  # scheduling messages.
  EVENT_ACTION_DEFAULT = "default_scheduling_msg"
  EVENT_ACTION_MANUAL_MULTI = "manual_multi_gpu_scheduling_msg"
  EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU = "auto_multi_worker_multi_gpu_scheduling_msg"
  EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU = "auto_single_worker_multi_gpu_scheduling_msg"
  EVENT_ACTION_AUTO_SINGLE_GPU = "auto_single_gpu_scheduling_msg"
  @dataclass
  class RequestEstimateUsage:
      """
      Estimated resource usage requested for a model instance.

      Selectors compare these figures against the RAM/VRAM claims of a
      candidate (presumably both are expressed in bytes -- confirm with callers).
      """

      # Estimated system memory usage.
      ram: int
      # Estimated GPU memory usage.
      vram: int
  class ModelAttentionTypeEnum(enum.Enum):
      """Attention mechanism variants a model can be classified as."""

      # Could not be determined from the hyperparameters.
      UNK = "unknown"
      # Multi-head attention.
      MHA = "multi_head_attention"
      # Grouped-query attention.
      GQA = "grouped_query_attention"
      # Multi-query attention.
      MQA = "multi_query_attention"
      # Multi-head latent attention.
      MLA = "multi_head_latent_attention"
  @dataclass
  class ModelParameters:
      """
      Model (hyper)parameters parsed from a pretrained config.

      Most field names match pretrained-config attribute names, so
      `from_model_pretrained_config` can copy same-named values over directly.
      """

      architectures: Optional[List[str]] = None
      derived_max_seq_len: int = 0
      num_hidden_layers: int = 0
      hidden_size: Optional[int] = None
      vocab_size: Optional[int] = None
      num_attention_heads: Optional[int] = None
      num_key_value_heads: int = 1
      n_group: Optional[int] = None
      head_dim: Optional[int] = None
      q_lora_rank: Optional[int] = None
      kv_lora_rank: Optional[int] = None
      qk_rope_head_dim: Optional[int] = None
      qk_nope_head_dim: Optional[int] = None
      v_head_dim: Optional[int] = None
      torch_dtype: str = "bfloat16"
      quantize: Optional[str] = None
      quantization_config: Optional[Dict] = None
      moe_num_experts: Optional[int] = None
      moe_num_shared_experts: Optional[int] = None
      moe_intermediate_size: Optional[int] = None
      is_multimodel: bool = False
      vision_config: Optional[Dict] = None
      vision_num_attention_heads: Optional[int] = None

      @staticmethod
      def _first_truthy_attr(config: Any, names: List[str]) -> Any:
          """Return the first truthy attribute of `config` among `names`, or None."""
          for name in names:
              if value := getattr(config, name, None):
                  return value
          return None

      def from_model_pretrained_config(  # noqa: C901
          self, model: Model, pretrained_config: Any
      ):
          """
          Parse the model's (hyper)parameters from the model.

          Args:
              model: The model the config belongs to; its name and categories
                  are used for validation.
              pretrained_config: The pretrained config object of the model.

          Raises:
              ValueError: If no text config can be resolved for an LLM model.
          """
          if hasattr(pretrained_config, "vision_config"):
              self.is_multimodel = True
              self.vision_config = pretrained_config.vision_config
              self.vision_num_attention_heads = get_model_vision_num_attention_heads(
                  pretrained_config
              )

          # Get architectures first, it is not available in text_config.
          if hasattr(pretrained_config, "architectures"):
              self.architectures = pretrained_config.architectures

          pretrained_config = get_hf_text_config(pretrained_config)
          if pretrained_config is None and CategoryEnum.LLM in model.categories:
              # Exclude empty dict cases, as they indicate the locally-sourced model is not local to the server node.
              raise ValueError(f"Failed to get model {model.name} pretrained config")

          # Copy every same-named, non-None config attribute onto this object.
          # getattr() with a default never raises AttributeError, so no
          # exception handling is needed here.
          for attr in fields(self.__class__):
              attr_value = getattr(pretrained_config, attr.name, None)
              if attr_value is not None:
                  setattr(self, attr.name, attr_value)

          # Default
          self.derived_max_seq_len = get_max_model_len(pretrained_config)

          if not self.num_attention_heads:
              # For backward compatibility, try to get num_attention_heads from llm_config.
              self.num_attention_heads = get_model_num_attention_heads(pretrained_config)

          if not self.head_dim and self.hidden_size and self.num_attention_heads:
              self.head_dim = self.hidden_size // self.num_attention_heads

          # Expert counts are published under different names depending on the
          # model family; take the first one that is set.
          if not self.moe_num_experts:
              experts = self._first_truthy_attr(
                  pretrained_config,
                  ["n_routed_experts", "num_local_experts", "num_experts"],
              )
              if experts:
                  self.moe_num_experts = experts

          if self.moe_num_experts and not self.moe_num_shared_experts:
              shared_experts = self._first_truthy_attr(
                  pretrained_config,
                  ["n_shared_experts", "num_shared_experts"],
              )
              if shared_experts:
                  self.moe_num_shared_experts = shared_experts

      def get_attention_type(self) -> ModelAttentionTypeEnum:
          """
          Get the attention type based on the hyperparameters.

          Returns:
              MQA when there is a single key/value head, GQA when the key/value
              head count is a proper divisor (> 1) of the attention head count,
              MLA or MHA when both counts are equal (MLA if both LoRA ranks are
              set), and UNK otherwise.
          """
          heads = self.num_attention_heads
          kv_heads = self.num_key_value_heads
          if not heads:
              return ModelAttentionTypeEnum.UNK

          if kv_heads == 1:
              return ModelAttentionTypeEnum.MQA
          if 1 < kv_heads < heads and heads % kv_heads == 0:
              return ModelAttentionTypeEnum.GQA
          if kv_heads == heads:
              if self.q_lora_rank and self.kv_lora_rank:
                  return ModelAttentionTypeEnum.MLA
              return ModelAttentionTypeEnum.MHA
          return ModelAttentionTypeEnum.UNK
  class ScheduleCandidatesSelector(ABC):
      """
      Abstract base class for selectors that produce schedule candidates for a model.

      Holds the scheduling inputs (config, model, existing model instances),
      the parsed model hyperparameters, the manual GPU selection state and
      per-worker resource caches shared by concrete selectors.
      """

      # GPUStack configuration.
      _config: Config
      # Model to be scheduled.
      _model: Model
      # Model instances given at construction; passed along when computing a
      # worker's allocatable resource.
      _model_instances: List[ModelInstance]
      # Model hyperparameters.
      _model_params: ModelParameters
      # Frequently used model parameter in selectors.
      # Both start at 0 and are later copied from the model hyperparameters,
      # where they may be None.
      _num_attention_heads: Optional[int]
      _vision_num_attention_heads: Optional[int]
      # Number of GPUs required by the model.
      # Derived from GPU selectors(manual) or Parallelism parameters(auto).
      _gpu_count: int
      # Selected worker names if manual GPU selection is used.
      _selected_gpu_workers: Optional[List[str]]
      # Selected GPU indexes by gpu type and worker name if manual GPU selection is used.
      _selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[str, List[int]]]
      # Cache of worker VRAM totals by GPU index, keyed by GPU type
      # (None when not filtered by type) and then by worker name.
      _vram_totals_by_gpu_type_and_worker_and_gpu_idxs: Dict[
          Optional[str], Dict[str, Dict[int, int]]
      ]
      # Cache of worker allocatable resources, keyed by GPU type
      # (None when not filtered by type) and then by worker id.
      _workers_allocatable_resource_by_gpu_type: Dict[
          Optional[str], Dict[int, Allocatable]
      ]
  def __init__(
      self,
      config: Config,
      model: Model,
      model_instances: List[ModelInstance],
  ):
      """
      Store the scheduling inputs and reset all derived state.

      Args:
          config: GPUStack configuration.
          model: Model to be scheduled.
          model_instances: Model instances kept for allocatable-resource computation.
      """
      # Scheduling inputs.
      self._config, self._model = config, model
      self._model_instances = model_instances

      # Model hyperparameters; the head counts stay 0 until they are parsed.
      self._model_params = ModelParameters()
      self._num_attention_heads = self._vision_num_attention_heads = 0

      # GPU requirement and manual GPU selection state.
      self._gpu_count = 0
      self._selected_gpu_workers = None
      self._selected_gpu_indexes_by_gpu_type_and_worker = {}

      # Per-worker caches.
      self._vram_totals_by_gpu_type_and_worker_and_gpu_idxs = {}
      self._workers_allocatable_resource_by_gpu_type = {}
  189. @abstractmethod
  190. def get_messages(self) -> List[str]:
  191. """
  192. Get diagnostic messages from the selector.
  193. :return: A list of diagnostic messages.
  194. """
  195. pass
  196. @abstractmethod
  197. async def select_candidates(
  198. self, workers: List[Worker]
  199. ) -> List[ModelInstanceScheduleCandidate]:
  200. """
  201. Get schedule candidates.
  202. :param workers: The list of workers to select from.
  203. :return: A list of schedule candidates.
  204. """
  205. pass
  206. @abstractmethod
  207. def _should_check_vision_tp_divisibility(self) -> bool:
  208. """
  209. Whether this backend enforces TP divisibility for vision attention heads.
  210. """
  211. pass
  async def _init_model_parameters(self, workers: List[Worker]):
      """
      Load the model's pretrained config and parse its hyperparameters.

      Omni models are skipped. On success the attention head counts are
      mirrored onto the selector for quick access.

      Args:
          workers: Workers passed to the pretrained-config lookup.

      Raises:
          ValueError: If the pretrained config cannot be fetched or parsed;
              the original exception is attached as the cause.
      """
      if is_omni_model(self._model):
          # Current model parameters are for llm-like models.
          return

      try:
          pretrained_config = await get_pretrained_config_with_workers(
              self._model,
              workers,
              trust_remote_code=True,
          )
          self._model_params.from_model_pretrained_config(
              self._model, pretrained_config
          )
          self._num_attention_heads = self._model_params.num_attention_heads
          self._vision_num_attention_heads = (
              self._model_params.vision_num_attention_heads
          )
      except Exception as e:
          # Chain explicitly so the root cause stays visible in tracebacks.
          raise ValueError(
              f"Failed to parse model {self._model.name} hyperparameters: {e}"
          ) from e
  233. def _set_gpu_count(
  234. self,
  235. world_size: Optional[int] = None,
  236. strategies: Optional[List[str]] = None,
  237. ):
  238. model = self._model
  239. if model.gpu_selector and model.gpu_selector.gpu_ids:
  240. gpu_indexes_by_gpu_type_and_worker = (
  241. group_gpu_indexes_by_gpu_type_and_worker(model.gpu_selector.gpu_ids)
  242. )
  243. gpu_ids_by_worker = group_gpu_ids_by_worker(model.gpu_selector.gpu_ids)
  244. self._selected_gpu_workers = list(gpu_ids_by_worker.keys())
  245. self._selected_gpu_indexes_by_gpu_type_and_worker = (
  246. gpu_indexes_by_gpu_type_and_worker
  247. )
  248. self._gpu_count = model.gpu_selector.gpus_per_replica or len(
  249. model.gpu_selector.gpu_ids
  250. )
  251. # When world_size is set.
  252. if world_size and world_size > 0:
  253. if self._gpu_count and self._gpu_count != world_size:
  254. # Both gpu selector and parallelism are set, validate they match.
  255. strategies_str = "/".join(strategies) if strategies else "parallelism"
  256. raise ValueError(
  257. f"Model {model.name} has {strategies_str} set, but the selected gpu count ({self._gpu_count}) does not match the world size ({world_size})."
  258. )
  259. self._gpu_count = world_size
  260. def get_worker_allocatable_resource(
  261. self, worker: Worker, gpu_type: Optional[str] = None
  262. ) -> Allocatable:
  263. """
  264. Get the worker's allocatable resource, with caching for efficiency.
  265. Args:
  266. worker: Worker object
  267. gpu_type: Optional GPU type to filter
  268. Returns:
  269. Allocatable: The allocatable resource of the worker
  270. """
  271. allocatable = self._workers_allocatable_resource_by_gpu_type.get(
  272. gpu_type, {}
  273. ).get(worker.id)
  274. if allocatable is not None:
  275. return allocatable
  276. allocatable = get_worker_allocatable_resource(
  277. self._model_instances, worker, gpu_type
  278. )
  279. self._workers_allocatable_resource_by_gpu_type.setdefault(
  280. gpu_type, {}
  281. ).setdefault(worker.id, allocatable)
  282. return allocatable
  283. def _get_worker_vram_totals(
  284. self, worker: Worker, gpu_type: Optional[str] = None
  285. ) -> Dict[int, int]:
  286. """
  287. Get a mapping from GPU idxs to total VRAM for the given worker, filtered by gpu_type if provided.
  288. Uses cache for efficiency.
  289. Args:
  290. worker: Worker object
  291. Returns:
  292. Dict[int, int]: Mapping from GPU idxs to total VRAM (bytes)
  293. """
  294. vram_by_ids = self._vram_totals_by_gpu_type_and_worker_and_gpu_idxs.get(
  295. gpu_type, {}
  296. ).get(worker.name)
  297. if vram_by_ids is not None:
  298. return vram_by_ids
  299. if not worker.status or not worker.status.gpu_devices:
  300. return {}
  301. vram_by_idxs = {
  302. gpu.index: gpu.memory.total if gpu.memory and gpu.memory.total else 0
  303. for gpu in worker.status.gpu_devices
  304. if gpu.index is not None and (gpu_type is None or gpu.type == gpu_type)
  305. }
  306. self._vram_totals_by_gpu_type_and_worker_and_gpu_idxs.setdefault(gpu_type, {})[
  307. worker.name
  308. ] = vram_by_idxs
  309. return vram_by_idxs
  310. def _get_worker_resource_claim(
  311. self,
  312. worker: Worker,
  313. gpu_indexes: List[int],
  314. gpu_memory_utilization: float,
  315. request: Optional[RequestEstimateUsage] = None,
  316. gpu_type: Optional[str] = None,
  317. ) -> Dict[int, int]:
  318. """
  319. Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.
  320. Returns a dictionary of gpu index to vram claim in bytes.
  321. """
  322. vram_claim: Dict[int, int] = {}
  323. for gpu in worker.status.gpu_devices:
  324. if gpu.index not in gpu_indexes or (
  325. gpu_type is not None and gpu.type != gpu_type
  326. ):
  327. continue
  328. claim = 0
  329. # LLMs
  330. if gpu_memory_utilization > 0:
  331. claim = int(gpu.memory.total * gpu_memory_utilization)
  332. # non LLMs
  333. else:
  334. if request is not None:
  335. claim = int(request.vram / len(gpu_indexes))
  336. else:
  337. claim = 0
  338. vram_claim[gpu.index] = claim
  339. return vram_claim
  def _get_worker_gpu_addresses(
      self,
      worker: Worker,
      gpu_indexes: List[int],
      gpu_type: Optional[str] = None,
  ) -> List[str]:
      """
      Given a worker and gpu indexes, get the gpu addresses according to gpu_indexes.

      Addresses are only collected for the Ascend MindIE backend; any other
      backend yields an empty list.

      Args:
          worker: Worker whose GPU devices are inspected.
          gpu_indexes: Indexes of the GPUs whose addresses are wanted.
          gpu_type: When given, only GPUs of this type are considered.

      Returns:
          A list of GPU addresses in device order. A GPU whose network is
          missing, not 'up', or without an inet address yields "-.-.-.-".
      """
      gpu_addresses: List[str] = []
      if self._model.backend != BackendEnum.ASCEND_MINDIE.value:
          return gpu_addresses
      if not worker.status or not worker.status.gpu_devices:
          return gpu_addresses

      for gpu in worker.status.gpu_devices:
          # Skip GPUs that were not selected or are of another type. This must
          # be an `or`: with `and`, no GPU is filtered out when gpu_type is
          # None and addresses of unselected GPUs leak into the result.
          if gpu.index not in gpu_indexes or (
              gpu_type is not None and gpu.type != gpu_type
          ):
              continue
          addr = (
              gpu.network.inet
              if gpu.network and gpu.network.status == 'up' and gpu.network.inet
              else "-.-.-.-"
          )
          gpu_addresses.append(addr)
      return gpu_addresses
  365. def _generate_manual_selected_gpus_overcommit_message( # noqa: C901
  366. self,
  367. candidate: ModelInstanceScheduleCandidate,
  368. workers: List[Worker],
  369. request: RequestEstimateUsage,
  370. gpu_memory_utilization: float,
  371. ) -> Tuple[bool, str, float]:
  372. """Check whether the candidate is overcommit and generate overcommit message.
  373. Args:
  374. candidate (ModelInstanceScheduleCandidate): The candidate to check.
  375. workers (List[Worker]): The list of workers.
  376. vram_claim (int): The model VRAM claim in bytes.
  377. gpu_memory_utilization (float): The required GPU memory utilization ratio.
  378. Returns:
  379. Tuple[bool, str, float]: A tuple of (is_overcommit, message, effective_vram).
  380. """
  381. if not self._model.gpu_selector or not self._model.gpu_selector.gpu_ids:
  382. return (False, "", 0)
  383. # 1. Build worker set for easy access.
  384. worker_set = {w.name: w for w in workers}
  385. # 2. Summarize the total VRAM and the number that meets the utilization for selected GPUs.
  386. # Generate:
  387. # - total_vram
  388. # - satisfied_gpus_by_worker
  389. # - satisfied_gpus_total_allocatable_vram_for_non_llms
  390. # - selected_gpus_total_allocatable_vram_for_non_llms
  391. selected_gpu_count = len(self._model.gpu_selector.gpu_ids)
  392. satisfied_gpus_by_worker = {}
  393. satisfied_gpus_count = 0
  394. satisfied_gpus_total_allocatable_vram_for_non_llms = 0
  395. selected_gpus_total_allocatable_vram_for_non_llms = 0
  396. for (
  397. gpu_type,
  398. worker_gpu_indexes,
  399. ) in self._selected_gpu_indexes_by_gpu_type_and_worker.items():
  400. for wn, g in worker_gpu_indexes.items():
  401. w = worker_set.get(wn)
  402. if w is None:
  403. continue
  404. wa = self.get_worker_allocatable_resource(w, gpu_type)
  405. vram_total_by_index = self._get_worker_vram_totals(w, gpu_type)
  406. if wa.vram is None:
  407. continue
  408. for gpu_index in g:
  409. total_vram = vram_total_by_index.get(gpu_index)
  410. allocatable_vram = wa.vram.get(gpu_index)
  411. selected_gpus_total_allocatable_vram_for_non_llms += (
  412. allocatable_vram or 0
  413. )
  414. if gpu_memory_utilization == 0: # non LLMs
  415. satisfied_gpus_count += 1
  416. satisfied_gpus_total_allocatable_vram_for_non_llms += (
  417. allocatable_vram or 0
  418. )
  419. satisfied_gpus_by_worker.setdefault(gpu_type, {}).setdefault(
  420. wn, {}
  421. )[gpu_index] = True
  422. else: # LLMs
  423. if total_vram is None or allocatable_vram is None:
  424. continue
  425. allocatable_gpu_memory_utilization = (
  426. allocatable_vram / total_vram
  427. )
  428. if allocatable_gpu_memory_utilization >= gpu_memory_utilization:
  429. satisfied_gpus_count += 1
  430. satisfied_gpus_by_worker.setdefault(
  431. gpu_type, {}
  432. ).setdefault(wn, {})[gpu_index] = True
  433. # 3. Summarize the total VRAM and the number that meets the utilization for used GPUs.
  434. used_gpu_count = 0
  435. used_satisfied_count = 0
  436. used_gpus_total_allocatable_vram_for_non_llms = 0
  437. used_vram_claim_total = 0
  438. used_ram_claim_total = 0
  439. pairs = [
  440. (
  441. candidate.worker.name,
  442. candidate.gpu_type,
  443. candidate.gpu_indexes,
  444. candidate.computed_resource_claim,
  445. )
  446. ]
  447. if candidate.subordinate_workers:
  448. for sw in candidate.subordinate_workers:
  449. pairs.append(
  450. (
  451. sw.worker_name,
  452. sw.gpu_type,
  453. sw.gpu_indexes,
  454. sw.computed_resource_claim,
  455. )
  456. )
  457. for wn, gpu_type, indexes, claim in pairs:
  458. w = worker_set.get(wn)
  459. wa = self.get_worker_allocatable_resource(w, gpu_type)
  460. used_vram_claim_total += (
  461. sum(v for k, v in claim.vram.items()) if claim.vram else 0
  462. )
  463. used_ram_claim_total += claim.ram or 0
  464. used_gpu_count += len(indexes)
  465. for idx in indexes:
  466. allocatable_vram = wa.vram.get(idx)
  467. used_gpus_total_allocatable_vram_for_non_llms += allocatable_vram or 0
  468. if (
  469. satisfied_gpus_by_worker.get(gpu_type, {})
  470. .get(wn, {})
  471. .get(idx, None)
  472. is not None
  473. ):
  474. used_satisfied_count += 1
  475. # 4. Determine if overcommit occurred.
  476. overcommit = (
  477. used_vram_claim_total < request.vram
  478. or (request.ram > 0 and used_ram_claim_total < request.ram)
  479. or used_satisfied_count < used_gpu_count
  480. )
  481. if not overcommit:
  482. return (False, "", 0)
  483. # 5. Build scheduling message.
  484. using_partial = used_gpu_count < selected_gpu_count
  485. used_devices_msg = abbreviate_worker_gpu_indexes(
  486. candidate.worker.name,
  487. candidate.gpu_indexes,
  488. len(candidate.subordinate_workers or []),
  489. (used_gpu_count - len(candidate.gpu_indexes)),
  490. 8,
  491. )
  492. scheduling_msg = ListMessageBuilder(
  493. "Manual GPU selection resulted in resource overcommit."
  494. )
  495. effective_vram = 0
  496. if gpu_memory_utilization == 0:
  497. # non-LLM case
  498. if using_partial:
  499. effective_vram = used_gpus_total_allocatable_vram_for_non_llms
  500. scheduling_msg.extend(
  501. [
  502. f"Using {used_devices_msg} out of {selected_gpu_count} selected devices.",
  503. f"Used GPUs provide {byte_to_gib(effective_vram)} GiB allocatable VRAM.",
  504. ]
  505. )
  506. else:
  507. effective_vram = selected_gpus_total_allocatable_vram_for_non_llms
  508. scheduling_msg.append(
  509. f"Selected GPUs have {byte_to_gib(effective_vram)} GiB of VRAM."
  510. )
  511. else:
  512. # LLM case
  513. if using_partial:
  514. effective_vram = (
  515. used_gpus_total_allocatable_vram_for_non_llms
  516. * gpu_memory_utilization
  517. )
  518. scheduling_msg.extend(
  519. [
  520. f"Using {used_devices_msg} out of {selected_gpu_count} selected devices.",
  521. f"Used GPUs provide {byte_to_gib(used_gpus_total_allocatable_vram_for_non_llms):.2f} GiB allocatable VRAM, "
  522. f"{used_satisfied_count}/{used_gpu_count} of GPUs meet the VRAM utilization ratio, "
  523. f"providing {byte_to_gib(effective_vram):.2f} GiB of allocatable VRAM.",
  524. ]
  525. )
  526. else:
  527. effective_vram = (
  528. selected_gpus_total_allocatable_vram_for_non_llms
  529. * gpu_memory_utilization
  530. )
  531. scheduling_msg.extend(
  532. [
  533. f"Selected GPUs have {byte_to_gib(selected_gpus_total_allocatable_vram_for_non_llms):.2f} GiB allocatable VRAM, "
  534. f"{satisfied_gpus_count}/{selected_gpu_count} of GPUs meet the VRAM utilization ratio, "
  535. f"providing {byte_to_gib(effective_vram):.2f} GiB of allocatable VRAM."
  536. ]
  537. )
  538. return (True, str(scheduling_msg), effective_vram)
  539. def _get_non_overcommit_and_best_overcommit_candidates(
  540. self,
  541. candidates: List[ModelInstanceScheduleCandidate],
  542. workers: List[Worker],
  543. request: RequestEstimateUsage,
  544. gpu_memory_utilization: Dict[str, float],
  545. ) -> Tuple[
  546. List[ModelInstanceScheduleCandidate],
  547. Optional[ModelInstanceScheduleCandidate],
  548. Optional[str],
  549. ]:
  550. """Separate non-overcommit candidates and find the best overcommit candidate.
  551. Args:
  552. candidates (List[ModelInstanceScheduleCandidate]): The list of candidates to check.
  553. workers (List[Worker]): The list of workers.
  554. request (RequestEstimateUsage): The estimated resource usage request.
  555. gpu_memory_utilization (Dict[str, float]): The required GPU memory utilization ratio by GPU type, key is the GPU type, * is for all types.
  556. Returns:
  557. Tuple[List[ModelInstanceScheduleCandidate], Optional[ModelInstanceScheduleCandidate], Optional[str]]:
  558. A tuple of (non_overcommit_candidates, best_overcommit_candidate, overcommit_message).
  559. """
  560. non_overcommits_candidates = []
  561. max_effective_vram = 0
  562. best_overcommit_candidate = None
  563. overcommit_msg = None
  564. for c in candidates:
  565. gpu_memory_utilization_for_type = gpu_memory_utilization.get(
  566. c.gpu_type,
  567. gpu_memory_utilization.get("*", 0),
  568. )
  569. overcommit, msg, effective_vram = (
  570. self._generate_manual_selected_gpus_overcommit_message(
  571. c, workers, request, gpu_memory_utilization_for_type
  572. )
  573. )
  574. c.overcommit = overcommit
  575. if not overcommit:
  576. non_overcommits_candidates.append(c)
  577. elif effective_vram >= max_effective_vram:
  578. max_effective_vram = effective_vram
  579. best_overcommit_candidate = c
  580. overcommit_msg = msg
  581. if non_overcommits_candidates:
  582. return non_overcommits_candidates, None, None
  583. return non_overcommits_candidates, best_overcommit_candidate, overcommit_msg
  584. def _find_manual_gpu_selection_candidates( # noqa: C901
  585. self,
  586. workers: List[Worker],
  587. gpu_memory_utilization: Dict[str, float],
  588. request: RequestEstimateUsage,
  589. event_collector: EventCollector,
  590. ) -> List[ModelInstanceScheduleCandidate]:
  591. """
  592. Find candidates for manual GPU selection based on user-specified GPU IDs.
  593. This function handles all manual GPU selection scenarios for vLLM.
  594. args:
  595. workers: List of available workers.
  596. gpu_memory_utilization: Required GPU memory utilization ratio by GPU type, key is the GPU type, * is for all types.
  597. request: The estimated resource usage request.
  598. event_collector: Event collector for logging events.
  599. """
  600. # Skip if no manual GPU selection is specified
  601. if not self._selected_gpu_workers:
  602. return []
  603. # Not allow heterogeneous gpu types
  604. if (
  605. self._gpu_count == len(self._model.gpu_selector.gpu_ids)
  606. and len(self._selected_gpu_indexes_by_gpu_type_and_worker.keys()) > 1
  607. ):
  608. event_collector.add(
  609. EventLevelEnum.ERROR,
  610. EVENT_ACTION_MANUAL_MULTI,
  611. str(
  612. ListMessageBuilder(
  613. "Deployment with heterogeneous GPU types is not supported, please select GPUs of the same type or update GPUs per replica.",
  614. )
  615. ),
  616. )
  617. return []
  618. logger.debug(
  619. f"Manual GPU selection: workers={self._selected_gpu_workers}, "
  620. f"worker_count={len(self._selected_gpu_workers)}, "
  621. f"gpu_indexes_by_worker_and_type={self._selected_gpu_indexes_by_gpu_type_and_worker}"
  622. )
  623. candidates = []
  624. # Filter and sort selected workers by resource
  625. selected_workers_by_gpu_type = sort_selected_workers_by_gpu_type_and_resource(
  626. workers,
  627. self._selected_gpu_indexes_by_gpu_type_and_worker,
  628. self.get_worker_allocatable_resource,
  629. )
  630. # Handle single-worker single gpu scenarios
  631. if self._gpu_count == 1:
  632. for gpu_type, workers_of_type in selected_workers_by_gpu_type.items():
  633. gpu_memory_utilization_for_type = gpu_memory_utilization.get(
  634. gpu_type,
  635. gpu_memory_utilization.get("*", 0),
  636. )
  637. for worker in workers_of_type:
  638. for gpu in worker.status.gpu_devices:
  639. worker_candidates = (
  640. self._manual_select_single_worker_multi_gpu_candidates(
  641. worker,
  642. [gpu.index],
  643. gpu_memory_utilization_for_type,
  644. request,
  645. gpu_type,
  646. )
  647. )
  648. candidates.extend(worker_candidates)
  649. # Handle single-worker multi-GPU and multi-worker scenarios
  650. elif self._gpu_count > 1:
  651. # Single-worker multi-GPU selection
  652. for gpu_type, workers_of_type in selected_workers_by_gpu_type.items():
  653. gpu_memory_utilization_for_type = gpu_memory_utilization.get(
  654. gpu_type,
  655. gpu_memory_utilization.get("*", 0),
  656. )
  657. for worker in workers_of_type:
  658. selected_gpu_indexes = [
  659. gpu.index for gpu in worker.status.gpu_devices
  660. ]
  661. if selected_gpu_indexes is None:
  662. continue
  663. worker_candidates = (
  664. self._manual_select_single_worker_multi_gpu_candidates(
  665. worker,
  666. selected_gpu_indexes,
  667. gpu_memory_utilization_for_type,
  668. request,
  669. gpu_type,
  670. )
  671. )
  672. candidates.extend(worker_candidates)
  673. # Multi-worker multi-GPU selection
  674. if not candidates:
  675. for gpu_type, selected_workers in selected_workers_by_gpu_type.items():
  676. gpu_memory_utilization_for_type = gpu_memory_utilization.get(
  677. gpu_type,
  678. gpu_memory_utilization.get("*", 0),
  679. )
  680. worker_candidates = (
  681. self._manual_select_multi_worker_multi_gpu_candidates(
  682. selected_workers,
  683. gpu_memory_utilization_for_type,
  684. request,
  685. gpu_type,
  686. )
  687. )
  688. candidates.extend(worker_candidates)
  689. if not candidates:
  690. return []
  691. # Separate non-overcommit and overcommit candidates
  692. non_overcommits_candidates, best_overcommit_candidate, overcommit_msg = (
  693. self._get_non_overcommit_and_best_overcommit_candidates(
  694. candidates, workers, request, gpu_memory_utilization
  695. )
  696. )
  697. # Return non-overcommit candidates if any
  698. if non_overcommits_candidates:
  699. return non_overcommits_candidates
  700. # Handle overcommit candidates
  701. if self._model.replicas > 1:
  702. event_collector.add(
  703. EventLevelEnum.INFO,
  704. EVENT_ACTION_MANUAL_MULTI,
  705. str(
  706. ListMessageBuilder(
  707. f"Found {len(candidates) - len(non_overcommits_candidates)} candidate, manual scheduling for multi-replica model instances does not allow overcommit or heterogeneous deployment topologies.",
  708. )
  709. ),
  710. )
  711. return []
  712. if best_overcommit_candidate:
  713. event_collector.add(
  714. EventLevelEnum.INFO, EVENT_ACTION_MANUAL_MULTI, overcommit_msg
  715. )
  716. return [best_overcommit_candidate]
  717. return []
  def _manual_select_single_worker_multi_gpu_candidates(
      self,
      worker: Worker,
      gpu_indexes: List[int],
      gpu_memory_utilization: float,
      request: RequestEstimateUsage,
      gpu_type: Optional[str] = None,
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Build the manual-scheduling candidate that places the model on several
      GPUs of a single worker.

      Args:
          worker: Worker that owns the manually selected GPUs.
          gpu_indexes: Manually selected GPU indexes on ``worker``.
          gpu_memory_utilization: Fraction of each GPU's total VRAM that must
              be allocatable. When it is not greater than 0 every GPU is
              accepted without a VRAM check (the non-LLM case).
          request: Estimated resource usage of the model.
          gpu_type: Optional GPU type forwarded to the worker resource helpers.

      Returns:
          An empty list when fewer GPUs were selected than ``self._gpu_count``
          requires; otherwise a list holding exactly one candidate.
      """
      required_gpu_count = self._gpu_count

      # Not enough selected GPUs on this worker: nothing to propose.
      if len(gpu_indexes) < required_gpu_count:
          return []

      allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
      total_vram_by_index = self._get_worker_vram_totals(worker, gpu_type)

      # Split the selected GPUs into those that meet the utilization
      # requirement and those that do not, preserving the input order.
      fitting_indexes = []
      lacking_indexes = []
      for index in gpu_indexes:
          free_vram = allocatable.vram.get(index, 0)
          total_vram = total_vram_by_index.get(index, 0)

          # Without a positive utilization target (non-LLM) every GPU fits;
          # otherwise (LLM) the allocatable share must reach the target.
          fits = not gpu_memory_utilization > 0 or (
              total_vram > 0 and free_vram / total_vram >= gpu_memory_utilization
          )
          if fits:
              fitting_indexes.append(index)
          else:
              lacking_indexes.append(index)

          # Stop scanning as soon as enough fitting GPUs were collected.
          if len(fitting_indexes) >= required_gpu_count:
              break

      # Top up with GPUs that do not meet the requirement when there are not
      # enough fitting ones.
      missing = required_gpu_count - len(fitting_indexes)
      chosen_indexes = [*fitting_indexes, *lacking_indexes[:missing]]

      # Resource claims for the GPUs that will actually be used.
      vram_claims = self._get_worker_resource_claim(
          worker, chosen_indexes, gpu_memory_utilization, request, gpu_type
      )
      gpu_addresses = self._get_worker_gpu_addresses(
          worker, chosen_indexes, gpu_type
      )
      resource_claim = ComputedResourceClaim(
          vram=vram_claims,
          ram=get_computed_ram_claim(self._model, vram_claims, request.ram),
          vram_utilization=gpu_memory_utilization,
      )

      candidate = ModelInstanceScheduleCandidate(
          worker=worker,
          gpu_type=gpu_type,
          gpu_indexes=chosen_indexes,
          gpu_addresses=gpu_addresses,
          computed_resource_claim=resource_claim,
      )
      return [candidate]
  def _manual_select_multi_worker_multi_gpu_candidates(
      self,
      workers: List[Worker],
      gpu_memory_utilization: float,
      request: RequestEstimateUsage,
      gpu_type: Optional[str] = None,
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Build the manual-scheduling candidate that spans several workers.

      The first entry of ``workers`` becomes the main worker (the caller is
      expected to put the worker with the most GPUs first -- TODO confirm).
      Every other worker that has a manual GPU selection for ``gpu_type`` and
      passes ``_validate_distributed_limit_per_worker`` is added as a
      subordinate worker until ``self._gpu_count`` GPUs are assigned.

      Args:
          workers: Candidate workers; at least two are required.
          gpu_memory_utilization: VRAM utilization used for the resource
              claims and recorded on every computed claim.
          request: Estimated resource usage of the model.
          gpu_type: Optional GPU type used to look up the manual GPU selection
              and forwarded to the worker resource helpers.

      Returns:
          An empty list when fewer than two workers are given or when no
          subordinate worker could be added; otherwise a list holding exactly
          one candidate.
      """
      if len(workers) < 2:
          return []

      # Manually selected GPU indexes for this GPU type, keyed by worker name.
      selected_by_worker = self._selected_gpu_indexes_by_gpu_type_and_worker.get(
          gpu_type, {}
      )

      # Main worker is the first one.
      main_worker = workers[0]
      main_worker_name = main_worker.name
      main_gpu_indexes = selected_by_worker.get(main_worker_name, [])
      main_vram_claim = self._get_worker_resource_claim(
          main_worker, main_gpu_indexes, gpu_memory_utilization, request, gpu_type
      )

      # Running total of GPUs assigned so far (main + subordinates).
      assigned_gpu_count = len(main_gpu_indexes)

      subordinate_workers: List[ModelInstanceSubordinateWorker] = []
      for worker in workers:
          gpu_indexes = selected_by_worker.get(worker.name)

          # Skip workers without a manual selection, and the main worker.
          if gpu_indexes is None:
              continue
          if worker.name == main_worker_name:
              continue
          if not self._validate_distributed_limit_per_worker(worker):
              continue

          # Sort the worker's selected GPUs by allocatable rate.
          full_vram_claim = self._get_worker_resource_claim(
              worker, gpu_indexes, gpu_memory_utilization, request, gpu_type
          )
          sorted_gpu_indexes = sort_gpu_indexes_by_allocatable_rate(
              worker, full_vram_claim, gpu_type
          )

          # Only take as many GPUs as are still needed to reach the target.
          remaining_gpu_count = max(self._gpu_count - assigned_gpu_count, 0)
          assign_count = min(len(gpu_indexes), remaining_gpu_count)
          sw_gpu_indexes = sorted_gpu_indexes[:assign_count]

          vram_claim = self._get_worker_resource_claim(
              worker, sw_gpu_indexes, gpu_memory_utilization, request, gpu_type
          )
          subordinate_workers.append(
              ModelInstanceSubordinateWorker(
                  worker_id=worker.id,
                  worker_name=worker.name,
                  worker_ip=worker.ip,
                  worker_ifname=worker.ifname,
                  total_gpus=len(worker.status.gpu_devices),
                  gpu_type=gpu_type,
                  gpu_indexes=sw_gpu_indexes,
                  # The indexes were selected per GPU type, so resolve the
                  # addresses with the same type filter as the single-worker
                  # path does.
                  gpu_addresses=self._get_worker_gpu_addresses(
                      worker, sw_gpu_indexes, gpu_type
                  ),
                  computed_resource_claim=ComputedResourceClaim(
                      vram=vram_claim,
                      ram=get_computed_ram_claim(
                          self._model, vram_claim, request.ram
                      ),
                      vram_utilization=gpu_memory_utilization,
                  ),
              )
          )

          assigned_gpu_count += len(sw_gpu_indexes)
          if self._gpu_count and assigned_gpu_count >= self._gpu_count:
              break

      if not subordinate_workers:
          return []

      return [
          ModelInstanceScheduleCandidate(
              worker=main_worker,
              gpu_type=gpu_type,
              gpu_indexes=main_gpu_indexes,
              gpu_addresses=self._get_worker_gpu_addresses(
                  main_worker, main_gpu_indexes, gpu_type
              ),
              computed_resource_claim=ComputedResourceClaim(
                  vram=main_vram_claim,
                  ram=get_computed_ram_claim(
                      self._model, main_vram_claim, request.ram
                  ),
                  # Record the utilization on the main claim as well, matching
                  # the subordinate and single-worker claims.
                  vram_utilization=gpu_memory_utilization,
              ),
              subordinate_workers=subordinate_workers,
          )
      ]
  def _validate_distributed_limit_per_worker(self, worker: Worker) -> bool:
      """
      Check that ``worker`` does not already run a distributed instance of
      the same backend as ``self._model``.

      An existing instance conflicts when it has distributed servers with
      subordinate workers and its model's backend equals
      ``self._model.backend``. On the first conflict ``self._messages`` is
      replaced with a single explanatory message.

      Returns:
          False when a conflicting instance is found, True otherwise.
      """
      for instance in get_worker_model_instances(self._model_instances, worker):
          # Only instances that are actually distributed are relevant.
          servers = instance.distributed_servers
          if not servers or not servers.subordinate_workers:
              continue

          # Only instances served by the same backend count as a conflict.
          model = instance.model
          if not model or not model.backend:
              continue
          if model.backend != self._model.backend:
              continue

          message = ListMessageBuilder(
              f"Each worker can run only one distributed vLLM instance. Worker '{worker.name}' already has '{instance.name}'."
          )
          self._messages = [str(message)]
          return False

      return True
  def _is_tp_size_divisible(self, tp_size: int) -> bool:
      """
      Tell whether ``tp_size`` satisfies the inference backend's divisibility
      constraints.

      Each of the following values, when set, must be divisible by ``tp_size``:
      1. num_attention_heads
      2. vision_num_attention_heads (only when
         ``_should_check_vision_tp_divisibility()`` says so)
      3. vocab_size

      Notes on how `tp_size` (tensor parallel size) is used in auto scheduling:
      - Single-worker multi-GPU: `tp_size` is the number of GPUs selected so
        far in the traversal (i.e., `gpu_sum`); the candidate set grows
        incrementally and divisibility is validated at each step.
      - Multi-worker multi-GPU: selected workers are constrained to have the
        same number of GPUs, so `tp_size` equals the per-worker GPU count
        (i.e., `gpu_count`).

      Returns:
          False for a falsy ``tp_size`` or when any constraint is violated,
          True otherwise.
      """
      if not tp_size:
          return False

      text_heads = self._num_attention_heads
      if text_heads and text_heads % tp_size != 0:
          return False

      # The vision head count is only consulted when the check is enabled.
      if self._should_check_vision_tp_divisibility():
          vision_heads = self._vision_num_attention_heads
          if vision_heads and vision_heads % tp_size != 0:
              return False

      vocab_size = self._model_params.vocab_size
      return not (vocab_size and vocab_size % tp_size != 0)
  def _check_tp_size_divisibility(
      self,
      tp_size: int,
  ) -> Optional[str]:
      """
      Explain which of the inference backend's divisibility constraints
      ``tp_size`` violates, if any.

      Each of the following values, when set, must be divisible by ``tp_size``;
      they are checked in this order and the first violation is reported:
      1. num_attention_heads
      2. vision_num_attention_heads (only when
         ``_should_check_vision_tp_divisibility()`` says so)
      3. vocab_size

      Return:
          None if divisibility is satisfied (or ``tp_size`` is falsy),
          otherwise an error message.
      """
      if not tp_size:
          return None

      text_heads = self._num_attention_heads
      if text_heads and text_heads % tp_size != 0:
          return (
              f"Total number of attention heads ({text_heads}) must be divisible"
              f" by tensor parallel size ({tp_size})."
          )

      # The vision head count is only consulted when the check is enabled.
      if self._should_check_vision_tp_divisibility():
          vision_heads = self._vision_num_attention_heads
          if vision_heads and vision_heads % tp_size != 0:
              return (
                  f"Total number of vision attention heads ({vision_heads}) must be"
                  f" divisible by tensor parallel size ({tp_size})."
              )

      vocab_size = self._model_params.vocab_size
      if vocab_size and vocab_size % tp_size != 0:
          return (
              f"Vocabulary size ({vocab_size}) must be divisible"
              f" by tensor parallel size ({tp_size})."
          )

      return None