| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- import asyncio
- import logging
- import os
- from typing import Any, Callable, Dict, List, Optional
- from sqlmodel.ext.asyncio.session import AsyncSession
- from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
- from gpustack.policies.base import (
- Allocatable,
- Allocated,
- )
- from gpustack.scheduler.calculator import calculate_local_model_weight_size
- from gpustack.schemas.models import (
- ModelInstance,
- Model,
- CategoryEnum,
- SourceEnum,
- is_llm_model,
- )
- from gpustack.schemas.model_files import ModelFileStateEnum
- from gpustack.schemas.workers import Worker, GPUDevicesStatus, GPUDeviceStatus
- from pydantic import BaseModel
- from gpustack.server.services import ModelFileService
- from gpustack.utils.hub import get_model_weight_size, get_diffusion_model_weight_size
# Module-level logger used by the helpers in this module.
logger = logging.getLogger(__name__)
class WorkerGPUInfo(BaseModel):
    """
    Data structure to represent a GPU device with its associated worker information.

    Instances are built by ``group_worker_gpu_by_memory`` so that GPUs from
    different workers can be compared and grouped by their allocatable VRAM.
    """

    # ID of the worker that hosts the GPU.
    worker_id: int
    # Name of the worker that hosts the GPU.
    worker_name: str
    # Status of the GPU device as reported for that worker.
    gpu_device: GPUDeviceStatus
    allocatable_vram: int  # in bytes; VRAM still available for new claims on this GPU
def get_worker_allocatable_resource(  # noqa: C901
    all_model_instances: List[ModelInstance],
    worker: Worker,
    gpu_type: Optional[str] = None,
) -> Allocatable:
    """
    Get the latest allocatable resources of the worker.

    The RAM and VRAM already claimed by every model instance that runs on the
    worker are summed, either as its main worker or as a subordinate (RPC)
    worker of a distributed deployment. The sums are subtracted from the
    worker's totals together with the worker's system reservation.

    Args:
        all_model_instances: All known model instances. Instances unrelated
            to ``worker`` are ignored.
        worker: The worker to compute allocatable resources for.
        gpu_type: If provided, only GPUs of this type are reported. Only
            instances whose ``gpu_type`` matches, or is not recorded, are
            counted.

    Returns:
        Allocatable RAM in bytes, and a mapping of GPU index to allocatable
        VRAM in bytes. All values are clamped at 0. On unified-memory workers,
        each GPU's VRAM is further capped by the allocatable RAM.
    """

    def matches_gpu_type(model_instance: ModelInstance) -> bool:
        # Instances without a recorded gpu_type are counted against every GPU
        # type, both on the main worker and on subordinate workers, so their
        # usage is never ignored.
        return (
            gpu_type is None
            or model_instance.gpu_type is None
            or model_instance.gpu_type == gpu_type
        )

    def update_allocated_vram(allocated, resource_claim):
        for gpu_index, vram in resource_claim.vram.items():
            allocated.vram[gpu_index] = allocated.vram.get(gpu_index, 0) + vram

    is_unified_memory = worker.status.memory.is_unified_memory
    model_instances = get_worker_model_instances(all_model_instances, worker)

    allocated = Allocated(ram=0, vram={})
    for model_instance in model_instances:
        if not matches_gpu_type(model_instance):
            continue

        # Handle resource allocation for main worker
        if model_instance.worker_id == worker.id:
            allocated.ram += model_instance.computed_resource_claim.ram or 0
            if model_instance.gpu_indexes:
                update_allocated_vram(allocated, model_instance.computed_resource_claim)

        # Handle resource allocation for subordinate workers
        distributed_servers = model_instance.distributed_servers
        if distributed_servers and distributed_servers.subordinate_workers:
            for subordinate_worker in distributed_servers.subordinate_workers:
                if (
                    subordinate_worker.worker_id == worker.id
                    and subordinate_worker.computed_resource_claim
                ):
                    # rpc server only consider the vram
                    update_allocated_vram(
                        allocated, subordinate_worker.computed_resource_claim
                    )

    allocatable = Allocatable(ram=0, vram={})
    for gpu in worker.status.gpu_devices or []:
        if (
            gpu.memory is None
            or gpu.memory.total is None
            or (gpu_type is not None and gpu.type != gpu_type)
        ):
            continue
        allocatable.vram[gpu.index] = max(
            gpu.memory.total
            - allocated.vram.get(gpu.index, 0)
            - worker.system_reserved.vram,
            0,
        )

    allocatable.ram = max(
        worker.status.memory.total - allocated.ram - worker.system_reserved.ram, 0
    )

    if is_unified_memory:
        # With unified memory the GPU allocation is also taken out of system
        # memory, so subtract the reserved and claimed VRAM from the RAM too.
        allocatable.ram = max(
            allocatable.ram
            - worker.system_reserved.vram
            - sum(allocated.vram.values()),
            0,
        )
        # No GPU can offer more than the remaining allocatable memory. Clamp
        # every reported GPU rather than assuming index 0 exists, which would
        # raise KeyError for a differently indexed GPU.
        for gpu_index in allocatable.vram:
            allocatable.vram[gpu_index] = min(
                allocatable.ram, allocatable.vram[gpu_index]
            )

    logger.debug(
        f"Worker {worker.name} gpu_type {gpu_type}, "
        f"reserved memory: {worker.system_reserved.ram}, "
        f"reserved gpu memory: {worker.system_reserved.vram}, "
        f"allocatable memory: {allocatable.ram}, "
        f"allocatable gpu memory: {allocatable.vram}"
    )

    return allocatable
