sglang_resource_fit_selector.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268
  1. import logging
  2. from collections import defaultdict
  3. from typing import List, Optional, Dict, Tuple
  4. from transformers.utils import strtobool
  5. from gpustack.policies.base import ModelInstanceScheduleCandidate
  6. from gpustack.policies.candidate_selectors.base_candidate_selector import (
  7. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
  8. EVENT_ACTION_AUTO_SINGLE_GPU,
  9. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
  10. EVENT_ACTION_DEFAULT,
  11. EVENT_ACTION_MANUAL_MULTI,
  12. ModelParameters,
  13. RequestEstimateUsage,
  14. ScheduleCandidatesSelector,
  15. )
  16. from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
  17. from gpustack.policies.utils import (
  18. get_computed_ram_claim,
  19. ListMessageBuilder,
  20. group_worker_gpu_by_memory,
  21. WorkerGPUInfo,
  22. estimate_model_vram,
  23. get_model_ram_claim,
  24. group_workers_by_gpu_type,
  25. ram_not_enough,
  26. sort_workers_by_gpu_count,
  27. estimate_diffusion_model_vram,
  28. get_worker_allocatable_resource,
  29. sort_gpu_indexes_by_allocatable_rate,
  30. )
  31. from gpustack.schemas.models import (
  32. ComputedResourceClaim,
  33. Model,
  34. ModelInstance,
  35. ModelInstanceSubordinateWorker,
  36. CategoryEnum,
  37. )
  38. from gpustack.schemas.workers import Worker
  39. from gpustack.config import Config
  40. from gpustack.utils.command import (
  41. find_bool_parameter,
  42. find_parameter,
  43. find_int_parameter,
  44. )
  45. from gpustack.utils.unit import byte_to_gib, byte_to_mib
  46. logger = logging.getLogger(__name__)
  47. class SGLangResourceFitSelector(ScheduleCandidatesSelector):
  def __init__(
      self,
      cfg: Config,
      model: Model,
      model_instances: List[ModelInstance],
  ):
      """Set up selector state and derive the requested GPU count.

      Args:
          cfg: Configuration object forwarded to the base selector.
          model: The model being scheduled; its backend parameters are
              scanned for tp/pp/dp sizes.
          model_instances: Model instances forwarded to the base selector.
      """
      super().__init__(cfg, model, model_instances)

      # Resource claims; filled in by select_candidates().
      self._vram_claim = 0
      self._ram_claim = 0

      # Scheduling messages and the events they are built from.
      self._messages = []
      self._event_collector = EventCollector(self._model, logger)

      # 0 means "no explicit --mem-fraction-static was given".
      self._param_mem_fraction_static = 0
      self._mem_fraction_static_by_gpu_type = {"*": 0}

      # Parallelism sizes; overwritten by _validate_and_set_arguments().
      self._tp_size = self._pp_size = self._dp_size = 1
      self._chunked_prefill_size = None
      self._cuda_graph_max_bs = None

      # for multi worker schedule
      self._largest_multi_gpu_vram: int = 0
      self._largest_multi_gpu_total = 0
      self._largest_multi_gpu_utilization_satisfied_count = 0
      self._unsatisfied_gpu_messages: Dict[str, List[int]] = {}

      self._is_diffusion = CategoryEnum.IMAGE in self._model.categories

      self._set_gpu_count(
          *SGLangResourceFitSelector.get_world_size_from_backend_parameters(model)
      )
  async def _init_model_parameters(self, workers: List[Worker]):
      """Run the base selector's parameter initialization, then validate backend arguments."""
      await super()._init_model_parameters(workers)
      # Validation reads self._model_params, so it must run after the base call.
      self._validate_and_set_arguments()
  79. def _should_check_vision_tp_divisibility(self) -> bool:
  80. if not self._model.backend_parameters:
  81. return True
  82. language_only = find_bool_parameter(
  83. self._model.backend_parameters, ["language-only"]
  84. )
  85. return not language_only
  @staticmethod
  def get_world_size_from_backend_parameters(
      model: Model,
  ) -> Tuple[Optional[int], Optional[List[str]]]:
      """Derive the world size from the model's parallelism backend parameters.

      Returns:
          ``(world_size, strategies)`` where ``world_size`` is the product of
          the tp/pp/dp sizes that are set and ``strategies`` lists which of
          ``"tp"``, ``"pp"``, ``"dp"`` contributed, in that order.
          ``(None, None)`` when none of them is set.
      """
      backend_parameters = model.backend_parameters
      tp = find_int_parameter(backend_parameters, ["tp-size", "tensor-parallel-size"])
      pp = find_int_parameter(
          backend_parameters, ["pp-size", "pipeline-parallel-size"]
      )
      dp = find_int_parameter(backend_parameters, ["dp-size", "data-parallel-size"])

      if find_bool_parameter(backend_parameters, ["enable-dp-attention"]):
          # For DP attention, it's using the TP group GPUs for data parallelism.
          # So we don't need to consider DP size for GPU count calculation.
          dp = None

      world_size = 1
      strategies = []
      for label, size in (("tp", tp), ("pp", pp), ("dp", dp)):
          if size:
              strategies.append(label)
              world_size *= size

      if not strategies:
          return None, None
      return world_size, strategies
  120. def _get_nnodes(self) -> int:
  121. if self._model.backend_parameters:
  122. nnodes_param = find_parameter(self._model.backend_parameters, ["nnodes"])
  123. if nnodes_param:
  124. return int(nnodes_param)
  125. return 1
  def _validate_and_set_arguments(self):
      """Parse parallelism-related backend parameters and reject invalid combinations.

      Sets ``self._tp_size``, ``self._pp_size`` and ``self._dp_size`` (each
      defaulting to 1) and, when ``mem-fraction-static`` is given,
      ``self._param_mem_fraction_static``.

      Raises:
          ValueError: If the attention head count is not divisible by tp-size,
              nnodes is not a positive integer, tp-size * pp-size is not
              divisible by nnodes, pipeline parallelism is combined with
              speculative decoding or mixed chunked prefill, multi-node data
              parallelism is requested without dp attention, speculative
              decoding is combined with mixed chunked prefill, or the
              tp-size divisibility check reports a problem.
      """
      model = self._model

      self._tp_size = (
          find_int_parameter(
              model.backend_parameters, ["tp-size", "tensor-parallel-size"]
          )
          or 1
      )
      num_attention_heads = self._model_params.num_attention_heads
      if (
          self._tp_size
          and num_attention_heads
          and num_attention_heads % self._tp_size != 0
      ):
          raise ValueError(
              f"Total number of attention heads ({num_attention_heads})"
              " must be divisible by tp-size "
              f"({self._tp_size})."
          )

      self._pp_size = (
          find_int_parameter(
              model.backend_parameters, ["pp-size", "pipeline-parallel-size"]
          )
          or 1
      )
      self._dp_size = (
          find_int_parameter(
              model.backend_parameters, ["dp-size", "data-parallel-size"]
          )
          or 1
      )

      enable_mixed_chunk_param = find_parameter(
          model.backend_parameters, ["enable-mixed-chunk"]
      )
      enable_mixed_chunk = (
          strtobool(enable_mixed_chunk_param)
          if enable_mixed_chunk_param is not None
          else False
      )

      nnodes = self._get_nnodes()
      if nnodes < 1:
          # Guard the modulo below: nnodes=0 would otherwise surface as a
          # ZeroDivisionError instead of a validation error.
          raise ValueError(f"nnodes must be a positive integer, got {nnodes}.")
      if self._tp_size or self._pp_size:
          world_size = int(self._tp_size or 1) * int(self._pp_size or 1)
          if world_size % nnodes != 0:
              # The checked quantity is tp-size * pp-size, not tp-size alone.
              raise ValueError(
                  f"tp-size * pp-size ({world_size}) must be divisible by "
                  f"nnodes ({nnodes})."
              )

      _speculative_algorithm = find_parameter(
          model.backend_parameters, ["speculative-algorithm"]
      )
      if self._pp_size and int(self._pp_size) > 1:
          if _speculative_algorithm is not None or enable_mixed_chunk:
              # We don't need to check overlap schedule. SGLang ignore this conflict and proceed.
              # Ref: https://github.com/sgl-project/sglang/blob/64480ec7124b8c23d9560746ca20415bfaf97a8e/python/sglang/srt/server_args.py#L1548-L1553
              raise ValueError(
                  "Pipeline parallelism is not compatible with overlap schedule, speculative decoding, mixed chunked prefill."
              )

      _enable_dp_attention = find_bool_parameter(
          model.backend_parameters, ["enable-dp-attention"]
      )
      if self._dp_size and int(self._dp_size) > 1 and nnodes != 1:
          if not _enable_dp_attention:
              raise ValueError(
                  "multi-node data parallel is not supported unless dp attention!"
              )

      if _speculative_algorithm is not None and enable_mixed_chunk:
          # This branch fires when BOTH options are set, i.e. they conflict;
          # the message must say so rather than claim one requires the other.
          raise ValueError(
              "enable-mixed-chunk is not compatible with speculative decoding"
          )

      if message := self._check_tp_size_divisibility(self._tp_size):
          raise ValueError(message + " Consider adjusting your tp-size value.")

      mem_fraction_static = find_parameter(
          self._model.backend_parameters, ["mem-fraction-static"]
      )
      if mem_fraction_static:
          self._param_mem_fraction_static = float(mem_fraction_static)
  197. def _cal_effective_vram(self, gpu_type: str) -> float:
  198. """Calculate effective VRAM considering SGLang's memory management."""
  199. if (
  200. self._is_diffusion
  201. or self._mem_fraction_static_by_gpu_type.get(gpu_type) == 0
  202. ):
  203. return self._vram_claim
  204. return self._vram_claim / self._mem_fraction_static_by_gpu_type.get(gpu_type)
  205. def _set_messages(self):
  206. """Set scheduling messages for SGLang."""
  207. if self._messages:
  208. return
  209. event_messages = {
  210. EVENT_ACTION_DEFAULT: "",
  211. EVENT_ACTION_MANUAL_MULTI: "",
  212. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU: "",
  213. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU: "",
  214. EVENT_ACTION_AUTO_SINGLE_GPU: "",
  215. }
  216. for event in self._event_collector.events:
  217. event_messages[event.action] = event.message
  218. messages = event_messages[EVENT_ACTION_DEFAULT] + "\n"
  219. for action in [
  220. EVENT_ACTION_MANUAL_MULTI,
  221. EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
  222. EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
  223. EVENT_ACTION_AUTO_SINGLE_GPU,
  224. ]:
  225. if event_messages[action]:
  226. messages += event_messages[action]
  227. break
  228. self._messages.append(messages)
  229. def _add_message(self, message: str):
  230. self._messages.append(message)
  231. def get_messages(self) -> List[str]:
  232. return self._messages
  async def select_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Get schedule candidates that fit the GPU resources requirement for SGLang.

      Estimates the VRAM/RAM claims, records a default informational event,
      then tries each selection strategy in order and returns the first
      non-empty candidate list. Returns an empty list (after building the
      scheduling messages) when no strategy yields a candidate.
      """
      # Initialize model parameters.
      await self._init_model_parameters(workers)

      if self._is_diffusion:
          self._vram_claim = await estimate_diffusion_model_vram(
              self._model, self._config.huggingface_token, workers
          )
      else:
          # The per-GPU-type fractions are only computed for non-diffusion models.
          self.cal_mem_fraction_static(workers)
          self._vram_claim = await estimate_model_vram(
              self._model, self._config.huggingface_token, workers
          )
      self._ram_claim = get_model_ram_claim(self._model)

      logger.info(
          f"Calculated SGLang resource claim for model {self._model.readable_source}, "
          f"VRAM claim: {self._vram_claim}, RAM claim: {self._ram_claim}"
      )

      ram_part = (
          f" and {byte_to_gib(self._ram_claim)} GiB of RAM"
          if self._ram_claim > 0
          else ""
      )
      default_msg_list = ListMessageBuilder(
          f"The model requires approximately {byte_to_gib(self._vram_claim)} GiB of VRAM"
          f"{ram_part}."
      )

      fractions = self._mem_fraction_static_by_gpu_type
      max_mem_fraction_static = max(fractions.values()) if fractions else 0
      if max_mem_fraction_static != 0 and not self._is_diffusion:
          default_msg_list.append(
              f"With --mem-fraction-static={max_mem_fraction_static}, "
              f"all GPUs combined need to provide at least {byte_to_gib(int(self._vram_claim / max_mem_fraction_static))} GiB of total VRAM "
              f"and each GPU needs {int(max_mem_fraction_static * 100)}% of allocatable VRAM."
          )

      self._event_collector.add(
          EventLevelEnum.INFO,
          EVENT_ACTION_DEFAULT,
          str(default_msg_list),
      )

      # Strategies are attempted in this order; the first one with results wins.
      strategies = (
          self.find_manual_gpu_selection_candidates,
          self.find_single_worker_single_gpu_full_offloading_candidates,
          self.find_single_worker_multi_gpu_full_offloading_candidates,
          self.find_multi_worker_multi_gpu_candidates,
      )

      sort_workers_by_gpu_count(workers)

      for strategy in strategies:
          if self.should_skip_candidate_func(strategy):
              continue

          logger.debug(
              f"SGLang model {self._model.readable_source}, filter candidates with resource fit selector: {strategy.__name__}"
          )

          found = strategy(workers)
          # A lone overcommitted candidate still gets the explanatory messages.
          if len(found) == 1 and found[0].overcommit:
              self._set_messages()
          if found:
              return found

      self._set_messages()
      return []
  def cal_mem_fraction_static(self, workers: List[Worker]):
      """Calculate mem_fraction_static for SGLang for all type gpus.

      GPU types are restricted to the manually selected ones when a manual
      selection exists. An explicit positive ``--mem-fraction-static`` is
      used as-is for every type; otherwise the value comes from
      ``MemFractionStaticCalculator``. Results are merged into
      ``self._mem_fraction_static_by_gpu_type``.
      """
      workers_by_gpu_type = group_workers_by_gpu_type(workers)

      manual_selection = self._selected_gpu_indexes_by_gpu_type_and_worker
      if manual_selection:
          valid_gpu_types = set(manual_selection.keys())
      else:
          valid_gpu_types = set(workers_by_gpu_type.keys())

      # Collect everything first so the shared map is updated in one step.
      computed = {}
      for gpu_type, workers_of_type in workers_by_gpu_type.items():
          if gpu_type not in valid_gpu_types:
              continue
          if self._param_mem_fraction_static > 0:
              computed[gpu_type] = self._param_mem_fraction_static
          else:
              calculator = MemFractionStaticCalculator(
                  self._model,
                  self._model_instances,
                  self._model_params,
                  gpu_type,
                  self._selected_gpu_indexes_by_gpu_type_and_worker,
              )
              computed[gpu_type] = calculator._cal_mem_fraction_static(
                  workers_of_type
              )

      self._mem_fraction_static_by_gpu_type.update(computed)
  320. def should_skip_candidate_func(self, candidate_func) -> bool:
  321. # Skip conditions for manual GPU selection.
  322. if (
  323. self._selected_gpu_workers
  324. and candidate_func != self.find_manual_gpu_selection_candidates
  325. ):
  326. return True
  327. # Skip conditions for distributed inference.
  328. if (
  329. not self._model.distributed_inference_across_workers
  330. and candidate_func == self.find_multi_worker_multi_gpu_candidates
  331. ):
  332. return True
  333. # SGLang Diffusion unsupported multi-worker
  334. if (
  335. CategoryEnum.IMAGE in self._model.categories
  336. and candidate_func == self.find_multi_worker_multi_gpu_candidates
  337. ):
  338. return True
  339. return False
  def find_manual_gpu_selection_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """Find candidates for manually selected GPUs.

      Builds the resource request from the current RAM/VRAM claims and
      delegates to ``_find_manual_gpu_selection_candidates``. For diffusion
      models the requested VRAM is multiplied by ``self._gpu_count``.
      """
      request = RequestEstimateUsage(ram=self._ram_claim, vram=self._vram_claim)
      if self._is_diffusion:
          request.vram *= self._gpu_count

      return self._find_manual_gpu_selection_candidates(
          workers,
          self._mem_fraction_static_by_gpu_type,
          request,
          self._event_collector,
      )
  355. def find_single_worker_single_gpu_full_offloading_candidates(
  356. self, workers: List[Worker]
  357. ) -> List[ModelInstanceScheduleCandidate]:
  358. """
  359. Find single worker single GPU candidates for SGLang.
  360. This function only handles automatic GPU selection.
  361. """
  362. if self._gpu_count is not None and self._gpu_count > 1:
  363. # Skip multi-GPU selection
  364. return []
  365. # Auto selection only
  366. candidates = []
  367. workers_by_gpu_type = group_workers_by_gpu_type(workers)
  368. for gpu_type, workers_of_type in workers_by_gpu_type.items():
  369. for worker in workers_of_type:
  370. worker_candidates = (
  371. self._find_single_worker_single_gpu_full_offloading_candidates(
  372. worker, gpu_type
  373. )
  374. )
  375. candidates.extend(worker_candidates)
  376. return candidates
  def _find_single_worker_single_gpu_full_offloading_candidates(
      self, worker: Worker, gpu_type: Optional[str] = None
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find single worker single gpu full offloading candidates for the model instance with worker.
      requires: worker.status.gpu_devices is not None

      Returns one candidate per GPU that can hold the model. When no GPU
      qualifies, an EVENT_ACTION_AUTO_SINGLE_GPU event reporting the largest
      allocatable VRAM seen is recorded and an empty list is returned.
      """
      allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
      if ram_not_enough(self._ram_claim, allocatable) or not worker.status.gpu_devices:
          return []

      is_llm = not self._is_diffusion
      fraction = self._mem_fraction_static_by_gpu_type.get(gpu_type)

      matches = []
      best_vram = 0
      best_utilization = 0

      for device in worker.status.gpu_devices:
          index = device.index
          free_vram = allocatable.vram.get(index, 0)

          # GPUs without memory information cannot be evaluated.
          if device.memory is None or device.memory.total == 0:
              continue

          utilization = free_vram / device.memory.total
          # Track the best GPU seen, for the failure message below.
          if free_vram > best_vram:
              best_vram, best_utilization = free_vram, utilization

          # Non-diffusion models may only use `fraction` of the GPU's total memory.
          usable_share = fraction if is_llm else 1
          if self._vram_claim > device.memory.total * usable_share:
              continue
          # Non-diffusion models also need that share to be currently allocatable.
          if is_llm and utilization < fraction:
              continue

          usage = (
              RequestEstimateUsage(ram=self._ram_claim, vram=self._vram_claim)
              if self._is_diffusion
              else None
          )
          claim = self._get_worker_resource_claim(
              worker,
              [index],
              fraction if is_llm else 0,
              request=usage,
              gpu_type=gpu_type,
          )
          resource_claim = ComputedResourceClaim(
              vram=claim,
              ram=get_computed_ram_claim(self._model, claim),
              vram_utilization=fraction,
              estimated_vram=self._vram_claim,
          )
          matches.append(
              ModelInstanceScheduleCandidate(
                  worker=worker,
                  gpu_type=device.type,
                  gpu_indexes=[index],
                  computed_resource_claim=resource_claim,
              )
          )

      if matches:
          return matches

      event_msg = f"The current available GPU only has {byte_to_gib(best_vram)} GiB allocatable VRAM."
      if fraction != 0 and is_llm:
          event_msg = event_msg.rstrip(".") + f" ({(best_utilization * 100):.2f}%)."
      self._event_collector.add(
          EventLevelEnum.INFO,
          EVENT_ACTION_AUTO_SINGLE_GPU,
          str(ListMessageBuilder(event_msg)),
      )
      return matches
  def find_single_worker_multi_gpu_full_offloading_candidates(
      self, workers: List[Worker]
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find single worker multi GPU candidates for SGLang.
      This function only handles automatic GPU selection.

      Workers are grouped by GPU type and each worker is evaluated on its own;
      all per-worker results are concatenated.
      """
      found = []
      for gpu_type, workers_of_type in group_workers_by_gpu_type(workers).items():
          for worker in workers_of_type:
              found += self._find_single_worker_multi_gpu_full_offloading_candidates(
                  worker, gpu_type
              )
      return found
  def _find_single_worker_multi_gpu_full_offloading_candidates(  # noqa: C901
      self, worker: Worker, gpu_type: Optional[str] = None
  ) -> List[ModelInstanceScheduleCandidate]:
      """Find single worker multi GPU candidates for a specific worker.

      Args:
          worker: The worker whose GPUs are evaluated.
          gpu_type: Key used to look up the mem-fraction-static value and
              passed through to the grouping helper.

      Returns:
          A list holding one candidate for the first GPU group that satisfies
          the VRAM claim, or an empty list. In the empty case an
          EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU event is recorded, unless
          the worker has fewer than two GPUs (then nothing is recorded).

      NOTE(review): the listing this was recovered from had lost its
      indentation; the nesting below is reconstructed from the logic - confirm
      against the upstream file.
      """
      if not worker.status.gpu_devices or len(worker.status.gpu_devices) < 2:
          return []

      # SGLang performs VRAM balancing checks. We group all GPUs based on available VRAM capacity
      gpu_group = group_worker_gpu_by_memory(
          [worker],
          model_instances=self._model_instances,
          ram_claim=self._ram_claim,
          gpu_type=gpu_type,
      )

      for info in gpu_group:
          gpu_list = info
          # Non-diffusion: every GPU in the group must have at least the
          # mem-fraction-static share of its total memory allocatable.
          if not self._is_diffusion and any(
              gpu.allocatable_vram / gpu.gpu_device.memory.total
              < self._mem_fraction_static_by_gpu_type.get(gpu_type)
              for gpu in gpu_list
          ):
              continue
          # Diffusion: every GPU in the group must hold the full claim by itself.
          if self._is_diffusion and any(
              gpu.allocatable_vram < self._cal_effective_vram(gpu_type)
              for gpu in gpu_list
          ):
              continue

          total_allocatable_vram = sum(gpu.allocatable_vram for gpu in gpu_list)

          if not gpu_list or len(gpu_list) < 2:
              continue

          # Remember the largest qualifying group; these fields feed the
          # failure message built at the end of the method.
          if total_allocatable_vram > self._largest_multi_gpu_vram:
              self._largest_multi_gpu_vram = total_allocatable_vram
              self._largest_multi_gpu_utilization_satisfied_count = len(gpu_list)
              self._largest_multi_gpu_total = len(worker.status.gpu_devices)

          if total_allocatable_vram < self._cal_effective_vram(gpu_type):
              continue

          # Sort by vram in descending order
          sorted_gpu_devices: list[WorkerGPUInfo] = sorted(
              gpu_list,
              key=lambda gpu: gpu.allocatable_vram,
              reverse=True,
          )

          vram_sum = 0
          gpu_sum = 0
          gpu_indexes = []
          vram_claim: Dict[int, int] = {}
          found_candidate = False
          # Add GPUs one at a time until the accumulated claim covers the model.
          for _, gpu_device in enumerate(sorted_gpu_devices):
              gpu = gpu_device.gpu_device
              gpu_indexes.append(gpu.index)
              # Diffusion claims what is allocatable; otherwise the claim is
              # the mem-fraction-static share of the GPU's total memory.
              vram_claim[gpu.index] = int(
                  gpu_device.allocatable_vram
                  if self._is_diffusion
                  else int(
                      gpu.memory.total
                      * self._mem_fraction_static_by_gpu_type.get(gpu_type)
                  )
              )
              gpu_sum += 1
              vram_sum += vram_claim[gpu.index]

              # Keep adding GPUs until the count passes the tp-size check.
              if not self._is_tp_size_divisible(gpu_sum):
                  continue

              if self._gpu_count and gpu_sum >= self._gpu_count:
                  if vram_sum >= self._vram_claim:
                      found_candidate = True
                  # if self._gpu_count is set, cannot return more than gpu_count
                  break

              if (not self._gpu_count) and vram_sum >= self._vram_claim:
                  found_candidate = True
                  break

          if found_candidate:
              return [
                  ModelInstanceScheduleCandidate(
                      worker=worker,
                      gpu_type=gpu_type,
                      gpu_indexes=gpu_indexes,
                      computed_resource_claim=ComputedResourceClaim(
                          vram=vram_claim,
                          ram=get_computed_ram_claim(self._model, vram_claim),
                          vram_utilization=self._mem_fraction_static_by_gpu_type.get(
                              gpu_type
                          ),
                          estimated_vram=self._vram_claim,
                      ),
                  )
              ]

      # No group produced a candidate: explain why in a scheduling event.
      event_msg_list = []
      if message := self._check_tp_size_divisibility(
          self._largest_multi_gpu_utilization_satisfied_count
      ):
          event_msg_list.append(message)

      # The VRAM explanation is only used when there is no tp-size message.
      if len(event_msg_list) == 0:
          event_msg = (
              f"The largest available worker has {byte_to_gib(self._largest_multi_gpu_vram):.2f} GiB allocatable VRAM."
              if not self._is_diffusion
              else f"SGLang Diffusion requires each GPU to provide {byte_to_gib(int(self._cal_effective_vram(gpu_type)))} GiB of allocatable VRAM when running in parallel."
          )

          if (
              self._mem_fraction_static_by_gpu_type.get(gpu_type) != 0
              and not self._is_diffusion
          ):
              # Scale the largest group's VRAM by the fraction and by the share
              # of the worker's GPUs that met the utilization ratio.
              effective_vram = (
                  byte_to_gib(
                      int(
                          self._largest_multi_gpu_vram
                          * self._mem_fraction_static_by_gpu_type.get(gpu_type)
                          * self._largest_multi_gpu_utilization_satisfied_count
                          / self._largest_multi_gpu_total
                      )
                  )
                  if self._largest_multi_gpu_total > 0
                  else 0
              )
              event_msg = (
                  event_msg.rstrip(".")
                  + f", {self._largest_multi_gpu_utilization_satisfied_count}/{self._largest_multi_gpu_total} of GPUs meet the VRAM utilization ratio, providing {effective_vram} GiB of allocatable VRAM."
              )

          event_msg_list.append(event_msg)

      self._event_collector.add(
          EventLevelEnum.INFO,
          EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU,
          str(ListMessageBuilder(event_msg_list)),
      )

      return []
  def find_multi_worker_multi_gpu_candidates(
      self, workers: List[Worker], gpu_type: Optional[str] = None
  ) -> List[ModelInstanceScheduleCandidate]:
      """
      Find multi worker multi GPU candidates for SGLang.
      This function only handles automatic multi-worker selection.

      Note: the ``gpu_type`` argument is overwritten by the loop variable
      below, so a value passed by the caller is not used.

      NOTE(review): the listing this was recovered from had lost its
      indentation; in particular the final ``break`` is assumed to sit inside
      ``if workers_candidates:`` - confirm against the upstream file.
      """
      candidates = []
      workers_by_gpu_type = group_workers_by_gpu_type(workers)
      for gpu_type, workers_of_type in workers_by_gpu_type.items():
          gpu_group = group_worker_gpu_by_memory(
              workers_of_type,
              model_instances=self._model_instances,
              ram_claim=self._ram_claim,
              gpu_type=gpu_type,
          )

          for gpu_list in gpu_group:
              # A multi-GPU placement needs at least two GPUs in the group.
              if len(gpu_list) <= 1:
                  continue

              # Non-diffusion: skip the group if any GPU has less than the
              # mem-fraction-static share of its total memory allocatable.
              if any(
                  not self._is_diffusion
                  and gpu.allocatable_vram / gpu.gpu_device.memory.total
                  < self._mem_fraction_static_by_gpu_type.get(gpu_type)
                  for gpu in gpu_list
              ):
                  continue

              # Count how many GPUs of this group each worker contributes.
              worker_gpu_cnt = {}
              for gpu_info in gpu_list:
                  if not worker_gpu_cnt.get(gpu_info.worker_id):
                      worker_gpu_cnt[gpu_info.worker_id] = 0
                  worker_gpu_cnt[gpu_info.worker_id] += 1

              first_cnt = 0
              # workers must with the same number of GPUs
              for gpu_cnt in worker_gpu_cnt.values():
                  if not first_cnt:
                      first_cnt = gpu_cnt
                  if first_cnt != gpu_cnt:
                      # Mismatch: reset to 0 so the group is skipped below.
                      first_cnt = 0
                      break
              if not first_cnt:
                  continue

              if not self._is_tp_size_divisible(first_cnt):
                  continue

              # The group only gates the attempt; selection itself runs over
              # all workers of this GPU type.
              workers_candidates = self.auto_select_multi_worker_multi_gpu_candidates(
                  workers_of_type, gpu_type
              )
              if workers_candidates:
                  candidates.extend(workers_candidates)
                  break  # only need one group since gpu_list is sorted by vram in ascending order

      return candidates
  def auto_select_multi_worker_multi_gpu_candidates(
      self, workers: List[Worker], gpu_type: Optional[str] = None
  ) -> List[ModelInstanceScheduleCandidate]:
      """Auto select multi worker multi GPU candidates for SGLang.

      Workers are bucketed by their GPU count. Within a bucket, eligible
      workers are accumulated until their summed VRAM covers the claim, at
      which point a single candidate built by ``_create_candidate`` is
      returned. When no bucket succeeds, an
      EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU event is recorded and an empty
      list is returned. With fewer than two workers an empty list is returned
      without recording anything.

      NOTE(review): the listing this was recovered from had lost its
      indentation; the tp-size check and the early ``return`` are assumed to
      sit inside the per-worker loop - confirm against the upstream file.
      """
      candidates = []

      if not workers or len(workers) < 2:
          return candidates

      # Called for its effect on ``workers``; the return value is not used.
      sort_workers_by_gpu_count(workers)

      workers_by_gpu_count_dict = defaultdict(list)
      for worker in workers:
          if not worker.status or not worker.status.gpu_devices:
              continue

          workers_by_gpu_count_dict[len(worker.status.gpu_devices)].append(worker)

      # Store the optimal combination info to show
      workers_combination: List[Worker] = []
      largest_vram = 0
      worker_count = 0
      device_count_per_worker = 0

      # Loop through worker groups with the same number of GPUs.
      for gpu_count, worker_group in workers_by_gpu_count_dict.items():
          if len(worker_group) < 2:
              continue

          selected_workers: List[Worker] = []
          # gpu_sum is accumulated but not read anywhere in this method.
          gpu_sum = 0
          vram_sum = 0
          for worker in worker_group:
              allocatable = self.get_worker_allocatable_resource(worker, gpu_type)

              if ram_not_enough(self._ram_claim, allocatable):
                  # The RAM resource(for extended KV cache) is required per worker.
                  # Skip the worker if it does not satisfy the RAM requirement.
                  continue

              # Non-diffusion: every GPU of the worker must have at least the
              # mem-fraction-static share of its total memory allocatable.
              if not self._is_diffusion and any(
                  allocatable.vram.get(gpu.index, 0) / gpu.memory.total
                  < self._mem_fraction_static_by_gpu_type.get(gpu_type)
                  for gpu in worker.status.gpu_devices
              ):
                  continue

              selected_workers.append(worker)
              gpu_sum += gpu_count
              # Non-diffusion counts the fraction of total memory per GPU;
              # diffusion counts the allocatable VRAM per GPU.
              vram_sum += sum(
                  int(
                      int(
                          gpu.memory.total
                          * self._mem_fraction_static_by_gpu_type.get(gpu_type)
                      )
                      if not self._is_diffusion
                      else allocatable.vram.get(gpu.index, 0)
                  )
                  for gpu in worker.status.gpu_devices
              )

              # The check uses the per-worker GPU count, not the running total.
              if not self._is_tp_size_divisible(gpu_count):
                  continue

              if vram_sum >= self._vram_claim:
                  return [
                      _create_candidate(
                          self._model,
                          selected_workers,
                          self._mem_fraction_static_by_gpu_type.get(gpu_type),
                          estimated_vram=self._vram_claim,
                      )
                  ]

          # Remember the best (insufficient) combination for the message below.
          if vram_sum > largest_vram:
              workers_combination = selected_workers
              largest_vram = vram_sum
              worker_count = len(worker_group)
              device_count_per_worker = gpu_count

      # Nothing can be return, construct scheduling message
      event_message = ListMessageBuilder([])
      if workers_combination:
          worker_names = [worker.name for worker in workers_combination]
          # Show at most three names, followed by a count of the remainder.
          worker_names_msg = (
              str(worker_names[:3]).rstrip("]")
              + f"...(more {len(worker_names) - 3})]"
              if len(worker_names) > 3
              else str(worker_names)
          )
          message = f"The optimal combination {worker_names_msg} provides {byte_to_gib(largest_vram)} GiB of allocatable VRAM."
          # Mention workers of the same bucket that were left out of the combination.
          if worker_count - len(workers_combination) > 0:
              message += f" There are {worker_count - len(workers_combination)} {'workers' if worker_count - len(workers_combination) > 1 else 'worker'} that can provide {device_count_per_worker} {'GPUs' if device_count_per_worker > 1 else 'GPU'}, as the workers in the combination, but some GPUs among them fail to meet requirements."
          event_message.append(message)

      event_message.append(
          "Cannot find a suitable worker combination to run the model in distributed mode. "
          "If you are confident that the resources are sufficient, you may manually schedule the model by selecting the workers and GPUs."
      )
      self._event_collector.add(
          EventLevelEnum.INFO,
          EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU,
          str(event_message),
      )

      return []
  741. def _create_candidate(
  742. model: Model,
  743. selected_workers: List[Worker],
  744. mem_fraction_static: float = 0.9,
  745. estimated_vram: Optional[int] = None,
  746. ) -> ModelInstanceScheduleCandidate:
  747. """Create a candidate with SGLang-specific parameters and primary node confirmation."""
  748. if not selected_workers:
  749. raise ValueError("No workers provided for candidate creation")
  750. # Primary worker is the first one (with most GPUs)
  751. primary_worker = selected_workers[0]
  752. subordinate_workers = []
  753. # Calculate primary worker resources
  754. primary_gpu_indexes = []
  755. primary_vram_claim = {}
  756. gpu_type = None
  757. for gpu in primary_worker.status.gpu_devices:
  758. primary_gpu_indexes.append(gpu.index)
  759. primary_vram_claim[gpu.index] = int(gpu.memory.total * mem_fraction_static)
  760. gpu_type = gpu.type
  761. # Process subordinate workers if any
  762. if len(selected_workers) > 1:
  763. for worker in selected_workers[1:]:
  764. gpu_indexes = []
  765. vram_claim = {}
  766. if worker.status.gpu_devices:
  767. for gpu in worker.status.gpu_devices:
  768. gpu_indexes.append(gpu.index)
  769. vram_claim[gpu.index] = int(gpu.memory.total * mem_fraction_static)
  770. subordinate_worker = ModelInstanceSubordinateWorker(
  771. worker_id=worker.id,
  772. worker_name=worker.name,
  773. worker_ip=worker.ip,
  774. worker_ifname=worker.ifname,
  775. total_gpus=len(gpu_indexes),
  776. gpu_type=gpu_type,
  777. gpu_indexes=gpu_indexes,
  778. computed_resource_claim=ComputedResourceClaim(
  779. vram=vram_claim,
  780. ram=get_computed_ram_claim(model, vram_claim),
  781. vram_utilization=mem_fraction_static,
  782. estimated_vram=estimated_vram,
  783. ),
  784. )
  785. subordinate_workers.append(subordinate_worker)
  786. computed_resource_claim = ComputedResourceClaim(
  787. vram=primary_vram_claim,
  788. ram=get_computed_ram_claim(model, primary_vram_claim),
  789. estimated_vram=estimated_vram,
  790. )
  791. return ModelInstanceScheduleCandidate(
  792. worker=primary_worker,
  793. gpu_type=gpu_type,
  794. gpu_indexes=primary_gpu_indexes,
  795. computed_resource_claim=computed_resource_claim,
  796. subordinate_workers=subordinate_workers if subordinate_workers else None,
  797. )
  798. class MemFractionStaticCalculator:
  799. _model: Model
  800. _model_instances: List[ModelInstance]
  801. _gpu_type: Optional[str]
  802. _chunked_prefill_size: Optional[int]
  803. _cuda_graph_max_bs: Optional[int]
  804. _enable_dp_attention: Optional[bool]
  805. _speculative_algorithm: Optional[str]
  806. _tp_size: int
  807. _pp_size: int
  808. _dp_size: int
  809. _max_pp_size: int
  810. # Model hyperparameters.
  811. _model_params: ModelParameters
  812. _selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[int, List[int]]] = {}
  813. def __init__(
  814. self,
  815. model: Model,
  816. model_instances: List[ModelInstance],
  817. model_params: ModelParameters,
  818. gpu_type: str,
  819. selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[int, List[int]]],
  820. ) -> None:
  821. self._model = model
  822. self._model_instances = model_instances
  823. self._model_params = model_params
  824. self._gpu_type = gpu_type
  825. self._selected_gpu_indexes_by_gpu_type_and_worker = (
  826. selected_gpu_indexes_by_gpu_type_and_worker
  827. )
  828. self._chunked_prefill_size = find_int_parameter(
  829. self._model.backend_parameters, ["chunked-prefill-size"]
  830. )
  831. self._cuda_graph_max_bs = find_int_parameter(
  832. self._model.backend_parameters, ["cuda-graph-max-bs"]
  833. )
  834. self._enable_dp_attention = find_bool_parameter(
  835. model.backend_parameters, ["enable-dp-attention"]
  836. )
  837. self._speculative_algorithm = find_parameter(
  838. model.backend_parameters, ["speculative-algorithm"]
  839. )
  840. self._tp_size = (
  841. find_int_parameter(
  842. model.backend_parameters, ["tp-size", "tensor-parallel-size"]
  843. )
  844. or 1
  845. )
  846. self._pp_size = (
  847. find_int_parameter(
  848. model.backend_parameters, ["pp-size", "pipeline-parallel-size"]
  849. )
  850. or 1
  851. )
  852. self._dp_size = (
  853. find_int_parameter(
  854. model.backend_parameters, ["dp-size", "data-parallel-size"]
  855. )
  856. or 1
  857. )
  858. self._max_pp_size = max(
  859. self._pp_size,
  860. len(
  861. self._selected_gpu_indexes_by_gpu_type_and_worker.get(
  862. self._gpu_type, {}
  863. )
  864. ),
  865. )
  866. self._select_tp_size = max(
  867. [self._tp_size]
  868. + [
  869. len(gpu_indexes)
  870. for gpu_indexes in self._selected_gpu_indexes_by_gpu_type_and_worker.get(
  871. self._gpu_type, {}
  872. ).values()
  873. ]
  874. )
  875. def _cal_mem_fraction_static(self, workers: List[Worker]) -> float: # noqa: C901
  876. """
  877. Adapted from sglang's server_args memory fraction logic.
  878. Logic of SGLang set default mem_fraction_static:
  879. https://github.com/sgl-project/sglang/blob/037c3982af4a996f41b38cacf59f0be24b8699f8/python/sglang/srt/server_args.py#L751-L919
  880. note: we largely maintained the same code structure and logic, except we removed some assignments unrelated to the calculation of mem_fraction_static.
  881. Args:
  882. workers: List of workers used to determine GPU memory characteristics, the input workers should only contain same type GPUs.
  883. """
  884. if find_parameter(self._model.backend_parameters, ["mem-fraction-static"]):
  885. return self._param_mem_fraction_static
  886. is_npu = self._is_npu(workers)
  887. gpu_mem_bytes = self._get_min_gpu_sum(workers)
  888. gpu_mem = byte_to_mib(gpu_mem_bytes)
  889. # Step 1: Use the minimum GPU memory of all workers to calculate _chunked_prefill_size and _cuda_graph_max_bs.
  890. if gpu_mem:
  891. if gpu_mem < 20 * 1024:
  892. # T4, 4080
  893. # (_chunked_prefill_size 2k, _cuda_graph_max_bs 8)
  894. if self._chunked_prefill_size is None:
  895. self._chunked_prefill_size = 2048
  896. if self._cuda_graph_max_bs is None:
  897. self._cuda_graph_max_bs = 8
  898. elif is_npu and gpu_mem < 32 * 1024:
  899. # Atlas A2B4
  900. # (_chunked_prefill_size 32k, _cuda_graph_max_bs 16 if tp < 4 else 64)
  901. if self._chunked_prefill_size is None:
  902. self._chunked_prefill_size = 32768
  903. if self._cuda_graph_max_bs is None:
  904. if self._get_max_tp_size() < 4:
  905. self._cuda_graph_max_bs = 16
  906. else:
  907. self._cuda_graph_max_bs = 64
  908. elif gpu_mem < 35 * 1024:
  909. # A10, 4090, 5090
  910. # (_chunked_prefill_size 2k, _cuda_graph_max_bs 24 if tp < 4 else 80)
  911. if self._chunked_prefill_size is None:
  912. self._chunked_prefill_size = 2048
  913. if self._cuda_graph_max_bs is None:
  914. # Based on detailed statistics, when serving TP1/TP2 models on lower-end GPUs with HBM < 35GB, you can either disable cuda graph or set `_cuda_graph_max_bs` to a very small value to reduce the memory overhead of creating cuda graphs, with almost no impact on performance.
  915. # However, when serving models with TP4 or TP8, we need to enable cuda graph to maintain high performance. In this case, we can set `_cuda_graph_max_bs` to 80 (half of the default value 160) to reduce the memory overhead of creating cuda graphs. Looking at the logs
  916. # from TP4 serving of qwen2-72b, a value of 80 is sufficient and can reduce the memory overhead of creating cuda graphs on lower-end GPUs compared to the original 160, avoiding OOM issues.
  917. if self._get_max_tp_size() < 4:
  918. self._cuda_graph_max_bs = 24
  919. else:
  920. self._cuda_graph_max_bs = 80
  921. elif gpu_mem < 60 * 1024:
  922. # A100 (40GB), L40,
  923. # (_chunked_prefill_size 4k, _cuda_graph_max_bs 32 if tp < 4 else 160)
  924. if self._chunked_prefill_size is None:
  925. self._chunked_prefill_size = 4096
  926. if self._cuda_graph_max_bs is None:
  927. if self._get_max_tp_size() < 4:
  928. self._cuda_graph_max_bs = 32
  929. else:
  930. self._cuda_graph_max_bs = 160
  931. elif is_npu and gpu_mem < 64 * 1024:
  932. # Atlas A2 and Atlas A3
  933. # (_chunked_prefill_size 32k, _cuda_graph_max_bs 64 if tp < 4 else 128)
  934. if self._chunked_prefill_size is None:
  935. self._chunked_prefill_size = 32768
  936. if self._cuda_graph_max_bs is None:
  937. if self._get_max_tp_size() < 4:
  938. self._cuda_graph_max_bs = 64
  939. else:
  940. self._cuda_graph_max_bs = 128
  941. elif gpu_mem < 90 * 1024:
  942. # H100, A100
  943. # (_chunked_prefill_size 8k, _cuda_graph_max_bs 256 if tp < 4 else 512)
  944. if self._chunked_prefill_size is None:
  945. self._chunked_prefill_size = 8192
  946. if self._cuda_graph_max_bs is None:
  947. if self._get_max_tp_size() < 4:
  948. self._cuda_graph_max_bs = 256
  949. else:
  950. self._cuda_graph_max_bs = 512
  951. elif gpu_mem < 160 * 1024:
  952. # H20, H200
  953. # (_chunked_prefill_size 8k, _cuda_graph_max_bs 256 if tp < 4 else 512)
  954. if self._chunked_prefill_size is None:
  955. self._chunked_prefill_size = 8192
  956. if self._cuda_graph_max_bs is None:
  957. if self._get_max_tp_size() < 4:
  958. self._cuda_graph_max_bs = 256
  959. else:
  960. self._cuda_graph_max_bs = 512
  961. else:
  962. # B200, MI300
  963. # (_chunked_prefill_size 16k, _cuda_graph_max_bs 512)
  964. if self._chunked_prefill_size is None:
  965. self._chunked_prefill_size = 16384
  966. if self._cuda_graph_max_bs is None:
  967. self._cuda_graph_max_bs = 512
  968. else:
  969. # Fallback defaults when gpu_mem is None
  970. if self._chunked_prefill_size is None:
  971. self._chunked_prefill_size = 4096
  972. if self._cuda_graph_max_bs is None:
  973. self._cuda_graph_max_bs = 160
  974. # Step 2: Calculate reserved memory by other configs
  975. # Constant meta data (e.g., from attention backend)
  976. reserved_mem = 512
  977. # For activation during large prefill
  978. if self._chunked_prefill_size > 0:
  979. reserved_mem += max(self._chunked_prefill_size, 2048) * 1.5
  980. # For cuda graphs
  981. reserved_mem += self._cuda_graph_max_bs * 2
  982. # Some adjustments for large parallel size
  983. reserved_mem += self._get_max_tp_size() * self._max_pp_size / 8 * 1024
  984. if self._enable_dp_attention:
  985. # DP attention needs more padding for some operations
  986. reserved_mem += self._cuda_graph_max_bs * self._dp_size * 3
  987. # DP attention uses much more memory for large cuda graph max bs,
  988. # likely due to some inefficiencies in torch allocator or our implementation.
  989. # So we need to reserve more memory.
  990. if self._cuda_graph_max_bs > 300:
  991. reserved_mem += self._cuda_graph_max_bs * self._dp_size * 1.5
  992. if gpu_mem is not None and gpu_mem > 60 * 1024:
  993. reserved_mem = max(reserved_mem, 10 * 1024)
  994. if self._speculative_algorithm is not None:
  995. if self._speculative_algorithm == "STANDALONE":
  996. # standalonedraft model and cuda graphs
  997. reserved_mem += 6 * 1024
  998. elif self._speculative_algorithm != "NGRAM":
  999. # eagle draft models and cuda graphs
  1000. reserved_mem += 2 * 1024
  1001. # For piecewise cuda graphs
  1002. enable_piecewise_cuda_graph = find_parameter(
  1003. self._model.backend_parameters, ["enable-piecewise-cuda-graph"]
  1004. )
  1005. if enable_piecewise_cuda_graph:
  1006. piecewise_cuda_graph_max_tokens = (
  1007. find_int_parameter(
  1008. self._model.backend_parameters, ["piecewise-cuda-graph-max-tokens"]
  1009. )
  1010. or 0
  1011. )
  1012. reserved_mem += piecewise_cuda_graph_max_tokens // 4
  1013. _mem_fraction_static = (
  1014. round((gpu_mem - reserved_mem) / gpu_mem, 3)
  1015. if gpu_mem is not None and gpu_mem > 0
  1016. else 0.88
  1017. )
  1018. # Step 3: adjust mem_fraction_static for VL models
  1019. # Multimodal models need more memory for the image processing,
  1020. # so we adjust the mem_fraction_static accordingly.
  1021. model_config = self._model_params
  1022. vision_config = getattr(model_config, "vision_config", None)
  1023. if vision_config is not None:
  1024. # roughly reduce the mem_fraction_static base on params of Vit
  1025. original_server_arg_mem_fraction = _mem_fraction_static
  1026. # a base mem_fraction_static factor for regular Vit
  1027. base_mem_fraction_reduction_ratio = 0.95
  1028. # Qwen-VL family configs name the ViT layer count `depth` rather than `num_hidden_layers`.
  1029. vit_num_layers = (
  1030. getattr(vision_config, "num_hidden_layers", None)
  1031. or getattr(vision_config, "depth", None)
  1032. or 24
  1033. )
  1034. vit_hidden_size = getattr(vision_config, "hidden_size", 1024)
  1035. # baseline ViT params (ViT-L/14)
  1036. baseline_vit_layers = 24
  1037. baseline_vit_hidden_size = 1024
  1038. # weight params count
  1039. current_complexity_score = vit_num_layers * (vit_hidden_size**2)
  1040. baseline_complexity_score = baseline_vit_layers * (
  1041. baseline_vit_hidden_size**2
  1042. )
  1043. complexity_ratio = (
  1044. current_complexity_score / baseline_complexity_score
  1045. if baseline_complexity_score > 0
  1046. else 1.0
  1047. )
  1048. # every time the complexity grows 100%, adjust final factor for 10%
  1049. sensitivity_scale = 0.1
  1050. dynamic_adjustment_factor = 1.0 - sensitivity_scale * (
  1051. complexity_ratio - 1.0
  1052. )
  1053. dynamic_adjustment_factor = max(0.8, min(1.05, dynamic_adjustment_factor))
  1054. final_overall_factor = (
  1055. base_mem_fraction_reduction_ratio * dynamic_adjustment_factor
  1056. )
  1057. _mem_fraction_static = round(
  1058. original_server_arg_mem_fraction * final_overall_factor, 3
  1059. )
  1060. return _mem_fraction_static
  1061. def _is_npu(self, workers: List[Worker]) -> bool:
  1062. """
  1063. Check if the selected GPU type is NPU
  1064. """
  1065. is_npu = self._gpu_type == 'cann' or any(
  1066. ((gpu.vendor or '').lower() == 'ascend')
  1067. or ((gpu.type or '').lower() == 'cann')
  1068. for w in workers
  1069. for gpu in ((w.status and w.status.gpu_devices) or [])
  1070. )
  1071. return is_npu
  1072. def _get_min_gpu_sum(self, workers: List[Worker]) -> int:
  1073. """
  1074. Get the min GPU memory of input workers
  1075. """
  1076. use_manual = self._model.gpu_selector is not None
  1077. totals: List[int] = []
  1078. for worker in workers:
  1079. if not worker.status or not worker.status.gpu_devices:
  1080. continue
  1081. # If gpus_per_replica is set, choose top-N GPUs by allocatable VRAM
  1082. selected = None
  1083. if use_manual:
  1084. # Pre-selected GPU indexes for this worker when manual selection is used
  1085. selected = self._selected_gpu_indexes_by_gpu_type_and_worker.get(
  1086. self._gpu_type, {}
  1087. ).get(worker.name)
  1088. if self._model.gpu_selector.gpus_per_replica:
  1089. allocatable = get_worker_allocatable_resource(
  1090. self._model_instances, worker, self._gpu_type
  1091. )
  1092. sorted_gpu_indexes = [
  1093. idx
  1094. for idx in sort_gpu_indexes_by_allocatable_rate(
  1095. worker, allocatable.vram, gpu_type=self._gpu_type
  1096. )
  1097. if idx in selected
  1098. ]
  1099. selected = sorted_gpu_indexes[
  1100. : self._model.gpu_selector.gpus_per_replica
  1101. ]
  1102. # Traverse GPUs for this worker and respect manual selection if present
  1103. for gpu in worker.status.gpu_devices:
  1104. if selected is not None and gpu.index not in selected:
  1105. continue
  1106. total = gpu.memory.total if (gpu.memory and gpu.memory.total) else 0
  1107. totals.append(total)
  1108. return min(totals) if totals else 0
  1109. def _get_max_tp_size(self) -> int:
  1110. cmp_list = [self._tp_size, self._select_tp_size]
  1111. if self._model.gpu_selector and self._model.gpu_selector.gpus_per_replica:
  1112. cmp_list.append(self._model.gpu_selector.gpus_per_replica)
  1113. return max(cmp_list)