utils.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. import asyncio
  2. import logging
  3. import os
  4. from typing import Any, Callable, Dict, List, Optional
  5. from sqlmodel.ext.asyncio.session import AsyncSession
  6. from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
  7. from gpustack.policies.base import (
  8. Allocatable,
  9. Allocated,
  10. )
  11. from gpustack.scheduler.calculator import calculate_local_model_weight_size
  12. from gpustack.schemas.models import (
  13. ModelInstance,
  14. Model,
  15. CategoryEnum,
  16. SourceEnum,
  17. is_llm_model,
  18. )
  19. from gpustack.schemas.model_files import ModelFileStateEnum
  20. from gpustack.schemas.workers import Worker, GPUDevicesStatus, GPUDeviceStatus
  21. from pydantic import BaseModel
  22. from gpustack.server.services import ModelFileService
  23. from gpustack.utils.hub import get_model_weight_size, get_diffusion_model_weight_size
  24. logger = logging.getLogger(__name__)
class WorkerGPUInfo(BaseModel):
    """
    Data structure to represent a GPU device with its associated worker information.

    `group_worker_gpu_by_memory` builds one instance per GPU that still has a
    positive amount of allocatable VRAM and groups the instances by that amount.
    """

    # ID of the worker that hosts the GPU.
    worker_id: int
    # Name of the worker that hosts the GPU.
    worker_name: str
    # Status record of the GPU device itself.
    gpu_device: GPUDeviceStatus
    # VRAM that can still be allocated on this GPU.
    allocatable_vram: int  # in bytes
  33. def get_worker_allocatable_resource( # noqa: C901
  34. all_model_instances: List[ModelInstance],
  35. worker: Worker,
  36. gpu_type: Optional[str] = None,
  37. ) -> Allocatable:
  38. """
  39. Get the worker with the latest allocatable resources, if gpu_type is provided, only consider the GPUs of that type.
  40. """
  41. def update_allocated_vram(allocated, resource_claim):
  42. for gpu_index, vram in resource_claim.vram.items():
  43. allocated.vram[gpu_index] = allocated.vram.get(gpu_index, 0) + vram
  44. is_unified_memory = worker.status.memory.is_unified_memory
  45. model_instances = get_worker_model_instances(all_model_instances, worker)
  46. allocated = Allocated(ram=0, vram={})
  47. for model_instance in model_instances:
  48. # Handle resource allocation for main worker
  49. if model_instance.worker_id == worker.id and (
  50. gpu_type is None
  51. or model_instance.gpu_type is None
  52. or model_instance.gpu_type == gpu_type
  53. ):
  54. allocated.ram += model_instance.computed_resource_claim.ram or 0
  55. if model_instance.gpu_indexes:
  56. update_allocated_vram(allocated, model_instance.computed_resource_claim)
  57. # Handle resource allocation for subordinate workers
  58. if (
  59. model_instance.distributed_servers
  60. and model_instance.distributed_servers.subordinate_workers
  61. ):
  62. for (
  63. subordinate_worker
  64. ) in model_instance.distributed_servers.subordinate_workers:
  65. if subordinate_worker.worker_id != worker.id:
  66. continue
  67. if subordinate_worker.computed_resource_claim and (
  68. gpu_type is None or model_instance.gpu_type == gpu_type
  69. ):
  70. # rpc server only consider the vram
  71. update_allocated_vram(
  72. allocated, subordinate_worker.computed_resource_claim
  73. )
  74. allocatable = Allocatable(ram=0, vram={})
  75. if worker.status.gpu_devices:
  76. for _, gpu in enumerate(worker.status.gpu_devices):
  77. gpu_index = gpu.index
  78. if (
  79. gpu.memory is None
  80. or gpu.memory.total is None
  81. or (gpu_type is not None and gpu.type != gpu_type)
  82. ):
  83. continue
  84. allocatable_vram = max(
  85. (
  86. gpu.memory.total
  87. - allocated.vram.get(gpu_index, 0)
  88. - worker.system_reserved.vram
  89. ),
  90. 0,
  91. )
  92. allocatable.vram[gpu_index] = allocatable_vram
  93. allocatable.ram = max(
  94. (worker.status.memory.total - allocated.ram - worker.system_reserved.ram), 0
  95. )
  96. if is_unified_memory:
  97. allocatable.ram = max(
  98. allocatable.ram
  99. - worker.system_reserved.vram
  100. - sum(allocated.vram.values()),
  101. 0,
  102. )
  103. # For UMA, we need to set the gpu memory to the minimum of
  104. # the calculated with max allow gpu memory and the allocatable memory.
  105. if allocatable.vram:
  106. allocatable.vram[0] = min(allocatable.ram, allocatable.vram[0])
  107. logger.debug(
  108. f"Worker {worker.name} gpu_type {gpu_type}, "
  109. f"reserved memory: {worker.system_reserved.ram}, "
  110. f"reserved gpu memory: {worker.system_reserved.vram}, "
  111. f"allocatable memory: {allocatable.ram}, "
  112. f"allocatable gpu memory: {allocatable.vram}"
  113. )
  114. return allocatable
  115. def group_gpu_devices_by_memory(
  116. gpu_devices: GPUDevicesStatus,
  117. ) -> List[GPUDevicesStatus]:
  118. """
  119. Group GPU devices by allocatable memory size with the constraint that the minimum
  120. allocatable GPU memory in each group should not be less than 0.9 times the
  121. allocatable memory of other GPUs in the same group.
  122. Args:
  123. gpu_devices: List of GPU device information
  124. Returns:
  125. List of GPU device groups, where each group is a list of GPU devices
  126. Example:
  127. If we have GPUs with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
  128. they might be grouped as:
  129. - Group 1: [8GB, 8.5GB] (8GB >= 8.5GB * 0.9 = 7.65GB)
  130. - Group 2: [16GB, 16.5GB] (16GB >= 16.5GB * 0.9 = 14.85GB)
  131. - Group 3: [32GB]
  132. """
  133. if not gpu_devices:
  134. return []
  135. def get_allocatable_memory(gpu: GPUDeviceStatus) -> Optional[int]:
  136. """Calculate allocatable memory (total - allocated)"""
  137. if not gpu.memory or gpu.memory.total is None:
  138. return None
  139. allocated = gpu.memory.allocated or 0
  140. return gpu.memory.total - allocated
  141. # Filter out GPUs without valid memory information
  142. valid_gpus = []
  143. for gpu in gpu_devices:
  144. allocatable_memory = get_allocatable_memory(gpu)
  145. if allocatable_memory is not None and allocatable_memory > 0:
  146. valid_gpus.append(gpu)
  147. if not valid_gpus:
  148. return []
  149. # Sort GPUs by allocatable memory size (ascending order)
  150. sorted_gpus = sorted(valid_gpus, key=lambda gpu: get_allocatable_memory(gpu))
  151. groups = []
  152. current_group = []
  153. for gpu in sorted_gpus:
  154. if not current_group:
  155. # Start a new group
  156. current_group = [gpu]
  157. else:
  158. # Check if this GPU can be added to the current group
  159. min_allocatable_memory = get_allocatable_memory(
  160. current_group[0]
  161. ) # First GPU has minimum allocatable memory
  162. current_gpu_allocatable_memory = get_allocatable_memory(gpu)
  163. # Check if min_allocatable_memory >= current_gpu_allocatable_memory * 0.9
  164. # This ensures the minimum allocatable memory is not less than 0.9 times any other allocatable memory in the group
  165. if min_allocatable_memory >= current_gpu_allocatable_memory * 0.9:
  166. current_group.append(gpu)
  167. else:
  168. # Cannot add to current group, start a new group
  169. groups.append(current_group)
  170. current_group = [gpu]
  171. # Add the last group
  172. if current_group:
  173. groups.append(current_group)
  174. return groups
  175. def group_workers_by_gpu_type(workers: List[Worker]) -> Dict[str, List[Worker]]:
  176. """
  177. Group workers by their GPU types.
  178. Args:
  179. workers: List of workers containing GPU devices
  180. Returns:
  181. Dictionary mapping GPU type to list of workers that with gpus of that type
  182. """
  183. gpu_type_to_workers: Dict[str, List[Worker]] = {}
  184. for worker in workers:
  185. if not worker.status or not worker.status.gpu_devices:
  186. gpu_type_to_workers.setdefault(None, []).append(worker)
  187. continue
  188. # Collect unique GPU types for this worker
  189. gpus: Dict[str, GPUDevicesStatus] = {}
  190. for gpu in worker.status.gpu_devices:
  191. gpus.setdefault(gpu.type, []).append(gpu)
  192. # Add worker to each GPU type group
  193. for gpu_type in gpus.keys():
  194. w = worker.model_copy()
  195. w.status.gpu_devices = gpus[gpu_type]
  196. gpu_type_to_workers.setdefault(gpu_type, []).append(w)
  197. return gpu_type_to_workers
  198. def get_vram_claim_from_model_env(model: Model) -> Optional[int]:
  199. """
  200. Get the VRAM claim from model environment variable 'GPUSTACK_MODEL_VRAM_CLAIM' if set.
  201. """
  202. if model.env and 'GPUSTACK_MODEL_VRAM_CLAIM' in model.env:
  203. try:
  204. return int(model.env['GPUSTACK_MODEL_VRAM_CLAIM'])
  205. except ValueError:
  206. logger.warning(
  207. f"Invalid VRAM claim value for model {model.name}: {model.env['GPUSTACK_MODEL_VRAM_CLAIM']}"
  208. )
  209. return None
  210. async def _get_cached_model_size(
  211. session: Optional[AsyncSession],
  212. model: Model,
  213. ) -> Optional[int]:
  214. """
  215. Get cached file size from downloaded ModelFile.
  216. """
  217. if not session:
  218. return None
  219. source_index = model.model_source_index
  220. if not source_index:
  221. return None
  222. model_files = await ModelFileService(session).get_by_source_index(source_index)
  223. if not model_files:
  224. return None
  225. # Find READY files with size
  226. for mf in model_files:
  227. if mf.state == ModelFileStateEnum.READY and mf.size:
  228. logger.info(f"Using cached size {mf.size} from ModelFile {mf.id}")
  229. return mf.size
  230. return None
  231. async def estimate_model_vram(
  232. model: Model,
  233. token: Optional[str] = None,
  234. workers: Optional[List[Worker]] = None,
  235. session: Optional[AsyncSession] = None,
  236. ) -> int:
  237. """
  238. Estimate the vram requirement in bytes heuristically.
  239. This is the minimum requirement to help us decide how many GPUs are needed for the model.
  240. If users explicitly set parameters like tp & pp, this estimation is not needed.
  241. Formula for Diffusion (Image) models:
  242. VRAM = WEIGHT_SIZE
  243. Formula for LLM models:
  244. VRAM = WEIGHT_SIZE * 1.2 + RESERVED_FOOTPRINT
  245. Reference for the 20% overhead: https://blog.eleuther.ai/transformer-math/#total-inference-memory
  246. For example, using bfloat16,
  247. - 0.5B requires 3.1 GiB
  248. - 3B requires 8.9 GiB
  249. - 7B requires 19.0 GiB
  250. - 72B requires 164.5 GiB
  251. """
  252. env_vram_claim = get_vram_claim_from_model_env(model)
  253. if env_vram_claim is not None:
  254. # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
  255. return env_vram_claim
  256. weight_size = 0
  257. timeout_in_seconds = 15
  258. try:
  259. if model.categories and CategoryEnum.IMAGE in model.categories:
  260. weight_size = await asyncio.wait_for(
  261. estimate_diffusion_model_vram(model, token, workers, session),
  262. timeout=timeout_in_seconds,
  263. )
  264. return weight_size
  265. elif (
  266. model.source == SourceEnum.HUGGING_FACE
  267. or model.source == SourceEnum.MODEL_SCOPE
  268. ):
  269. weight_size = await asyncio.wait_for(
  270. asyncio.to_thread(get_model_weight_size, model, token),
  271. timeout=timeout_in_seconds,
  272. )
  273. elif model.source == SourceEnum.LOCAL_PATH:
  274. # Try to get cached size from ModelFile first
  275. cached_size = await _get_cached_model_size(session, model)
  276. if cached_size:
  277. weight_size = cached_size
  278. else:
  279. # Fall back to querying workers
  280. weight_size = await get_local_model_weight_size(
  281. model.local_path, workers, is_diffusion=False
  282. )
  283. except asyncio.TimeoutError:
  284. logger.warning(f"Timeout when getting weight size for model {model.name}")
  285. except Exception as e:
  286. logger.warning(f"Cannot get weight size for model {model.name}: {e}")
  287. # Reference: https://blog.eleuther.ai/transformer-math/#total-inference-memory
  288. activation_overhead_factor = 1.2
  289. if model.categories and CategoryEnum.TEXT_TO_SPEECH in model.categories:
  290. # Emperical factor base on Qwen3-TTS
  291. activation_overhead_factor = 3
  292. # CUDA graphs can take additional 1~3 GiB memory
  293. # https://github.com/vllm-project/vllm/blob/v0.6.1/vllm/worker/model_runner.py#L1313
  294. # For non-LLM models like embedding, set a smaller overhead
  295. framework_overhead = 2 * 1024**3 if is_llm_model(model) else 512 * 1024**2
  296. return int(weight_size * activation_overhead_factor) + framework_overhead
  297. async def estimate_diffusion_model_vram(
  298. model: Model,
  299. token: Optional[str] = None,
  300. workers: Optional[List[Worker]] = None,
  301. session: Optional[AsyncSession] = None,
  302. ) -> int:
  303. """ """
  304. if model.env and 'GPUSTACK_MODEL_VRAM_CLAIM' in model.env:
  305. # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
  306. return int(model.env['GPUSTACK_MODEL_VRAM_CLAIM'])
  307. weight_size = 0
  308. timeout_in_seconds = 15
  309. try:
  310. if (
  311. model.source == SourceEnum.HUGGING_FACE
  312. or model.source == SourceEnum.MODEL_SCOPE
  313. ):
  314. weight_size = await asyncio.wait_for(
  315. asyncio.to_thread(get_diffusion_model_weight_size, model, token),
  316. timeout=timeout_in_seconds,
  317. )
  318. elif model.source == SourceEnum.LOCAL_PATH:
  319. # Try to get cached size from ModelFile first
  320. cached_size = await _get_cached_model_size(session, model)
  321. if cached_size:
  322. weight_size = cached_size
  323. else:
  324. # Fall back to querying workers with is_diffusion=True
  325. weight_size = await get_local_model_weight_size(
  326. model.local_path, workers, is_diffusion=True
  327. )
  328. except asyncio.TimeoutError:
  329. logger.warning(f"Timeout when getting weight size for model {model.name}")
  330. except Exception as e:
  331. logger.warning(f"Cannot get weight size for model {model.name}: {e}")
  332. return weight_size
  333. def get_worker_model_instances(
  334. all_model_instances: List[ModelInstance], worker: Worker
  335. ) -> List[ModelInstance]:
  336. """
  337. Get all model instances related to the worker, including:
  338. 1. Model instances assigned to this worker (main worker)
  339. 2. Model instances that use this worker as a subordinate worker in distributed inference
  340. """
  341. # Filter to get only the relevant instances:
  342. # 1. Instances assigned to this worker (main worker)
  343. # 2. Instances that use this worker as a subordinate worker
  344. relevant_instances = []
  345. for model_instance in all_model_instances:
  346. # Check if this is a main worker instance
  347. if model_instance.worker_id == worker.id:
  348. relevant_instances.append(model_instance)
  349. # Check if this worker is used as a subordinate worker
  350. elif (
  351. model_instance.distributed_servers
  352. and model_instance.distributed_servers.subordinate_workers
  353. ):
  354. for (
  355. subordinate_worker
  356. ) in model_instance.distributed_servers.subordinate_workers:
  357. if subordinate_worker.worker_id == worker.id:
  358. relevant_instances.append(model_instance)
  359. break
  360. return relevant_instances
  361. class ListMessageBuilder:
  362. def __init__(self, messages: Optional[str | List[str]]):
  363. if not messages:
  364. self._messages = []
  365. self._messages = messages if isinstance(messages, list) else [messages]
  366. def append(self, message: str):
  367. self._messages.append(message)
  368. def extend(self, message: List[str]):
  369. self._messages.extend(message)
  370. def __str__(self) -> str:
  371. return "\n".join([f"- {line}" for line in self._messages])
  372. def get_model_num_attention_heads(pretrained_config) -> Optional[int]:
  373. """
  374. Get the number of attention heads in the model.
  375. Priority: llm_config > text_config > root-level setting > thinker_config.text_config
  376. """
  377. num_attention_heads = None
  378. try:
  379. # Helper to get num_attention_heads from config
  380. def get_heads_from(cfg, key="num_attention_heads"):
  381. value = getattr(cfg, key, None)
  382. if isinstance(value, int) and value > 0:
  383. return value
  384. return None
  385. thinker_config = getattr(pretrained_config, "thinker_config", None)
  386. thinker_text_config = (
  387. getattr(thinker_config, "text_config", None) if thinker_config else None
  388. )
  389. configs_by_priority = [
  390. getattr(pretrained_config, "llm_config", None),
  391. getattr(pretrained_config, "text_config", None),
  392. pretrained_config,
  393. thinker_text_config,
  394. ]
  395. for config in configs_by_priority:
  396. if not config:
  397. continue
  398. heads = get_heads_from(config)
  399. if heads is not None:
  400. num_attention_heads = heads
  401. break
  402. except Exception as e:
  403. logger.warning(f"Cannot get num_attention_heads: {e}")
  404. return num_attention_heads
  405. def _get_config_value(config: Any, key: str) -> Any:
  406. if config is None:
  407. return None
  408. if isinstance(config, dict):
  409. return config.get(key)
  410. return getattr(config, key, None)
  411. def _get_config_int(config: Any, key: str) -> Optional[int]:
  412. value = _get_config_value(config, key)
  413. if type(value) is int:
  414. return value
  415. return None
  416. def get_model_vision_num_attention_heads(pretrained_config: Any) -> Optional[int]:
  417. """
  418. Get total vision attention heads used for TP divisibility check.
  419. Priority for raw heads: num_attention_heads > num_heads.
  420. The final value is:
  421. total_vision_heads = raw_heads + max(num_dummy_heads, 0)
  422. """
  423. try:
  424. vision_config = _get_config_value(pretrained_config, "vision_config")
  425. if not vision_config:
  426. return None
  427. num_heads = _get_config_int(vision_config, "num_attention_heads")
  428. if num_heads is None:
  429. num_heads = _get_config_int(vision_config, "num_heads")
  430. if num_heads is None or num_heads <= 0:
  431. return None
  432. num_dummy_heads = _get_config_int(vision_config, "num_dummy_heads") or 0
  433. return num_heads + max(num_dummy_heads, 0)
  434. except Exception as e:
  435. logger.warning(f"Cannot get vision num_attention_heads: {e}")
  436. return None
  437. async def get_local_model_weight_size(
  438. local_path: str,
  439. workers: Optional[List[Worker]] = None,
  440. is_diffusion: bool = False,
  441. ) -> int:
  442. """
  443. Get the local model weight size in bytes.
  444. If the model exists locally (on the server), calculate it locally.
  445. Otherwise, if workers are provided, check if the model exists on any worker and get the size from there.
  446. Args:
  447. local_path: Path to the model directory
  448. workers: Optional list of workers to check
  449. is_diffusion: Whether this is a diffusion model (default: False)
  450. Returns:
  451. Total size in bytes
  452. """
  453. if os.path.exists(local_path):
  454. if not os.path.isdir(local_path):
  455. raise NotADirectoryError(
  456. f"The specified path '{local_path}' is not a directory."
  457. )
  458. try:
  459. # Use utility function to calculate size
  460. return calculate_local_model_weight_size(local_path, is_diffusion)
  461. except Exception as e:
  462. logger.error(f"Failed to calculate size locally for {local_path}: {e}")
  463. raise e
  464. if not workers:
  465. raise FileNotFoundError(f"The specified path '{local_path}' does not exist.")
  466. async def try_get_size_from_worker(worker: Worker) -> Optional[int]:
  467. """Try to get model weight size from a single worker."""
  468. try:
  469. async with WorkerFilesystemClient() as fs_client:
  470. size = await fs_client.get_model_weight_size(worker, local_path)
  471. if isinstance(size, int):
  472. logger.info(
  473. f"Successfully got model weight size from worker {worker.id}: {size} bytes"
  474. )
  475. return size
  476. return None
  477. except Exception as e:
  478. logger.debug(
  479. f"Failed to get model weight size from worker {worker.id}: {e}"
  480. )
  481. return None
  482. # Concurrently try all workers and return the first successful result
  483. logger.info(f"Broadcasting model weight size request to {len(workers)} workers")
  484. tasks = [try_get_size_from_worker(worker) for worker in workers]
  485. # Use as_completed to get results as they finish
  486. for completed_task in asyncio.as_completed(tasks):
  487. result = await completed_task
  488. if result is not None:
  489. return result
  490. def group_worker_gpu_by_memory(
  491. workers: List[Worker],
  492. model_instances: List[ModelInstance],
  493. ram_claim: int = 0,
  494. gpu_type: Optional[str] = None,
  495. ) -> List[List[WorkerGPUInfo]]:
  496. """
  497. Group GPU devices from multiple workers by allocatable memory size with the constraint
  498. that the minimum allocatable GPU memory in each group should not be less than 0.9 times
  499. the allocatable memory of other GPUs in the same group.
  500. Args:
  501. engine: Database engine for calculating allocatable resources
  502. workers: List of workers containing GPU devices
  503. ram_claim: RAM claim in bytes to filter out workers that do not have enough RAM
  504. Returns:
  505. List of GPU device groups, where each group is a list of WorkerGPUInfo objects
  506. containing worker information and GPU device details
  507. Example:
  508. If we have GPUs from different workers with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
  509. they might be grouped as:
  510. - Group 1: [WorkerGPUInfo(worker1, gpu1, 8GB), WorkerGPUInfo(worker2, gpu2, 8.5GB)]
  511. - Group 2: [WorkerGPUInfo(worker1, gpu3, 16GB), WorkerGPUInfo(worker3, gpu1, 16.5GB)]
  512. - Group 3: [WorkerGPUInfo(worker2, gpu4, 32GB)]
  513. """
  514. if not workers:
  515. return []
  516. # Collect all GPU devices with their worker information and allocatable VRAM
  517. worker_gpu_infos = []
  518. for worker in workers:
  519. if not worker.status or not worker.status.gpu_devices:
  520. continue
  521. # Get allocatable resources for this worker
  522. allocatable = get_worker_allocatable_resource(model_instances, worker, gpu_type)
  523. if ram_not_enough(ram_claim, allocatable):
  524. continue
  525. for gpu_device in worker.status.gpu_devices:
  526. if gpu_device.index is None:
  527. continue
  528. if not gpu_device.memory or gpu_device.memory.total == 0:
  529. continue
  530. # Get allocatable VRAM for this specific GPU
  531. gpu_index = gpu_device.index
  532. allocatable_vram = allocatable.vram.get(gpu_index, 0)
  533. # Only include GPUs with positive allocatable VRAM
  534. if allocatable_vram > 0:
  535. worker_gpu_info = WorkerGPUInfo(
  536. worker_id=worker.id,
  537. worker_name=worker.name,
  538. gpu_device=gpu_device,
  539. allocatable_vram=allocatable_vram,
  540. )
  541. worker_gpu_infos.append(worker_gpu_info)
  542. if not worker_gpu_infos:
  543. return []
  544. # Sort GPUs by allocatable VRAM size (ascending order)
  545. sorted_worker_gpu_infos = sorted(
  546. worker_gpu_infos, key=lambda info: info.allocatable_vram
  547. )
  548. groups = []
  549. current_group = []
  550. for worker_gpu_info in sorted_worker_gpu_infos:
  551. if not current_group:
  552. # Start a new group
  553. current_group = [worker_gpu_info]
  554. else:
  555. # Check if this GPU can be added to the current group
  556. min_allocatable_vram = current_group[
  557. 0
  558. ].allocatable_vram # First GPU has minimum allocatable VRAM
  559. current_allocatable_vram = worker_gpu_info.allocatable_vram
  560. # Check if min_allocatable_vram >= current_allocatable_vram * 0.9
  561. # This ensures the minimum allocatable VRAM is not less than 0.9 times any other allocatable VRAM in the group
  562. if min_allocatable_vram >= current_allocatable_vram * 0.9:
  563. current_group.append(worker_gpu_info)
  564. else:
  565. # Cannot add to current group, start a new group
  566. groups.append(current_group)
  567. current_group = [worker_gpu_info]
  568. # Add the last group
  569. if current_group:
  570. groups.append(current_group)
  571. return groups
  572. def ram_not_enough(ram_claim: int, allocatable: Allocatable) -> bool:
  573. """
  574. Check if the allocatable RAM is not enough for the claimed RAM.
  575. """
  576. if ram_claim <= 0:
  577. return False
  578. return allocatable.ram < ram_claim
  579. def get_model_ram_claim(model: Model) -> int:
  580. """
  581. Get the RAM requirement for the model in bytes.
  582. """
  583. extended_kv_cache = model.extended_kv_cache
  584. if (
  585. extended_kv_cache
  586. and extended_kv_cache.enabled
  587. and extended_kv_cache.ram_size
  588. and extended_kv_cache.ram_size > 0
  589. ):
  590. # When extended kv cache is enabled, reserve the ram for KV cache.
  591. return extended_kv_cache.ram_size * 1024**3
  592. return 0
  593. def get_computed_ram_claim(
  594. model: Model, vram_claim: Dict[int, int], static_ram: Optional[int] = None
  595. ) -> Optional[int]:
  596. """
  597. Get the computed RAM claim for the model based on the provided model and vram_claim.
  598. The priority is as follows:
  599. 1. If static_ram is provided, use it.
  600. 2. If RAM size for extended KV cache is available, use it.
  601. 3. If RAM ratio for extended KV cache is set and vram_claim is available, calculate RAM as ram_ratio * total_vram_claim.
  602. 4. If neither is available, return None.
  603. """
  604. if static_ram:
  605. return static_ram
  606. ext = model.extended_kv_cache
  607. if not ext or not ext.enabled:
  608. return None
  609. claim = None
  610. # static ram size
  611. if ext.ram_size and ext.ram_size > 0:
  612. claim = ext.ram_size * 1024**3
  613. # ram ratio to vram
  614. elif ext.ram_ratio and ext.ram_ratio > 0 and vram_claim:
  615. total_vram_claim = sum(vram_claim.values())
  616. claim = int(total_vram_claim * ext.ram_ratio)
  617. return claim
  618. def sort_workers_by_gpu_count(workers: List[Worker]):
  619. """
  620. Sort workers by the number of GPUs.
  621. """
  622. workers.sort(
  623. key=lambda worker: (
  624. len(worker.status.gpu_devices)
  625. if worker.status and worker.status.gpu_devices
  626. else 0
  627. ),
  628. reverse=True,
  629. )
  630. def sort_gpu_indexes_by_allocatable_rate(
  631. worker: Worker, allocatable: dict, gpu_type: Optional[str] = None
  632. ) -> List[int]:
  633. """
  634. Sort GPU indexes of a worker by allocatable VRAM rate (allocatable_vram / total_vram), ascending.
  635. """
  636. allocatable_rate = {
  637. gpu.index: (
  638. allocatable.get(gpu.index, 0) / gpu.memory.total
  639. if gpu.memory
  640. and gpu.memory.total
  641. and (gpu_type is None or gpu.type == gpu_type)
  642. else 0
  643. )
  644. for gpu in worker.status.gpu_devices
  645. }
  646. # return a list of gpu indexes sorted by allocatable rate in ascending order
  647. return sorted(allocatable_rate, key=lambda idx: allocatable_rate[idx], reverse=True)
  648. def sort_selected_workers_by_gpu_type_and_resource(
  649. workers: List[Worker],
  650. selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[str, List[int]]],
  651. get_worker_allocatable_resource: Callable[[Worker, Optional[str]], Allocatable],
  652. ) -> Dict[str, List[Worker]]:
  653. """
  654. Filter and sort selected workers by their GPU resource availability.
  655. This function selects workers whose names are in `selected_gpu_workers`,
  656. then sorts their GPU devices by allocatable VRAM rate (ascending),
  657. and finally sorts the workers by the number of GPUs (descending).
  658. """
  659. selected_workers_by_gpu_type = {
  660. gpu_type: [] for gpu_type in selected_gpu_indexes_by_gpu_type_and_worker.keys()
  661. }
  662. selected_gpu_types = list(selected_gpu_indexes_by_gpu_type_and_worker.keys())
  663. for gpu_type in selected_gpu_types:
  664. for worker in workers:
  665. # Skip invalid
  666. selected_gpu_indexes = selected_gpu_indexes_by_gpu_type_and_worker.get(
  667. gpu_type, {}
  668. ).get(worker.name)
  669. if (
  670. not worker.status
  671. or not worker.status.gpu_devices
  672. or not selected_gpu_indexes
  673. ):
  674. continue
  675. # Sort selected GPUs by allocatable rate
  676. allocatable = get_worker_allocatable_resource(worker, gpu_type)
  677. sorted_gpu_indexes = [
  678. idx
  679. for idx in sort_gpu_indexes_by_allocatable_rate(
  680. worker, allocatable.vram
  681. )
  682. if idx in selected_gpu_indexes
  683. ]
  684. sorted_gpus = [
  685. gpu
  686. for idx in sorted_gpu_indexes
  687. for gpu in worker.status.gpu_devices
  688. if gpu.index == idx and (gpu_type is None or gpu.type == gpu_type)
  689. ]
  690. # Create a copy of the worker with sorted GPUs
  691. w = worker.model_copy(deep=True)
  692. w.status.gpu_devices = sorted_gpus
  693. selected_workers_by_gpu_type[gpu_type].append(w)
  694. # Sort workers by GPU count
  695. for gpu_type in selected_workers_by_gpu_type:
  696. sort_workers_by_gpu_count(selected_workers_by_gpu_type[gpu_type])
  697. return selected_workers_by_gpu_type