def group_gpu_devices_by_memory(
    gpu_devices: GPUDevicesStatus,
) -> List[GPUDevicesStatus]:
    """
    Partition GPU devices into groups of similar allocatable memory.

    A device's allocatable memory here is ``memory.total - memory.allocated``,
    with a missing ``allocated`` treated as 0. Devices without a known total,
    or with no positive allocatable memory, are left out. The remaining
    devices are sorted ascending. A device joins the current group while the
    group's smallest member has at least 0.9 times the device's allocatable
    memory; otherwise it starts a new group.

    Args:
        gpu_devices: List of GPU device information

    Returns:
        List of GPU device groups, in ascending order of allocatable memory,
        where each group is a list of GPU devices

    Example:
        If we have GPUs with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
        they might be grouped as:
        - Group 1: [8GB, 8.5GB] (8GB >= 8.5GB * 0.9 = 7.65GB)
        - Group 2: [16GB, 16.5GB] (16GB >= 16.5GB * 0.9 = 14.85GB)
        - Group 3: [32GB]
    """
    if not gpu_devices:
        return []

    def free_memory_of(device: GPUDeviceStatus) -> Optional[int]:
        """Allocatable memory (total - allocated), or None if the total is unknown."""
        mem = device.memory
        if not mem or mem.total is None:
            return None
        return mem.total - (mem.allocated or 0)

    # None and non-positive values both collapse to "not > 0" and are dropped.
    candidates = [
        device for device in gpu_devices if (free_memory_of(device) or 0) > 0
    ]
    if not candidates:
        return []

    candidates.sort(key=free_memory_of)

    groups: List[GPUDevicesStatus] = []
    group_floor = None
    for device in candidates:
        free = free_memory_of(device)
        # The first member of the current group is its smallest one.
        if groups and group_floor >= free * 0.9:
            groups[-1].append(device)
        else:
            groups.append([device])
            group_floor = free
    return groups
