utils.py 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. import asyncio
  2. import logging
  3. import os
  4. from typing import Any, Callable, Dict, List, Optional
  5. from sqlmodel.ext.asyncio.session import AsyncSession
  6. from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
  7. from gpustack.policies.base import (
  8. Allocatable,
  9. Allocated,
  10. )
  11. from gpustack.scheduler.calculator import calculate_local_model_weight_size
  12. from gpustack.schemas.models import (
  13. ModelInstance,
  14. Model,
  15. CategoryEnum,
  16. SourceEnum,
  17. is_llm_model,
  18. )
  19. from gpustack.schemas.model_files import ModelFileStateEnum
  20. from gpustack.schemas.workers import Worker, GPUDevicesStatus, GPUDeviceStatus
  21. from pydantic import BaseModel
  22. from gpustack.server.services import ModelFileService
  23. from gpustack.utils.hub import get_model_weight_size, get_diffusion_model_weight_size
# Module-level logger used by the helpers in this module.
logger = logging.getLogger(__name__)
class WorkerGPUInfo(BaseModel):
    """
    Data structure to represent a GPU device with its associated worker information.

    Pairs a single GPU with the worker that hosts it, so GPUs collected from
    several workers (e.g. by ``group_worker_gpu_by_memory``) can be sorted and
    grouped by allocatable VRAM while still knowing which worker each belongs to.
    """

    # ID of the worker hosting this GPU.
    worker_id: int
    # Name of the worker hosting this GPU.
    worker_name: str
    # Status of the GPU device itself.
    gpu_device: GPUDeviceStatus
    allocatable_vram: int  # in bytes
  33. def get_worker_allocatable_resource( # noqa: C901
  34. all_model_instances: List[ModelInstance],
  35. worker: Worker,
  36. gpu_type: Optional[str] = None,
  37. ) -> Allocatable:
  38. """
  39. Get the worker with the latest allocatable resources, if gpu_type is provided, only consider the GPUs of that type.
  40. """
  41. def update_allocated_vram(allocated, resource_claim):
  42. if not resource_claim.vram:
  43. return
  44. for gpu_index, vram in resource_claim.vram.items():
  45. allocated.vram[gpu_index] = allocated.vram.get(gpu_index, 0) + vram
  46. is_unified_memory = worker.status.memory.is_unified_memory
  47. model_instances = get_worker_model_instances(all_model_instances, worker)
  48. allocated = Allocated(ram=0, vram={})
  49. # 保护 system_reserved 为 None 的情况
  50. system_reserved = worker.system_reserved
  51. if system_reserved is None:
  52. from types import SimpleNamespace
  53. system_reserved = SimpleNamespace(ram=0, vram=0)
  54. system_reserved_ram = getattr(system_reserved, 'ram', 0) or 0
  55. system_reserved_vram = getattr(system_reserved, 'vram', 0) or 0
  56. for model_instance in model_instances:
  57. # Handle resource allocation for main worker
  58. if model_instance.worker_id == worker.id and (
  59. gpu_type is None
  60. or model_instance.gpu_type is None
  61. or model_instance.gpu_type == gpu_type
  62. ):
  63. allocated.ram += model_instance.computed_resource_claim.ram or 0
  64. if model_instance.gpu_indexes:
  65. update_allocated_vram(allocated, model_instance.computed_resource_claim)
  66. # Handle resource allocation for subordinate workers
  67. if (
  68. model_instance.distributed_servers
  69. and model_instance.distributed_servers.subordinate_workers
  70. ):
  71. for (
  72. subordinate_worker
  73. ) in model_instance.distributed_servers.subordinate_workers:
  74. if subordinate_worker.worker_id != worker.id:
  75. continue
  76. if subordinate_worker.computed_resource_claim and (
  77. gpu_type is None or model_instance.gpu_type == gpu_type
  78. ):
  79. # rpc server only consider the vram
  80. update_allocated_vram(
  81. allocated, subordinate_worker.computed_resource_claim
  82. )
  83. allocatable = Allocatable(ram=0, vram={})
  84. if worker.status.gpu_devices:
  85. for _, gpu in enumerate(worker.status.gpu_devices):
  86. gpu_index = gpu.index
  87. if (
  88. gpu.memory is None
  89. or gpu.memory.total is None
  90. or (gpu_type is not None and gpu.type != gpu_type)
  91. ):
  92. continue
  93. allocatable_vram = max(
  94. (
  95. gpu.memory.total
  96. - allocated.vram.get(gpu_index, 0)
  97. - system_reserved_vram
  98. ),
  99. 0,
  100. )
  101. allocatable.vram[gpu_index] = allocatable_vram
  102. allocatable.ram = max(
  103. (worker.status.memory.total - allocated.ram - system_reserved_ram), 0
  104. )
  105. if is_unified_memory:
  106. allocatable.ram = max(
  107. allocatable.ram
  108. - system_reserved_vram
  109. - sum(allocated.vram.values()),
  110. 0,
  111. )
  112. # For UMA, we need to set the gpu memory to the minimum of
  113. # the calculated with max allow gpu memory and the allocatable memory.
  114. if allocatable.vram:
  115. allocatable.vram[0] = min(allocatable.ram, allocatable.vram[0])
  116. logger.debug(
  117. f"Worker {worker.name} gpu_type {gpu_type}, "
  118. f"reserved memory: {system_reserved_ram}, "
  119. f"reserved gpu memory: {system_reserved_vram}, "
  120. f"allocatable memory: {allocatable.ram}, "
  121. f"allocatable gpu memory: {allocatable.vram}"
  122. )
  123. return allocatable
  124. def group_gpu_devices_by_memory(
  125. gpu_devices: GPUDevicesStatus,
  126. ) -> List[GPUDevicesStatus]:
  127. """
  128. Group GPU devices by allocatable memory size with the constraint that the minimum
  129. allocatable GPU memory in each group should not be less than 0.9 times the
  130. allocatable memory of other GPUs in the same group.
  131. Args:
  132. gpu_devices: List of GPU device information
  133. Returns:
  134. List of GPU device groups, where each group is a list of GPU devices
  135. Example:
  136. If we have GPUs with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
  137. they might be grouped as:
  138. - Group 1: [8GB, 8.5GB] (8GB >= 8.5GB * 0.9 = 7.65GB)
  139. - Group 2: [16GB, 16.5GB] (16GB >= 16.5GB * 0.9 = 14.85GB)
  140. - Group 3: [32GB]
  141. """
  142. if not gpu_devices:
  143. return []
  144. def get_allocatable_memory(gpu: GPUDeviceStatus) -> Optional[int]:
  145. """Calculate allocatable memory (total - allocated)"""
  146. if not gpu.memory or gpu.memory.total is None:
  147. return None
  148. allocated = gpu.memory.allocated or 0
  149. return gpu.memory.total - allocated
  150. # Filter out GPUs without valid memory information
  151. valid_gpus = []
  152. for gpu in gpu_devices:
  153. allocatable_memory = get_allocatable_memory(gpu)
  154. if allocatable_memory is not None and allocatable_memory > 0:
  155. valid_gpus.append(gpu)
  156. if not valid_gpus:
  157. return []
  158. # Sort GPUs by allocatable memory size (ascending order)
  159. sorted_gpus = sorted(valid_gpus, key=lambda gpu: get_allocatable_memory(gpu))
  160. groups = []
  161. current_group = []
  162. for gpu in sorted_gpus:
  163. if not current_group:
  164. # Start a new group
  165. current_group = [gpu]
  166. else:
  167. # Check if this GPU can be added to the current group
  168. min_allocatable_memory = get_allocatable_memory(
  169. current_group[0]
  170. ) # First GPU has minimum allocatable memory
  171. current_gpu_allocatable_memory = get_allocatable_memory(gpu)
  172. # Check if min_allocatable_memory >= current_gpu_allocatable_memory * 0.9
  173. # This ensures the minimum allocatable memory is not less than 0.9 times any other allocatable memory in the group
  174. if min_allocatable_memory >= current_gpu_allocatable_memory * 0.9:
  175. current_group.append(gpu)
  176. else:
  177. # Cannot add to current group, start a new group
  178. groups.append(current_group)
  179. current_group = [gpu]
  180. # Add the last group
  181. if current_group:
  182. groups.append(current_group)
  183. return groups
  184. def group_workers_by_gpu_type(workers: List[Worker]) -> Dict[str, List[Worker]]:
  185. """
  186. Group workers by their GPU types.
  187. Args:
  188. workers: List of workers containing GPU devices
  189. Returns:
  190. Dictionary mapping GPU type to list of workers that with gpus of that type
  191. """
  192. gpu_type_to_workers: Dict[str, List[Worker]] = {}
  193. for worker in workers:
  194. if not worker.status or not worker.status.gpu_devices:
  195. gpu_type_to_workers.setdefault(None, []).append(worker)
  196. continue
  197. # Collect unique GPU types for this worker
  198. gpus: Dict[str, GPUDevicesStatus] = {}
  199. for gpu in worker.status.gpu_devices:
  200. gpus.setdefault(gpu.type, []).append(gpu)
  201. # Add worker to each GPU type group
  202. for gpu_type in gpus.keys():
  203. w = worker.model_copy()
  204. w.status.gpu_devices = gpus[gpu_type]
  205. gpu_type_to_workers.setdefault(gpu_type, []).append(w)
  206. return gpu_type_to_workers
  207. def get_vram_claim_from_model_env(model: Model) -> Optional[int]:
  208. """
  209. Get the VRAM claim from model environment variable 'GPUSTACK_MODEL_VRAM_CLAIM' if set.
  210. """
  211. if model.env and 'GPUSTACK_MODEL_VRAM_CLAIM' in model.env:
  212. try:
  213. return int(model.env['GPUSTACK_MODEL_VRAM_CLAIM'])
  214. except ValueError:
  215. logger.warning(
  216. f"Invalid VRAM claim value for model {model.name}: {model.env['GPUSTACK_MODEL_VRAM_CLAIM']}"
  217. )
  218. return None
  219. async def _get_cached_model_size(
  220. session: Optional[AsyncSession],
  221. model: Model,
  222. ) -> Optional[int]:
  223. """
  224. Get cached file size from downloaded ModelFile.
  225. """
  226. if not session:
  227. return None
  228. source_index = model.model_source_index
  229. if not source_index:
  230. return None
  231. model_files = await ModelFileService(session).get_by_source_index(source_index)
  232. if not model_files:
  233. return None
  234. # Find READY files with size
  235. for mf in model_files:
  236. if mf.state == ModelFileStateEnum.READY and mf.size:
  237. logger.info(f"Using cached size {mf.size} from ModelFile {mf.id}")
  238. return mf.size
  239. return None
  240. async def estimate_model_vram(
  241. model: Model,
  242. token: Optional[str] = None,
  243. workers: Optional[List[Worker]] = None,
  244. session: Optional[AsyncSession] = None,
  245. ) -> int:
  246. """
  247. Estimate the vram requirement in bytes heuristically.
  248. This is the minimum requirement to help us decide how many GPUs are needed for the model.
  249. If users explicitly set parameters like tp & pp, this estimation is not needed.
  250. Formula for Diffusion (Image) models:
  251. VRAM = WEIGHT_SIZE
  252. Formula for LLM models:
  253. VRAM = WEIGHT_SIZE * 1.2 + RESERVED_FOOTPRINT
  254. Reference for the 20% overhead: https://blog.eleuther.ai/transformer-math/#total-inference-memory
  255. For example, using bfloat16,
  256. - 0.5B requires 3.1 GiB
  257. - 3B requires 8.9 GiB
  258. - 7B requires 19.0 GiB
  259. - 72B requires 164.5 GiB
  260. """
  261. env_vram_claim = get_vram_claim_from_model_env(model)
  262. if env_vram_claim is not None:
  263. # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
  264. return env_vram_claim
  265. weight_size = 0
  266. timeout_in_seconds = 15
  267. try:
  268. if model.categories and CategoryEnum.IMAGE in model.categories:
  269. weight_size = await asyncio.wait_for(
  270. estimate_diffusion_model_vram(model, token, workers, session),
  271. timeout=timeout_in_seconds,
  272. )
  273. return weight_size
  274. elif (
  275. model.source == SourceEnum.HUGGING_FACE
  276. or model.source == SourceEnum.MODEL_SCOPE
  277. ):
  278. weight_size = await asyncio.wait_for(
  279. asyncio.to_thread(get_model_weight_size, model, token),
  280. timeout=timeout_in_seconds,
  281. )
  282. elif model.source == SourceEnum.LOCAL_PATH:
  283. # Try to get cached size from ModelFile first
  284. cached_size = await _get_cached_model_size(session, model)
  285. if cached_size:
  286. weight_size = cached_size
  287. else:
  288. # Fall back to querying workers
  289. weight_size = await get_local_model_weight_size(
  290. model.local_path, workers, is_diffusion=False
  291. )
  292. except asyncio.TimeoutError:
  293. logger.warning(f"Timeout when getting weight size for model {model.name}")
  294. except Exception as e:
  295. logger.warning(f"Cannot get weight size for model {model.name}: {e}")
  296. # Reference: https://blog.eleuther.ai/transformer-math/#total-inference-memory
  297. activation_overhead_factor = 1.2
  298. if model.categories and CategoryEnum.TEXT_TO_SPEECH in model.categories:
  299. # Emperical factor base on Qwen3-TTS
  300. activation_overhead_factor = 3
  301. # CUDA graphs can take additional 1~3 GiB memory
  302. # https://github.com/vllm-project/vllm/blob/v0.6.1/vllm/worker/model_runner.py#L1313
  303. # For non-LLM models like embedding, set a smaller overhead
  304. framework_overhead = 2 * 1024**3 if is_llm_model(model) else 512 * 1024**2
  305. return int(weight_size * activation_overhead_factor) + framework_overhead
  306. async def estimate_diffusion_model_vram(
  307. model: Model,
  308. token: Optional[str] = None,
  309. workers: Optional[List[Worker]] = None,
  310. session: Optional[AsyncSession] = None,
  311. ) -> int:
  312. """ """
  313. if model.env and 'GPUSTACK_MODEL_VRAM_CLAIM' in model.env:
  314. # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
  315. return int(model.env['GPUSTACK_MODEL_VRAM_CLAIM'])
  316. weight_size = 0
  317. timeout_in_seconds = 15
  318. try:
  319. if (
  320. model.source == SourceEnum.HUGGING_FACE
  321. or model.source == SourceEnum.MODEL_SCOPE
  322. ):
  323. weight_size = await asyncio.wait_for(
  324. asyncio.to_thread(get_diffusion_model_weight_size, model, token),
  325. timeout=timeout_in_seconds,
  326. )
  327. elif model.source == SourceEnum.LOCAL_PATH:
  328. # Try to get cached size from ModelFile first
  329. cached_size = await _get_cached_model_size(session, model)
  330. if cached_size:
  331. weight_size = cached_size
  332. else:
  333. # Fall back to querying workers with is_diffusion=True
  334. weight_size = await get_local_model_weight_size(
  335. model.local_path, workers, is_diffusion=True
  336. )
  337. except asyncio.TimeoutError:
  338. logger.warning(f"Timeout when getting weight size for model {model.name}")
  339. except Exception as e:
  340. logger.warning(f"Cannot get weight size for model {model.name}: {e}")
  341. # Diffusion models need extra VRAM for VAE decode/encode, text encoder forward pass,
  342. # UNet/Transformer activations during denoising, and intermediate latent buffers.
  343. # Use 1.2x activation overhead (similar to LLM) + 1 GiB framework overhead.
  344. activation_overhead_factor = 1.2
  345. framework_overhead = 1 * 1024**3
  346. return int(weight_size * activation_overhead_factor) + framework_overhead
  347. def get_worker_model_instances(
  348. all_model_instances: List[ModelInstance], worker: Worker
  349. ) -> List[ModelInstance]:
  350. """
  351. Get all model instances related to the worker, including:
  352. 1. Model instances assigned to this worker (main worker)
  353. 2. Model instances that use this worker as a subordinate worker in distributed inference
  354. """
  355. # Filter to get only the relevant instances:
  356. # 1. Instances assigned to this worker (main worker)
  357. # 2. Instances that use this worker as a subordinate worker
  358. relevant_instances = []
  359. for model_instance in all_model_instances:
  360. # Check if this is a main worker instance
  361. if model_instance.worker_id == worker.id:
  362. relevant_instances.append(model_instance)
  363. # Check if this worker is used as a subordinate worker
  364. elif (
  365. model_instance.distributed_servers
  366. and model_instance.distributed_servers.subordinate_workers
  367. ):
  368. for (
  369. subordinate_worker
  370. ) in model_instance.distributed_servers.subordinate_workers:
  371. if subordinate_worker.worker_id == worker.id:
  372. relevant_instances.append(model_instance)
  373. break
  374. return relevant_instances
  375. class ListMessageBuilder:
  376. def __init__(self, messages: Optional[str | List[str]]):
  377. if not messages:
  378. self._messages = []
  379. self._messages = messages if isinstance(messages, list) else [messages]
  380. def append(self, message: str):
  381. self._messages.append(message)
  382. def extend(self, message: List[str]):
  383. self._messages.extend(message)
  384. def __str__(self) -> str:
  385. return "\n".join([f"- {line}" for line in self._messages])
  386. def get_model_num_attention_heads(pretrained_config) -> Optional[int]:
  387. """
  388. Get the number of attention heads in the model.
  389. Priority: llm_config > text_config > root-level setting > thinker_config.text_config
  390. """
  391. num_attention_heads = None
  392. try:
  393. # Helper to get num_attention_heads from config
  394. def get_heads_from(cfg, key="num_attention_heads"):
  395. value = getattr(cfg, key, None)
  396. if isinstance(value, int) and value > 0:
  397. return value
  398. return None
  399. thinker_config = getattr(pretrained_config, "thinker_config", None)
  400. thinker_text_config = (
  401. getattr(thinker_config, "text_config", None) if thinker_config else None
  402. )
  403. configs_by_priority = [
  404. getattr(pretrained_config, "llm_config", None),
  405. getattr(pretrained_config, "text_config", None),
  406. pretrained_config,
  407. thinker_text_config,
  408. ]
  409. for config in configs_by_priority:
  410. if not config:
  411. continue
  412. heads = get_heads_from(config)
  413. if heads is not None:
  414. num_attention_heads = heads
  415. break
  416. except Exception as e:
  417. logger.warning(f"Cannot get num_attention_heads: {e}")
  418. return num_attention_heads
  419. def _get_config_value(config: Any, key: str) -> Any:
  420. if config is None:
  421. return None
  422. if isinstance(config, dict):
  423. return config.get(key)
  424. return getattr(config, key, None)
  425. def _get_config_int(config: Any, key: str) -> Optional[int]:
  426. value = _get_config_value(config, key)
  427. if type(value) is int:
  428. return value
  429. return None
  430. def get_model_vision_num_attention_heads(pretrained_config: Any) -> Optional[int]:
  431. """
  432. Get total vision attention heads used for TP divisibility check.
  433. Priority for raw heads: num_attention_heads > num_heads.
  434. The final value is:
  435. total_vision_heads = raw_heads + max(num_dummy_heads, 0)
  436. """
  437. try:
  438. vision_config = _get_config_value(pretrained_config, "vision_config")
  439. if not vision_config:
  440. return None
  441. num_heads = _get_config_int(vision_config, "num_attention_heads")
  442. if num_heads is None:
  443. num_heads = _get_config_int(vision_config, "num_heads")
  444. if num_heads is None or num_heads <= 0:
  445. return None
  446. num_dummy_heads = _get_config_int(vision_config, "num_dummy_heads") or 0
  447. return num_heads + max(num_dummy_heads, 0)
  448. except Exception as e:
  449. logger.warning(f"Cannot get vision num_attention_heads: {e}")
  450. return None
  451. async def get_local_model_weight_size(
  452. local_path: str,
  453. workers: Optional[List[Worker]] = None,
  454. is_diffusion: bool = False,
  455. ) -> int:
  456. """
  457. Get the local model weight size in bytes.
  458. If the model exists locally (on the server), calculate it locally.
  459. Otherwise, if workers are provided, check if the model exists on any worker and get the size from there.
  460. Args:
  461. local_path: Path to the model directory
  462. workers: Optional list of workers to check
  463. is_diffusion: Whether this is a diffusion model (default: False)
  464. Returns:
  465. Total size in bytes
  466. """
  467. if os.path.exists(local_path):
  468. if not os.path.isdir(local_path):
  469. raise NotADirectoryError(
  470. f"The specified path '{local_path}' is not a directory."
  471. )
  472. try:
  473. # Use utility function to calculate size
  474. return calculate_local_model_weight_size(local_path, is_diffusion)
  475. except Exception as e:
  476. logger.error(f"Failed to calculate size locally for {local_path}: {e}")
  477. raise e
  478. if not workers:
  479. raise FileNotFoundError(f"The specified path '{local_path}' does not exist.")
  480. async def try_get_size_from_worker(worker: Worker) -> Optional[int]:
  481. """Try to get model weight size from a single worker."""
  482. try:
  483. async with WorkerFilesystemClient() as fs_client:
  484. size = await fs_client.get_model_weight_size(worker, local_path)
  485. if isinstance(size, int):
  486. logger.info(
  487. f"Successfully got model weight size from worker {worker.id}: {size} bytes"
  488. )
  489. return size
  490. return None
  491. except Exception as e:
  492. logger.debug(
  493. f"Failed to get model weight size from worker {worker.id}: {e}"
  494. )
  495. return None
  496. # Concurrently try all workers and return the first successful result
  497. logger.info(f"Broadcasting model weight size request to {len(workers)} workers")
  498. tasks = [try_get_size_from_worker(worker) for worker in workers]
  499. # Use as_completed to get results as they finish
  500. for completed_task in asyncio.as_completed(tasks):
  501. result = await completed_task
  502. if result is not None:
  503. return result
  504. def group_worker_gpu_by_memory(
  505. workers: List[Worker],
  506. model_instances: List[ModelInstance],
  507. ram_claim: int = 0,
  508. gpu_type: Optional[str] = None,
  509. ) -> List[List[WorkerGPUInfo]]:
  510. """
  511. Group GPU devices from multiple workers by allocatable memory size with the constraint
  512. that the minimum allocatable GPU memory in each group should not be less than 0.9 times
  513. the allocatable memory of other GPUs in the same group.
  514. Args:
  515. engine: Database engine for calculating allocatable resources
  516. workers: List of workers containing GPU devices
  517. ram_claim: RAM claim in bytes to filter out workers that do not have enough RAM
  518. Returns:
  519. List of GPU device groups, where each group is a list of WorkerGPUInfo objects
  520. containing worker information and GPU device details
  521. Example:
  522. If we have GPUs from different workers with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
  523. they might be grouped as:
  524. - Group 1: [WorkerGPUInfo(worker1, gpu1, 8GB), WorkerGPUInfo(worker2, gpu2, 8.5GB)]
  525. - Group 2: [WorkerGPUInfo(worker1, gpu3, 16GB), WorkerGPUInfo(worker3, gpu1, 16.5GB)]
  526. - Group 3: [WorkerGPUInfo(worker2, gpu4, 32GB)]
  527. """
  528. if not workers:
  529. return []
  530. # Collect all GPU devices with their worker information and allocatable VRAM
  531. worker_gpu_infos = []
  532. for worker in workers:
  533. if not worker.status or not worker.status.gpu_devices:
  534. continue
  535. # Get allocatable resources for this worker
  536. allocatable = get_worker_allocatable_resource(model_instances, worker, gpu_type)
  537. if ram_not_enough(ram_claim, allocatable):
  538. continue
  539. for gpu_device in worker.status.gpu_devices:
  540. if gpu_device.index is None:
  541. continue
  542. if not gpu_device.memory or gpu_device.memory.total == 0:
  543. continue
  544. # Get allocatable VRAM for this specific GPU
  545. gpu_index = gpu_device.index
  546. allocatable_vram = allocatable.vram.get(gpu_index, 0)
  547. # Only include GPUs with positive allocatable VRAM
  548. if allocatable_vram > 0:
  549. worker_gpu_info = WorkerGPUInfo(
  550. worker_id=worker.id,
  551. worker_name=worker.name,
  552. gpu_device=gpu_device,
  553. allocatable_vram=allocatable_vram,
  554. )
  555. worker_gpu_infos.append(worker_gpu_info)
  556. if not worker_gpu_infos:
  557. return []
  558. # Sort GPUs by allocatable VRAM size (ascending order)
  559. sorted_worker_gpu_infos = sorted(
  560. worker_gpu_infos, key=lambda info: info.allocatable_vram
  561. )
  562. groups = []
  563. current_group = []
  564. for worker_gpu_info in sorted_worker_gpu_infos:
  565. if not current_group:
  566. # Start a new group
  567. current_group = [worker_gpu_info]
  568. else:
  569. # Check if this GPU can be added to the current group
  570. min_allocatable_vram = current_group[
  571. 0
  572. ].allocatable_vram # First GPU has minimum allocatable VRAM
  573. current_allocatable_vram = worker_gpu_info.allocatable_vram
  574. # Check if min_allocatable_vram >= current_allocatable_vram * 0.9
  575. # This ensures the minimum allocatable VRAM is not less than 0.9 times any other allocatable VRAM in the group
  576. if min_allocatable_vram >= current_allocatable_vram * 0.9:
  577. current_group.append(worker_gpu_info)
  578. else:
  579. # Cannot add to current group, start a new group
  580. groups.append(current_group)
  581. current_group = [worker_gpu_info]
  582. # Add the last group
  583. if current_group:
  584. groups.append(current_group)
  585. return groups
  586. def ram_not_enough(ram_claim: int, allocatable: Allocatable) -> bool:
  587. """
  588. Check if the allocatable RAM is not enough for the claimed RAM.
  589. """
  590. if ram_claim <= 0:
  591. return False
  592. return allocatable.ram < ram_claim
  593. def get_model_ram_claim(model: Model) -> int:
  594. """
  595. Get the RAM requirement for the model in bytes.
  596. """
  597. extended_kv_cache = model.extended_kv_cache
  598. if (
  599. extended_kv_cache
  600. and extended_kv_cache.enabled
  601. and extended_kv_cache.ram_size
  602. and extended_kv_cache.ram_size > 0
  603. ):
  604. # When extended kv cache is enabled, reserve the ram for KV cache.
  605. return extended_kv_cache.ram_size * 1024**3
  606. return 0
  607. def get_computed_ram_claim(
  608. model: Model, vram_claim: Dict[int, int], static_ram: Optional[int] = None
  609. ) -> Optional[int]:
  610. """
  611. Get the computed RAM claim for the model based on the provided model and vram_claim.
  612. The priority is as follows:
  613. 1. If static_ram is provided, use it.
  614. 2. If RAM size for extended KV cache is available, use it.
  615. 3. If RAM ratio for extended KV cache is set and vram_claim is available, calculate RAM as ram_ratio * total_vram_claim.
  616. 4. If neither is available, return None.
  617. """
  618. if static_ram:
  619. return static_ram
  620. ext = model.extended_kv_cache
  621. if not ext or not ext.enabled:
  622. return None
  623. claim = None
  624. # static ram size
  625. if ext.ram_size and ext.ram_size > 0:
  626. claim = ext.ram_size * 1024**3
  627. # ram ratio to vram
  628. elif ext.ram_ratio and ext.ram_ratio > 0 and vram_claim:
  629. total_vram_claim = sum(vram_claim.values())
  630. claim = int(total_vram_claim * ext.ram_ratio)
  631. return claim
  632. def sort_workers_by_gpu_count(workers: List[Worker]):
  633. """
  634. Sort workers by the number of GPUs.
  635. """
  636. workers.sort(
  637. key=lambda worker: (
  638. len(worker.status.gpu_devices)
  639. if worker.status and worker.status.gpu_devices
  640. else 0
  641. ),
  642. reverse=True,
  643. )
  644. def sort_gpu_indexes_by_allocatable_rate(
  645. worker: Worker, allocatable: dict, gpu_type: Optional[str] = None
  646. ) -> List[int]:
  647. """
  648. Sort GPU indexes of a worker by allocatable VRAM rate (allocatable_vram / total_vram), ascending.
  649. """
  650. allocatable_rate = {
  651. gpu.index: (
  652. allocatable.get(gpu.index, 0) / gpu.memory.total
  653. if gpu.memory
  654. and gpu.memory.total
  655. and (gpu_type is None or gpu.type == gpu_type)
  656. else 0
  657. )
  658. for gpu in worker.status.gpu_devices
  659. }
  660. # return a list of gpu indexes sorted by allocatable rate in ascending order
  661. return sorted(allocatable_rate, key=lambda idx: allocatable_rate[idx], reverse=True)
  662. def sort_selected_workers_by_gpu_type_and_resource(
  663. workers: List[Worker],
  664. selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[str, List[int]]],
  665. get_worker_allocatable_resource: Callable[[Worker, Optional[str]], Allocatable],
  666. ) -> Dict[str, List[Worker]]:
  667. """
  668. Filter and sort selected workers by their GPU resource availability.
  669. This function selects workers whose names are in `selected_gpu_workers`,
  670. then sorts their GPU devices by allocatable VRAM rate (ascending),
  671. and finally sorts the workers by the number of GPUs (descending).
  672. """
  673. selected_workers_by_gpu_type = {
  674. gpu_type: [] for gpu_type in selected_gpu_indexes_by_gpu_type_and_worker.keys()
  675. }
  676. selected_gpu_types = list(selected_gpu_indexes_by_gpu_type_and_worker.keys())
  677. for gpu_type in selected_gpu_types:
  678. for worker in workers:
  679. # Skip invalid
  680. selected_gpu_indexes = selected_gpu_indexes_by_gpu_type_and_worker.get(
  681. gpu_type, {}
  682. ).get(worker.name)
  683. if (
  684. not worker.status
  685. or not worker.status.gpu_devices
  686. or not selected_gpu_indexes
  687. ):
  688. continue
  689. # Sort selected GPUs by allocatable rate
  690. allocatable = get_worker_allocatable_resource(worker, gpu_type)
  691. sorted_gpu_indexes = [
  692. idx
  693. for idx in sort_gpu_indexes_by_allocatable_rate(
  694. worker, allocatable.vram
  695. )
  696. if idx in selected_gpu_indexes
  697. ]
  698. sorted_gpus = [
  699. gpu
  700. for idx in sorted_gpu_indexes
  701. for gpu in worker.status.gpu_devices
  702. if gpu.index == idx and (gpu_type is None or gpu.type == gpu_type)
  703. ]
  704. # Create a copy of the worker with sorted GPUs
  705. w = worker.model_copy(deep=True)
  706. w.status.gpu_devices = sorted_gpus
  707. selected_workers_by_gpu_type[gpu_type].append(w)
  708. # Sort workers by GPU count
  709. for gpu_type in selected_workers_by_gpu_type:
  710. sort_workers_by_gpu_count(selected_workers_by_gpu_type[gpu_type])
  711. return selected_workers_by_gpu_type