def group_workers_by_gpu_type(workers: List[Worker]) -> Dict[str, List[Worker]]:
    """
    Group workers by their GPU types.

    A worker with GPUs of several types appears in several groups. Each entry
    is a deep copy of the worker whose ``status.gpu_devices`` is restricted
    to the GPUs of that group's type. Workers without status or without GPU
    devices are placed, unmodified, under the ``None`` key.

    Args:
        workers: List of workers containing GPU devices

    Returns:
        Dictionary mapping GPU type to the list of workers that have GPUs of
        that type
    """
    gpu_type_to_workers: Dict[Optional[str], List[Worker]] = {}
    for worker in workers:
        if not worker.status or not worker.status.gpu_devices:
            gpu_type_to_workers.setdefault(None, []).append(worker)
            continue

        # Collect the GPUs of this worker per GPU type
        gpus_by_type: Dict[Optional[str], GPUDevicesStatus] = {}
        for gpu in worker.status.gpu_devices:
            gpus_by_type.setdefault(gpu.type, []).append(gpu)

        # Add a per-type view of the worker to each GPU type group. A deep copy
        # is required because a shallow ``model_copy()`` shares the ``status``
        # object with the original worker. Assigning ``gpu_devices`` on it would
        # then overwrite the GPU list of the original and of every other copy.
        for gpu_type, gpus in gpus_by_type.items():
            w = worker.model_copy(deep=True)
            w.status.gpu_devices = gpus
            gpu_type_to_workers.setdefault(gpu_type, []).append(w)

    return gpu_type_to_workers
def get_vram_claim_from_model_env(model: Model) -> Optional[int]:
    """
    Read a VRAM claim override from the model's environment variables.

    Returns:
        The integer value of ``model.env['GPUSTACK_MODEL_VRAM_CLAIM']``, or
        None when the entry is absent or not a valid integer. An invalid
        value is logged as a warning.
    """
    env = model.env
    if not env or 'GPUSTACK_MODEL_VRAM_CLAIM' not in env:
        return None

    try:
        return int(env['GPUSTACK_MODEL_VRAM_CLAIM'])
    except ValueError:
        logger.warning(
            f"Invalid VRAM claim value for model {model.name}: {model.env['GPUSTACK_MODEL_VRAM_CLAIM']}"
        )
    return None
async def _get_cached_model_size(
    session: Optional[AsyncSession],
    model: Model,
) -> Optional[int]:
    """
    Look up the size of an already downloaded copy of the model.

    Returns the ``size`` of the first ModelFile for the model's source index
    that is READY and has a non-zero size. Returns None when no session is
    given, the model has no source index, or no such file exists.
    """
    source_index = model.model_source_index if session else None
    if not source_index:
        return None

    model_files = await ModelFileService(session).get_by_source_index(source_index)
    ready_file = next(
        (
            mf
            for mf in model_files or []
            if mf.state == ModelFileStateEnum.READY and mf.size
        ),
        None,
    )
    if ready_file is None:
        return None

    logger.info(f"Using cached size {ready_file.size} from ModelFile {ready_file.id}")
    return ready_file.size
async def estimate_model_vram(
    model: Model,
    token: Optional[str] = None,
    workers: Optional[List[Worker]] = None,
    session: Optional[AsyncSession] = None,
) -> int:
    """
    Estimate the vram requirement in bytes heuristically.

    This is the minimum requirement to help us decide how many GPUs are needed for the model.
    If users explicitly set parameters like tp & pp, this estimation is not needed.

    A valid ``GPUSTACK_MODEL_VRAM_CLAIM`` entry in ``model.env`` is returned
    as-is instead of estimating.

    Formula for Diffusion (Image) models:

        VRAM = WEIGHT_SIZE

    Formula for LLM models:

        VRAM = WEIGHT_SIZE * 1.2 + RESERVED_FOOTPRINT

    Reference for the 20% overhead: https://blog.eleuther.ai/transformer-math/#total-inference-memory

    Text-to-speech models use an overhead factor of 3 instead of 1.2. The
    reserved footprint is 2 GiB for LLMs and 512 MiB for other models.
    If the weight size cannot be determined (timeout, error, or no size
    returned), it is treated as 0.

    For example, using bfloat16,
    - 0.5B requires 3.1 GiB
    - 3B requires 8.9 GiB
    - 7B requires 19.0 GiB
    - 72B requires 164.5 GiB

    Args:
        model: The model to estimate.
        token: Optional token forwarded to the hub weight-size lookups.
        workers: Workers queried for the weight size of local-path models
            when the path is not available on the server.
        session: Optional DB session used to reuse the size of an already
            downloaded ModelFile.
    """
    env_vram_claim = get_vram_claim_from_model_env(model)
    if env_vram_claim is not None:
        # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
        return env_vram_claim

    weight_size = 0
    timeout_in_seconds = 15

    try:
        if model.categories and CategoryEnum.IMAGE in model.categories:
            weight_size = await asyncio.wait_for(
                estimate_diffusion_model_vram(model, token, workers, session),
                timeout=timeout_in_seconds,
            )
            # Diffusion models are estimated by their weight size alone.
            return weight_size or 0
        elif (
            model.source == SourceEnum.HUGGING_FACE
            or model.source == SourceEnum.MODEL_SCOPE
        ):
            weight_size = await asyncio.wait_for(
                asyncio.to_thread(get_model_weight_size, model, token),
                timeout=timeout_in_seconds,
            )
        elif model.source == SourceEnum.LOCAL_PATH:
            # Try to get cached size from ModelFile first
            cached_size = await _get_cached_model_size(session, model)
            if cached_size:
                weight_size = cached_size
            else:
                # Fall back to querying workers
                weight_size = await get_local_model_weight_size(
                    model.local_path, workers, is_diffusion=False
                )
    except asyncio.TimeoutError:
        logger.warning(f"Timeout when getting weight size for model {model.name}")
    except Exception as e:
        logger.warning(f"Cannot get weight size for model {model.name}: {e}")

    # Guard against a lookup that returned None instead of a size. The
    # multiplication below runs outside the try block and would raise TypeError.
    weight_size = weight_size or 0

    # Reference: https://blog.eleuther.ai/transformer-math/#total-inference-memory
    activation_overhead_factor = 1.2
    if model.categories and CategoryEnum.TEXT_TO_SPEECH in model.categories:
        # Empirical factor based on Qwen3-TTS
        activation_overhead_factor = 3

    # CUDA graphs can take additional 1~3 GiB memory
    # https://github.com/vllm-project/vllm/blob/v0.6.1/vllm/worker/model_runner.py#L1313
    # For non-LLM models like embedding, set a smaller overhead
    framework_overhead = 2 * 1024**3 if is_llm_model(model) else 512 * 1024**2

    return int(weight_size * activation_overhead_factor) + framework_overhead
async def estimate_diffusion_model_vram(
    model: Model,
    token: Optional[str] = None,
    workers: Optional[List[Worker]] = None,
    session: Optional[AsyncSession] = None,
) -> int:
    """
    Estimate the VRAM requirement of a diffusion (image) model in bytes.

    The estimate is the weight size of the model:
    - A valid ``GPUSTACK_MODEL_VRAM_CLAIM`` entry in ``model.env`` is
      returned as-is. An invalid value is logged and ignored.
    - Hugging Face / ModelScope models query the hub in a thread, with a
      15 second timeout.
    - Local-path models reuse the size of a downloaded ModelFile when
      available. Otherwise the server or the given workers are queried.

    Returns 0 when the size cannot be determined (timeout, error, or no size).

    Args:
        model: The diffusion model to estimate.
        token: Optional token forwarded to the hub weight-size lookup.
        workers: Workers queried when a local path is not available on the server.
        session: Optional DB session used to look up cached ModelFile sizes.
    """
    env_vram_claim = get_vram_claim_from_model_env(model)
    if env_vram_claim is not None:
        # Use as a potential workaround if the empirical vram estimation is far beyond the expected value.
        return env_vram_claim

    weight_size = 0
    timeout_in_seconds = 15

    try:
        if (
            model.source == SourceEnum.HUGGING_FACE
            or model.source == SourceEnum.MODEL_SCOPE
        ):
            weight_size = await asyncio.wait_for(
                asyncio.to_thread(get_diffusion_model_weight_size, model, token),
                timeout=timeout_in_seconds,
            )
        elif model.source == SourceEnum.LOCAL_PATH:
            # Try to get cached size from ModelFile first
            cached_size = await _get_cached_model_size(session, model)
            if cached_size:
                weight_size = cached_size
            else:
                # Fall back to querying workers with is_diffusion=True
                weight_size = await get_local_model_weight_size(
                    model.local_path, workers, is_diffusion=True
                )
    except asyncio.TimeoutError:
        logger.warning(f"Timeout when getting weight size for model {model.name}")
    except Exception as e:
        logger.warning(f"Cannot get weight size for model {model.name}: {e}")

    # Never hand None back to callers that do arithmetic with the result.
    return weight_size or 0
def get_worker_model_instances(
    all_model_instances: List[ModelInstance], worker: Worker
) -> List[ModelInstance]:
    """
    Select the model instances that occupy resources on ``worker``.

    An instance is selected when the worker is its main worker
    (``worker_id``), or when the worker appears among the subordinate workers
    of its distributed servers. Each instance is returned at most once, in
    the order of ``all_model_instances``.
    """

    def runs_on_worker(instance) -> bool:
        if instance.worker_id == worker.id:
            return True
        servers = instance.distributed_servers
        if not servers or not servers.subordinate_workers:
            return False
        return any(sub.worker_id == worker.id for sub in servers.subordinate_workers)

    return [instance for instance in all_model_instances if runs_on_worker(instance)]
- class ListMessageBuilder:
- def __init__(self, messages: Optional[str | List[str]]):
- if not messages:
- self._messages = []
- self._messages = messages if isinstance(messages, list) else [messages]
- def append(self, message: str):
- self._messages.append(message)
- def extend(self, message: List[str]):
- self._messages.extend(message)
- def __str__(self) -> str:
- return "\n".join([f"- {line}" for line in self._messages])
def get_model_num_attention_heads(pretrained_config) -> Optional[int]:
    """
    Return the number of attention heads declared by a model config.

    Candidate configs are checked in this priority order, and the first one
    whose ``num_attention_heads`` is a positive ``int`` wins:
    llm_config > text_config > root-level setting > thinker_config.text_config

    Returns None when no candidate defines a positive value, or when reading
    the config raises. The error is logged as a warning.
    """
    try:
        thinker = getattr(pretrained_config, "thinker_config", None)
        thinker_text = getattr(thinker, "text_config", None) if thinker else None
        candidates = (
            getattr(pretrained_config, "llm_config", None),
            getattr(pretrained_config, "text_config", None),
            pretrained_config,
            thinker_text,
        )
        for candidate in candidates:
            if not candidate:
                continue
            heads = getattr(candidate, "num_attention_heads", None)
            if isinstance(heads, int) and heads > 0:
                return heads
    except Exception as e:
        logger.warning(f"Cannot get num_attention_heads: {e}")
    return None
- def _get_config_value(config: Any, key: str) -> Any:
- if config is None:
- return None
- if isinstance(config, dict):
- return config.get(key)
- return getattr(config, key, None)
def _get_config_int(config: Any, key: str) -> Optional[int]:
    """
    Read ``key`` from a dict/object config, keeping only exact ``int`` values.

    ``bool`` values (an ``int`` subclass) and every other type yield None.
    """
    value = _get_config_value(config, key)
    # Exact type check on purpose: isinstance() would also accept True/False.
    return value if type(value) is int else None
def get_model_vision_num_attention_heads(pretrained_config: Any) -> Optional[int]:
    """
    Return the total number of vision attention heads, used for the tensor
    parallel divisibility check.

    The raw head count is read from ``vision_config.num_attention_heads``,
    falling back to ``vision_config.num_heads``. The result is

        total_vision_heads = raw_heads + max(num_dummy_heads, 0)

    Returns None when there is no vision config, when no positive integer
    head count is found, or when reading the config raises. The error is
    logged as a warning.
    """
    try:
        vision_config = _get_config_value(pretrained_config, "vision_config")
        if not vision_config:
            return None

        raw_heads = None
        for key in ("num_attention_heads", "num_heads"):
            raw_heads = _get_config_int(vision_config, key)
            if raw_heads is not None:
                break

        if raw_heads is None or raw_heads <= 0:
            return None

        dummy_heads = _get_config_int(vision_config, "num_dummy_heads") or 0
        return raw_heads + max(dummy_heads, 0)
    except Exception as e:
        logger.warning(f"Cannot get vision num_attention_heads: {e}")
        return None
async def get_local_model_weight_size(
    local_path: str,
    workers: Optional[List[Worker]] = None,
    is_diffusion: bool = False,
) -> int:
    """
    Get the local model weight size in bytes.

    If the model exists locally (on the server), calculate it locally.
    Otherwise, ask all given workers concurrently and return the first size
    any of them reports. Requests still in flight are cancelled once an
    answer arrives.

    Args:
        local_path: Path to the model directory
        workers: Optional list of workers to check
        is_diffusion: Whether this is a diffusion model (default: False)

    Returns:
        Total size in bytes

    Raises:
        NotADirectoryError: If ``local_path`` exists on the server but is not
            a directory.
        FileNotFoundError: If the path does not exist on the server and no
            worker reported a size for it, including when no workers are given.
        Exception: Errors from calculating the size on the server are logged
            and re-raised.
    """
    if os.path.exists(local_path):
        if not os.path.isdir(local_path):
            raise NotADirectoryError(
                f"The specified path '{local_path}' is not a directory."
            )
        try:
            # Use utility function to calculate size
            return calculate_local_model_weight_size(local_path, is_diffusion)
        except Exception as e:
            logger.error(f"Failed to calculate size locally for {local_path}: {e}")
            raise

    if not workers:
        raise FileNotFoundError(f"The specified path '{local_path}' does not exist.")

    async def try_get_size_from_worker(worker: Worker) -> Optional[int]:
        """Try to get model weight size from a single worker; None on any failure."""
        try:
            async with WorkerFilesystemClient() as fs_client:
                size = await fs_client.get_model_weight_size(worker, local_path)
                if isinstance(size, int):
                    logger.info(
                        f"Successfully got model weight size from worker {worker.id}: {size} bytes"
                    )
                    return size
                return None
        except Exception as e:
            logger.debug(
                f"Failed to get model weight size from worker {worker.id}: {e}"
            )
            return None

    # Concurrently try all workers and return the first successful result
    logger.info(f"Broadcasting model weight size request to {len(workers)} workers")
    tasks = [asyncio.create_task(try_get_size_from_worker(worker)) for worker in workers]
    try:
        # Use as_completed to get results as they finish
        for completed_task in asyncio.as_completed(tasks):
            result = await completed_task
            if result is not None:
                return result
    finally:
        # Stop requests that are still running once we have an answer, or
        # when this coroutine itself is cancelled.
        for task in tasks:
            if not task.done():
                task.cancel()

    # No worker reported a size: fail explicitly instead of returning None,
    # which would break callers that do arithmetic on the result.
    raise FileNotFoundError(
        f"Cannot get model weight size for '{local_path}' from any of the "
        f"{len(workers)} workers."
    )
def group_worker_gpu_by_memory(
    workers: List[Worker],
    model_instances: List[ModelInstance],
    ram_claim: int = 0,
    gpu_type: Optional[str] = None,
) -> List[List[WorkerGPUInfo]]:
    """
    Group GPU devices from multiple workers by allocatable memory size with the constraint
    that the minimum allocatable GPU memory in each group should not be less than 0.9 times
    the allocatable memory of other GPUs in the same group.

    Allocatable VRAM is computed with ``get_worker_allocatable_resource``,
    so it accounts for the existing model instances and the worker's system
    reservation. Workers that fail the RAM check, GPUs without an index or
    memory information, and GPUs with no positive allocatable VRAM are
    skipped.

    Args:
        workers: List of workers containing GPU devices
        model_instances: Model instances used to compute the resources
            already allocated on each worker
        ram_claim: RAM claim in bytes to filter out workers that do not have enough RAM
        gpu_type: If provided, only GPUs of this type are considered

    Returns:
        List of GPU device groups, in ascending order of allocatable VRAM,
        where each group is a list of WorkerGPUInfo objects containing worker
        information and GPU device details

    Example:
        If we have GPUs from different workers with allocatable memory [8GB, 8.5GB, 16GB, 16.5GB, 32GB],
        they might be grouped as:
        - Group 1: [WorkerGPUInfo(worker1, gpu1, 8GB), WorkerGPUInfo(worker2, gpu2, 8.5GB)]
        - Group 2: [WorkerGPUInfo(worker1, gpu3, 16GB), WorkerGPUInfo(worker3, gpu1, 16.5GB)]
        - Group 3: [WorkerGPUInfo(worker2, gpu4, 32GB)]
    """
    if not workers:
        return []

    # Collect all GPU devices with their worker information and allocatable VRAM
    worker_gpu_infos: List[WorkerGPUInfo] = []
    for worker in workers:
        if not worker.status or not worker.status.gpu_devices:
            continue

        # Get allocatable resources for this worker
        allocatable = get_worker_allocatable_resource(model_instances, worker, gpu_type)
        if ram_not_enough(ram_claim, allocatable):
            continue

        for gpu_device in worker.status.gpu_devices:
            if gpu_device.index is None:
                continue
            if not gpu_device.memory or gpu_device.memory.total == 0:
                continue

            allocatable_vram = allocatable.vram.get(gpu_device.index, 0)
            # Only include GPUs with positive allocatable VRAM
            if allocatable_vram > 0:
                worker_gpu_infos.append(
                    WorkerGPUInfo(
                        worker_id=worker.id,
                        worker_name=worker.name,
                        gpu_device=gpu_device,
                        allocatable_vram=allocatable_vram,
                    )
                )

    # Sort GPUs by allocatable VRAM size (ascending order, stable)
    worker_gpu_infos.sort(key=lambda info: info.allocatable_vram)

    groups: List[List[WorkerGPUInfo]] = []
    for info in worker_gpu_infos:
        # After sorting, the first member of the current group has its
        # minimum VRAM. Join while that minimum is >= 90% of this GPU's VRAM.
        if groups and groups[-1][0].allocatable_vram >= info.allocatable_vram * 0.9:
            groups[-1].append(info)
        else:
            groups.append([info])

    return groups
def ram_not_enough(ram_claim: int, allocatable: Allocatable) -> bool:
    """
    Tell whether a positive RAM claim exceeds the allocatable RAM.

    A claim of 0 or less means "no RAM requirement" and never fails the check.
    """
    return ram_claim > 0 and allocatable.ram < ram_claim
def get_model_ram_claim(model: Model) -> int:
    """
    Return the RAM, in bytes, to reserve for the model.

    RAM is reserved only when the extended KV cache is enabled and has a
    positive ``ram_size``. Otherwise the claim is 0.
    """
    kv_cache = model.extended_kv_cache
    has_ram_size = (
        kv_cache
        and kv_cache.enabled
        and kv_cache.ram_size
        and kv_cache.ram_size > 0
    )
    if not has_ram_size:
        return 0
    # ram_size is given in GiB; convert to bytes for the KV cache reservation.
    return kv_cache.ram_size * 1024**3
def get_computed_ram_claim(
    model: Model, vram_claim: Dict[int, int], static_ram: Optional[int] = None
) -> Optional[int]:
    """
    Compute the RAM claim, in bytes, for a model from its VRAM claim.

    Sources are tried in this order:
    1. A truthy ``static_ram`` is returned as-is.
    2. Otherwise, without an enabled extended KV cache, the result is None.
    3. A positive extended KV cache ``ram_size`` (GiB) is converted to bytes.
    4. A positive ``ram_ratio`` with a non-empty ``vram_claim`` gives
       ``int(ram_ratio * sum of the VRAM claims)``.
    5. Otherwise the result is None.
    """
    if static_ram:
        return static_ram

    kv_cache = model.extended_kv_cache
    if not kv_cache or not kv_cache.enabled:
        return None

    if kv_cache.ram_size and kv_cache.ram_size > 0:
        return kv_cache.ram_size * 1024**3

    if kv_cache.ram_ratio and kv_cache.ram_ratio > 0 and vram_claim:
        return int(sum(vram_claim.values()) * kv_cache.ram_ratio)

    return None
def sort_workers_by_gpu_count(workers: List[Worker]):
    """
    Sort ``workers`` in place by GPU count, most GPUs first.

    Workers without status or GPU devices count as having 0 GPUs. The sort is
    stable, so workers with equal GPU counts keep their relative order.
    """

    def gpu_count(candidate) -> int:
        status = candidate.status
        return len(status.gpu_devices) if status and status.gpu_devices else 0

    workers.sort(key=gpu_count, reverse=True)
def sort_gpu_indexes_by_allocatable_rate(
    worker: Worker, allocatable: dict, gpu_type: Optional[str] = None
) -> List[int]:
    """
    Sort GPU indexes of a worker by allocatable VRAM rate
    (allocatable_vram / total_vram) in descending order. The GPU with the
    largest free fraction comes first.

    Args:
        worker: Worker whose ``status.gpu_devices`` are ranked.
        allocatable: Mapping of GPU index to allocatable VRAM in bytes.
            Indexes that are missing count as 0.
        gpu_type: If provided, GPUs of any other type get a rate of 0.

    Returns:
        The worker's GPU indexes, from the highest to the lowest allocatable
        rate. GPUs without memory information get a rate of 0. Ties keep the
        order in which the indexes first appear in ``gpu_devices``.
    """
    allocatable_rate = {
        gpu.index: (
            allocatable.get(gpu.index, 0) / gpu.memory.total
            if gpu.memory
            and gpu.memory.total
            and (gpu_type is None or gpu.type == gpu_type)
            else 0
        )
        for gpu in worker.status.gpu_devices
    }
    # Descending order (reverse=True): highest allocatable rate first.
    return sorted(allocatable_rate, key=allocatable_rate.__getitem__, reverse=True)
def sort_selected_workers_by_gpu_type_and_resource(
    workers: List[Worker],
    selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[str, List[int]]],
    get_worker_allocatable_resource: Callable[[Worker, Optional[str]], Allocatable],
) -> Dict[str, List[Worker]]:
    """
    Filter and sort selected workers by their GPU resource availability.

    For every GPU type in ``selected_gpu_indexes_by_gpu_type_and_worker``:
    each worker with selected GPU indexes for that type (looked up by worker
    name) is deep-copied. The copy's ``status.gpu_devices`` is reduced to the
    selected GPUs of that type and ordered by allocatable VRAM rate,
    descending. The copies for each type are then sorted by GPU count, most
    GPUs first.

    Args:
        workers: Candidate workers. They are not modified.
        selected_gpu_indexes_by_gpu_type_and_worker: Mapping of
            GPU type -> worker name -> selected GPU indexes.
        get_worker_allocatable_resource: Callback returning the allocatable
            resources of a worker for a GPU type. Inside this function it
            shadows the module-level function of the same name.

    Returns:
        Mapping of GPU type to the filtered and sorted worker copies. Every
        GPU type in the input mapping is present, possibly with an empty list.
    """
    selected_workers_by_gpu_type: Dict[str, List[Worker]] = {}
    for (
        gpu_type,
        selected_by_worker,
    ) in selected_gpu_indexes_by_gpu_type_and_worker.items():
        type_workers = selected_workers_by_gpu_type.setdefault(gpu_type, [])
        for worker in workers:
            # Skip invalid
            selected_gpu_indexes = (selected_by_worker or {}).get(worker.name)
            if (
                not worker.status
                or not worker.status.gpu_devices
                or not selected_gpu_indexes
            ):
                continue

            # Rank this worker's GPUs by allocatable rate, keep the selected ones
            allocatable = get_worker_allocatable_resource(worker, gpu_type)
            sorted_gpu_indexes = [
                idx
                for idx in sort_gpu_indexes_by_allocatable_rate(
                    worker, allocatable.vram, gpu_type=gpu_type
                )
                if idx in selected_gpu_indexes
            ]
            sorted_gpus = [
                gpu
                for idx in sorted_gpu_indexes
                for gpu in worker.status.gpu_devices
                if gpu.index == idx and (gpu_type is None or gpu.type == gpu_type)
            ]

            # Deep copy so the input worker's status is left untouched
            w = worker.model_copy(deep=True)
            w.status.gpu_devices = sorted_gpus
            type_workers.append(w)

    # Sort workers by GPU count
    for type_workers in selected_workers_by_gpu_type.values():
        sort_workers_by_gpu_count(type_workers)

    return selected_workers_by_gpu_type
|