| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082 |
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, fields
- import enum
- import logging
- from typing import Any, Dict, List, Optional, Tuple
- from gpustack.config import Config
- from gpustack.policies.event_recorder.recorder import EventCollector, EventLevelEnum
- from gpustack.schemas.models import (
- BackendEnum,
- CategoryEnum,
- ComputedResourceClaim,
- Model,
- ModelInstance,
- ModelInstanceSubordinateWorker,
- is_omni_model,
- )
- from gpustack.schemas.workers import Worker
- from gpustack.utils.hub import get_hf_text_config, get_max_model_len
- from gpustack.scheduler.calculator import get_pretrained_config_with_workers
- from gpustack.utils.gpu import (
- abbreviate_worker_gpu_indexes,
- group_gpu_ids_by_worker,
- group_gpu_indexes_by_gpu_type_and_worker,
- )
- from gpustack.policies.base import Allocatable, ModelInstanceScheduleCandidate
- from gpustack.policies.utils import (
- ListMessageBuilder,
- get_computed_ram_claim,
- get_model_num_attention_heads,
- get_model_vision_num_attention_heads,
- get_worker_allocatable_resource,
- get_worker_model_instances,
- sort_gpu_indexes_by_allocatable_rate,
- sort_selected_workers_by_gpu_type_and_resource,
- )
- from gpustack.utils.unit import byte_to_gib
# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)

# Action identifiers for scheduling events. Within this module only
# EVENT_ACTION_MANUAL_MULTI is passed to an event collector; the others are
# presumably consumed by concrete selectors -- confirm against callers.
EVENT_ACTION_DEFAULT = "default_scheduling_msg"
EVENT_ACTION_MANUAL_MULTI = "manual_multi_gpu_scheduling_msg"
EVENT_ACTION_AUTO_MULTI_WORKER_MULTI_GPU = (
    "auto_multi_worker_multi_gpu_scheduling_msg"
)
EVENT_ACTION_AUTO_SINGLE_WORKER_MULTI_GPU = "auto_single_worker_multi_gpu_scheduling_msg"
EVENT_ACTION_AUTO_SINGLE_GPU = "auto_single_gpu_scheduling_msg"
@dataclass
class RequestEstimateUsage:
    """Estimated resource usage of a model request.

    Selectors in this module compare ``vram`` against byte-denominated VRAM
    claims; ``ram`` is presumably expressed in bytes as well -- confirm
    against the code that builds these estimates.
    """

    # Estimated system memory required.
    ram: int
    # Estimated GPU memory required.
    vram: int
class ModelAttentionTypeEnum(enum.Enum):
    """Attention mechanism families a model can be classified into."""

    # Could not be determined from the available hyperparameters.
    UNK = "unknown"
    # Multi-head attention.
    MHA = "multi_head_attention"
    # Grouped-query attention.
    GQA = "grouped_query_attention"
    # Multi-query attention.
    MQA = "multi_query_attention"
    # Multi-head latent attention.
    MLA = "multi_head_latent_attention"
@dataclass
class ModelParameters:
    """Model hyperparameters, populated from a pretrained config object.

    Every field has a default so an empty instance can be created first and
    filled in later by :meth:`from_model_pretrained_config`.
    """

    # Architecture names copied from the top-level config; None until parsed.
    architectures: Optional[List[str]] = None
    derived_max_seq_len: int = 0
    num_hidden_layers: int = 0
    hidden_size: Optional[int] = None
    vocab_size: Optional[int] = None
    num_attention_heads: Optional[int] = None
    # Defaults to 1, which get_attention_type() reports as MQA whenever
    # num_attention_heads is set and the config does not override this value.
    num_key_value_heads: int = 1
    n_group: Optional[int] = None
    head_dim: Optional[int] = None
    q_lora_rank: Optional[int] = None
    kv_lora_rank: Optional[int] = None
    qk_rope_head_dim: Optional[int] = None
    qk_nope_head_dim: Optional[int] = None
    v_head_dim: Optional[int] = None
    torch_dtype: str = "bfloat16"
    quantize: Optional[str] = None
    quantization_config: Optional[Dict] = None
    moe_num_experts: Optional[int] = None
    moe_num_shared_experts: Optional[int] = None
    moe_intermediate_size: Optional[int] = None
    # Set to True when the pretrained config exposes a ``vision_config``.
    is_multimodel: bool = False
    vision_config: Optional[Dict] = None
    vision_num_attention_heads: Optional[int] = None

    def from_model_pretrained_config(  # noqa: C901
        self, model: Model, pretrained_config: Any
    ):
        """
        Parse the model's (hyper)parameters from its pretrained config.

        Args:
            model: The model being parsed; its ``name`` and ``categories`` are
                used when validating the text config.
            pretrained_config: The pretrained config object to read from.

        Raises:
            ValueError: If no text config could be resolved and the model is
                categorized as an LLM.
        """
        if hasattr(pretrained_config, "vision_config"):
            self.is_multimodel = True
            self.vision_config = pretrained_config.vision_config
            self.vision_num_attention_heads = get_model_vision_num_attention_heads(
                pretrained_config
            )

        # Get architectures first, it is not available in text_config.
        if hasattr(pretrained_config, "architectures"):
            self.architectures = pretrained_config.architectures

        pretrained_config = get_hf_text_config(pretrained_config)
        if pretrained_config is None and CategoryEnum.LLM in model.categories:
            # Only None is rejected here. Empty dict cases are excluded, as
            # they indicate the locally-sourced model is not local to the
            # server node.
            raise ValueError(f"Failed to get model {model.name} pretrained config")

        # Copy every dataclass field that the config defines with a non-None
        # value. getattr() with a default never raises AttributeError, so no
        # exception handling is needed around this lookup.
        for param_field in fields(self):
            attr_value = getattr(pretrained_config, param_field.name, None)
            if attr_value is not None:
                setattr(self, param_field.name, attr_value)

        # Derived values and fallbacks.
        self.derived_max_seq_len = get_max_model_len(pretrained_config)

        if not self.num_attention_heads:
            # For backward compatibility, try to get num_attention_heads from llm_config.
            self.num_attention_heads = get_model_num_attention_heads(pretrained_config)

        if not self.head_dim and self.hidden_size and self.num_attention_heads:
            self.head_dim = self.hidden_size // self.num_attention_heads

        # The expert count is exposed under different keys; the first truthy
        # one wins.
        if not self.moe_num_experts:
            for key in ("n_routed_experts", "num_local_experts", "num_experts"):
                if value := getattr(pretrained_config, key, None):
                    self.moe_num_experts = value
                    break

        if self.moe_num_experts and not self.moe_num_shared_experts:
            for key in ("n_shared_experts", "num_shared_experts"):
                if value := getattr(pretrained_config, key, None):
                    self.moe_num_shared_experts = value
                    break

    def get_attention_type(self) -> ModelAttentionTypeEnum:
        """
        Get the attention type based on the hyperparameters.

        Returns:
            MQA when there is exactly one key/value head; GQA when the number
            of key/value heads is strictly between 1 and the number of
            attention heads and divides it evenly; MLA when both head counts
            are equal and both LoRA ranks are set; MHA when the head counts
            are equal otherwise; UNK in every other case, including a missing
            ``num_attention_heads``.
        """
        if self.num_attention_heads:
            if self.num_key_value_heads == 1:
                return ModelAttentionTypeEnum.MQA
            elif (
                1 < self.num_key_value_heads < self.num_attention_heads
                and self.num_attention_heads % self.num_key_value_heads == 0
            ):
                return ModelAttentionTypeEnum.GQA
            elif self.num_key_value_heads == self.num_attention_heads:
                if self.q_lora_rank and self.kv_lora_rank:
                    return ModelAttentionTypeEnum.MLA
                return ModelAttentionTypeEnum.MHA
        return ModelAttentionTypeEnum.UNK
class ScheduleCandidatesSelector(ABC):
    """Abstract base class for selectors that produce schedule candidates.

    Holds the state shared by concrete selectors: the model and its parsed
    hyperparameters, the manual GPU selection (if any), and per-worker caches
    for allocatable resources and VRAM totals.
    """

    # GPUStack configuration.
    _config: Config
    # The model that is being scheduled.
    _model: Model
    # Hyperparameters of the model.
    _model_params: ModelParameters
    # Model parameters that selectors use frequently.
    _num_attention_heads: int
    _vision_num_attention_heads: int
    # How many GPUs the model requires. Comes from the GPU selector (manual)
    # or from the parallelism parameters (auto).
    _gpu_count: int
    # Names of the selected workers when manual GPU selection is used.
    _selected_gpu_workers: Optional[List[str]]
    # Manually selected GPU indexes, keyed by GPU type and then worker name.
    _selected_gpu_indexes_by_gpu_type_and_worker: Dict[str, Dict[str, List[int]]]
    # Cache of total VRAM per GPU index, keyed by GPU type and then worker name.
    _vram_totals_by_gpu_type_and_worker_and_gpu_idxs: Dict[
        str, Dict[str, Dict[int, int]]
    ]
    # Cache of worker allocatable resources, keyed by GPU type and then worker id.
    _workers_allocatable_resource_by_gpu_type: Dict[str, Dict[int, Allocatable]]

    def __init__(
        self,
        config: Config,
        model: Model,
        model_instances: List[ModelInstance],
    ):
        """Store the inputs and reset all derived state and caches.

        :param config: GPUStack configuration.
        :param model: The model to be scheduled.
        :param model_instances: Model instances used when computing worker
            allocatable resources.
        """
        # Inputs.
        self._config = config
        self._model = model
        self._model_instances = model_instances

        # Model hyperparameters; empty until they are parsed.
        self._model_params = ModelParameters()
        self._num_attention_heads = 0
        self._vision_num_attention_heads = 0

        # GPU selection state.
        self._gpu_count = 0
        self._selected_gpu_workers = None
        self._selected_gpu_indexes_by_gpu_type_and_worker = {}

        # Caches.
        self._vram_totals_by_gpu_type_and_worker_and_gpu_idxs = {}
        self._workers_allocatable_resource_by_gpu_type = {}
@abstractmethod
def get_messages(self) -> List[str]:
    """
    Return the diagnostic messages collected by the selector.

    :return: A list of diagnostic messages.
    """
@abstractmethod
async def select_candidates(
    self, workers: List[Worker]
) -> List[ModelInstanceScheduleCandidate]:
    """
    Select schedule candidates from the given workers.

    :param workers: The list of workers to select from.
    :return: A list of schedule candidates.
    """
- @abstractmethod
- def _should_check_vision_tp_divisibility(self) -> bool:
- """
- Whether this backend enforces TP divisibility for vision attention heads.
- """
- pass
async def _init_model_parameters(self, workers: List[Worker]):
    """
    Load the model's pretrained config and parse its hyperparameters.

    Does nothing for omni models. On success, ``_model_params``,
    ``_num_attention_heads`` and ``_vision_num_attention_heads`` are updated.

    Args:
        workers: Workers passed through to the pretrained-config lookup.

    Raises:
        ValueError: If the config cannot be fetched or parsed. The original
            exception is chained as ``__cause__``.
    """
    if is_omni_model(self._model):
        # Current model parameters are for llm-like models.
        return

    try:
        pretrained_config = await get_pretrained_config_with_workers(
            self._model,
            workers,
            trust_remote_code=True,
        )
        self._model_params.from_model_pretrained_config(
            self._model, pretrained_config
        )
        self._num_attention_heads = self._model_params.num_attention_heads
        self._vision_num_attention_heads = (
            self._model_params.vision_num_attention_heads
        )
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error.
        raise ValueError(
            f"Failed to parse model {self._model.name} hyperparameters: {e}"
        ) from e
- def _set_gpu_count(
- self,
- world_size: Optional[int] = None,
- strategies: Optional[List[str]] = None,
- ):
- model = self._model
- if model.gpu_selector and model.gpu_selector.gpu_ids:
- gpu_indexes_by_gpu_type_and_worker = (
- group_gpu_indexes_by_gpu_type_and_worker(model.gpu_selector.gpu_ids)
- )
- gpu_ids_by_worker = group_gpu_ids_by_worker(model.gpu_selector.gpu_ids)
- self._selected_gpu_workers = list(gpu_ids_by_worker.keys())
- self._selected_gpu_indexes_by_gpu_type_and_worker = (
- gpu_indexes_by_gpu_type_and_worker
- )
- self._gpu_count = model.gpu_selector.gpus_per_replica or len(
- model.gpu_selector.gpu_ids
- )
- # When world_size is set.
- if world_size and world_size > 0:
- if self._gpu_count and self._gpu_count != world_size:
- # Both gpu selector and parallelism are set, validate they match.
- strategies_str = "/".join(strategies) if strategies else "parallelism"
- raise ValueError(
- f"Model {model.name} has {strategies_str} set, but the selected gpu count ({self._gpu_count}) does not match the world size ({world_size})."
- )
- self._gpu_count = world_size
def get_worker_allocatable_resource(
    self, worker: Worker, gpu_type: Optional[str] = None
) -> Allocatable:
    """
    Return the allocatable resource of a worker, computing it at most once
    per (gpu_type, worker id) pair.

    Args:
        worker: Worker object
        gpu_type: Optional GPU type to filter

    Returns:
        Allocatable: The allocatable resource of the worker
    """
    cache = self._workers_allocatable_resource_by_gpu_type

    cached = cache.get(gpu_type, {}).get(worker.id)
    if cached is not None:
        return cached

    # Cache miss: delegate to the module-level helper of the same name.
    computed = get_worker_allocatable_resource(
        self._model_instances, worker, gpu_type
    )

    per_worker = cache.setdefault(gpu_type, {})
    if worker.id not in per_worker:
        per_worker[worker.id] = computed
    return computed
def _get_worker_vram_totals(
    self, worker: Worker, gpu_type: Optional[str] = None
) -> Dict[int, int]:
    """
    Map each GPU index of a worker to its total VRAM, optionally restricted
    to one GPU type. Results are cached per (gpu_type, worker name).

    Args:
        worker: Worker object
        gpu_type: Optional GPU type; GPUs of other types are left out.

    Returns:
        Dict[int, int]: Mapping from GPU index to total VRAM (bytes). A GPU
        without memory information maps to 0. An empty, uncached dict is
        returned when the worker reports no status or no GPU devices.
    """
    cache = self._vram_totals_by_gpu_type_and_worker_and_gpu_idxs

    cached = cache.get(gpu_type, {}).get(worker.name)
    if cached is not None:
        return cached

    if not worker.status or not worker.status.gpu_devices:
        return {}

    totals = {}
    for gpu in worker.status.gpu_devices:
        if gpu.index is None:
            continue
        if gpu_type is not None and gpu.type != gpu_type:
            continue
        totals[gpu.index] = gpu.memory.total if gpu.memory and gpu.memory.total else 0

    cache.setdefault(gpu_type, {})[worker.name] = totals
    return totals
def _get_worker_resource_claim(
    self,
    worker: Worker,
    gpu_indexes: List[int],
    gpu_memory_utilization: float,
    request: Optional[RequestEstimateUsage] = None,
    gpu_type: Optional[str] = None,
) -> Dict[int, int]:
    """
    Given a worker and gpu indexes, get the vram claim according to gpu_memory_utilization.

    Args:
        worker: Worker whose GPUs are being claimed.
        gpu_indexes: Indexes of the GPUs to claim.
        gpu_memory_utilization: Fraction of each GPU's total VRAM to claim.
            A value of 0 or less selects the request-based split instead.
        request: Estimated usage; its ``vram`` is split evenly across
            ``gpu_indexes`` when ``gpu_memory_utilization`` is not positive.
        gpu_type: Optional GPU type; GPUs of other types are skipped.

    Returns:
        A dictionary of gpu index to vram claim in bytes. Empty when the
        worker reports no status or no GPU devices.
    """
    vram_claim: Dict[int, int] = {}

    # Same guard as _get_worker_vram_totals: a worker without a status or
    # without devices has nothing to claim.
    if not worker.status or not worker.status.gpu_devices:
        return vram_claim

    for gpu in worker.status.gpu_devices:
        if gpu.index not in gpu_indexes or (
            gpu_type is not None and gpu.type != gpu_type
        ):
            continue

        if gpu_memory_utilization > 0:
            # LLMs: claim a fraction of the GPU's total VRAM. Missing memory
            # information counts as 0, as in _get_worker_vram_totals.
            total_vram = gpu.memory.total if gpu.memory and gpu.memory.total else 0
            claim = int(total_vram * gpu_memory_utilization)
        elif request is not None:
            # non LLMs: split the requested VRAM evenly over the given indexes.
            claim = int(request.vram / len(gpu_indexes))
        else:
            claim = 0

        vram_claim[gpu.index] = claim

    return vram_claim
def _get_worker_gpu_addresses(
    self,
    worker: Worker,
    gpu_indexes: List[int],
    gpu_type: Optional[str] = None,
) -> List[str]:
    """
    Given a worker and gpu indexes, get the gpu addresses according to gpu_indexes.

    Addresses are only collected for the Ascend MindIE backend; every other
    backend gets an empty list.

    Args:
        worker: Worker that owns the GPUs.
        gpu_indexes: Indexes of the GPUs whose addresses are wanted.
        gpu_type: Optional GPU type; GPUs of other types are skipped.

    Returns:
        A list of GPU addresses, in the order of the worker's GPU devices.
        A GPU without an "up" network interface that has an inet address
        contributes the placeholder "-.-.-.-".
    """
    gpu_addresses: List[str] = []
    if self._model.backend != BackendEnum.ASCEND_MINDIE.value:
        return gpu_addresses

    for gpu in worker.status.gpu_devices:
        # Skip a GPU that was not requested OR that has the wrong type. The
        # previous `and` skipped nothing when gpu_type was None, so addresses
        # of unselected GPUs were returned too.
        if gpu.index not in gpu_indexes or (
            gpu_type is not None and gpu.type != gpu_type
        ):
            continue
        addr = (
            gpu.network.inet
            if gpu.network and gpu.network.status == 'up' and gpu.network.inet
            else "-.-.-.-"
        )
        gpu_addresses.append(addr)
    return gpu_addresses
def _generate_manual_selected_gpus_overcommit_message(  # noqa: C901
    self,
    candidate: ModelInstanceScheduleCandidate,
    workers: List[Worker],
    request: RequestEstimateUsage,
    gpu_memory_utilization: float,
) -> Tuple[bool, str, float]:
    """Check whether the candidate is overcommit and generate overcommit message.

    Args:
        candidate (ModelInstanceScheduleCandidate): The candidate to check.
        workers (List[Worker]): The list of workers.
        request (RequestEstimateUsage): The estimated RAM/VRAM usage that the
            candidate's claims are compared against.
        gpu_memory_utilization (float): The required GPU memory utilization
            ratio; 0 selects the non-LLM code path.

    Returns:
        Tuple[bool, str, float]: A tuple of (is_overcommit, message, effective_vram).
        ``(False, "", 0)`` is returned when no GPUs were manually selected or
        when the candidate is not overcommitted.
    """
    if not self._model.gpu_selector or not self._model.gpu_selector.gpu_ids:
        return (False, "", 0)

    # 1. Build worker set for easy access.
    worker_set = {w.name: w for w in workers}

    # 2. Summarize the selected GPUs: total allocatable VRAM and which GPUs
    #    meet the required utilization.
    selected_gpu_count = len(self._model.gpu_selector.gpu_ids)
    satisfied_gpus_by_worker = {}
    satisfied_gpus_count = 0
    selected_gpus_total_allocatable_vram_for_non_llms = 0

    for (
        gpu_type,
        worker_gpu_indexes,
    ) in self._selected_gpu_indexes_by_gpu_type_and_worker.items():
        for wn, g in worker_gpu_indexes.items():
            w = worker_set.get(wn)
            if w is None:
                continue

            wa = self.get_worker_allocatable_resource(w, gpu_type)
            vram_total_by_index = self._get_worker_vram_totals(w, gpu_type)
            if wa.vram is None:
                continue

            for gpu_index in g:
                total_vram = vram_total_by_index.get(gpu_index)
                allocatable_vram = wa.vram.get(gpu_index)
                selected_gpus_total_allocatable_vram_for_non_llms += (
                    allocatable_vram or 0
                )

                if gpu_memory_utilization == 0:  # non LLMs
                    satisfied_gpus_count += 1
                    satisfied_gpus_by_worker.setdefault(gpu_type, {}).setdefault(
                        wn, {}
                    )[gpu_index] = True
                else:  # LLMs
                    # A missing or zero total cannot satisfy any ratio, and
                    # would otherwise divide by zero below.
                    if not total_vram or allocatable_vram is None:
                        continue
                    allocatable_gpu_memory_utilization = (
                        allocatable_vram / total_vram
                    )
                    if allocatable_gpu_memory_utilization >= gpu_memory_utilization:
                        satisfied_gpus_count += 1
                        satisfied_gpus_by_worker.setdefault(
                            gpu_type, {}
                        ).setdefault(wn, {})[gpu_index] = True

    # 3. Summarize the GPUs the candidate actually uses (main worker plus
    #    subordinate workers).
    used_gpu_count = 0
    used_satisfied_count = 0
    used_gpus_total_allocatable_vram_for_non_llms = 0
    used_vram_claim_total = 0
    used_ram_claim_total = 0

    pairs = [
        (
            candidate.worker.name,
            candidate.gpu_type,
            candidate.gpu_indexes,
            candidate.computed_resource_claim,
        )
    ]
    if candidate.subordinate_workers:
        for sw in candidate.subordinate_workers:
            pairs.append(
                (
                    sw.worker_name,
                    sw.gpu_type,
                    sw.gpu_indexes,
                    sw.computed_resource_claim,
                )
            )

    for wn, gpu_type, indexes, claim in pairs:
        w = worker_set.get(wn)
        # Mirror the guards of step 2: an unknown worker or a worker without
        # VRAM information contributes no allocatable VRAM instead of
        # raising.
        wa = (
            self.get_worker_allocatable_resource(w, gpu_type)
            if w is not None
            else None
        )
        allocatable_vram_by_index = (wa.vram if wa is not None else None) or {}

        used_vram_claim_total += sum(claim.vram.values()) if claim.vram else 0
        used_ram_claim_total += claim.ram or 0
        used_gpu_count += len(indexes)

        satisfied_on_worker = satisfied_gpus_by_worker.get(gpu_type, {}).get(wn, {})
        for idx in indexes:
            allocatable_vram = allocatable_vram_by_index.get(idx)
            used_gpus_total_allocatable_vram_for_non_llms += allocatable_vram or 0
            if idx in satisfied_on_worker:
                used_satisfied_count += 1

    # 4. Determine if overcommit occurred.
    overcommit = (
        used_vram_claim_total < request.vram
        or (request.ram > 0 and used_ram_claim_total < request.ram)
        or used_satisfied_count < used_gpu_count
    )
    if not overcommit:
        return (False, "", 0)

    # 5. Build scheduling message.
    using_partial = used_gpu_count < selected_gpu_count
    used_devices_msg = abbreviate_worker_gpu_indexes(
        candidate.worker.name,
        candidate.gpu_indexes,
        len(candidate.subordinate_workers or []),
        (used_gpu_count - len(candidate.gpu_indexes)),
        8,
    )

    scheduling_msg = ListMessageBuilder(
        "Manual GPU selection resulted in resource overcommit."
    )
    effective_vram = 0
    if gpu_memory_utilization == 0:
        # non-LLM case
        if using_partial:
            effective_vram = used_gpus_total_allocatable_vram_for_non_llms
            scheduling_msg.extend(
                [
                    f"Using {used_devices_msg} out of {selected_gpu_count} selected devices.",
                    f"Used GPUs provide {byte_to_gib(effective_vram)} GiB allocatable VRAM.",
                ]
            )
        else:
            effective_vram = selected_gpus_total_allocatable_vram_for_non_llms
            scheduling_msg.append(
                f"Selected GPUs have {byte_to_gib(effective_vram)} GiB of VRAM."
            )
    else:
        # LLM case
        if using_partial:
            effective_vram = (
                used_gpus_total_allocatable_vram_for_non_llms
                * gpu_memory_utilization
            )
            scheduling_msg.extend(
                [
                    f"Using {used_devices_msg} out of {selected_gpu_count} selected devices.",
                    f"Used GPUs provide {byte_to_gib(used_gpus_total_allocatable_vram_for_non_llms):.2f} GiB allocatable VRAM, "
                    f"{used_satisfied_count}/{used_gpu_count} of GPUs meet the VRAM utilization ratio, "
                    f"providing {byte_to_gib(effective_vram):.2f} GiB of allocatable VRAM.",
                ]
            )
        else:
            effective_vram = (
                selected_gpus_total_allocatable_vram_for_non_llms
                * gpu_memory_utilization
            )
            scheduling_msg.extend(
                [
                    f"Selected GPUs have {byte_to_gib(selected_gpus_total_allocatable_vram_for_non_llms):.2f} GiB allocatable VRAM, "
                    f"{satisfied_gpus_count}/{selected_gpu_count} of GPUs meet the VRAM utilization ratio, "
                    f"providing {byte_to_gib(effective_vram):.2f} GiB of allocatable VRAM."
                ]
            )

    return (True, str(scheduling_msg), effective_vram)
def _get_non_overcommit_and_best_overcommit_candidates(
    self,
    candidates: List[ModelInstanceScheduleCandidate],
    workers: List[Worker],
    request: RequestEstimateUsage,
    gpu_memory_utilization: Dict[str, float],
) -> Tuple[
    List[ModelInstanceScheduleCandidate],
    Optional[ModelInstanceScheduleCandidate],
    Optional[str],
]:
    """Split candidates into those that fit and the best one that overcommits.

    Each candidate's ``overcommit`` attribute is set as a side effect.

    Args:
        candidates (List[ModelInstanceScheduleCandidate]): The list of candidates to check.
        workers (List[Worker]): The list of workers.
        request (RequestEstimateUsage): The estimated resource usage request.
        gpu_memory_utilization (Dict[str, float]): The required GPU memory utilization ratio by GPU type, key is the GPU type, * is for all types.

    Returns:
        Tuple[List[ModelInstanceScheduleCandidate], Optional[ModelInstanceScheduleCandidate], Optional[str]]:
            A tuple of (non_overcommit_candidates, best_overcommit_candidate, overcommit_message).
            The last two are None whenever at least one candidate fits.
    """
    fitting = []
    best_candidate = None
    best_message = None
    best_effective_vram = 0

    for cand in candidates:
        # Per-type ratio, falling back to the "*" entry and then to 0.
        ratio = gpu_memory_utilization.get(
            cand.gpu_type,
            gpu_memory_utilization.get("*", 0),
        )
        is_overcommit, message, effective_vram = (
            self._generate_manual_selected_gpus_overcommit_message(
                cand, workers, request, ratio
            )
        )
        cand.overcommit = is_overcommit

        if not is_overcommit:
            fitting.append(cand)
            continue

        # `>=` means a later candidate wins a tie on effective VRAM.
        if effective_vram >= best_effective_vram:
            best_effective_vram = effective_vram
            best_candidate = cand
            best_message = message

    if fitting:
        return fitting, None, None
    return fitting, best_candidate, best_message
def _find_manual_gpu_selection_candidates(  # noqa: C901
    self,
    workers: List[Worker],
    gpu_memory_utilization: Dict[str, float],
    request: RequestEstimateUsage,
    event_collector: EventCollector,
) -> List[ModelInstanceScheduleCandidate]:
    """
    Find candidates for manual GPU selection based on user-specified GPU IDs.

    Candidates that fit without overcommit are preferred. Otherwise a single
    best overcommit candidate is returned, and only for single-replica
    models.

    Args:
        workers: List of available workers.
        gpu_memory_utilization: Required GPU memory utilization ratio by GPU type, key is the GPU type, * is for all types.
        request: The estimated resource usage request.
        event_collector: Event collector that receives the scheduling events.

    Returns:
        The list of schedule candidates; empty when nothing was manually
        selected or no acceptable candidate exists.
    """

    def utilization_for(gpu_type: Optional[str]) -> float:
        # Per-type ratio, falling back to the "*" entry and then to 0.
        return gpu_memory_utilization.get(
            gpu_type,
            gpu_memory_utilization.get("*", 0),
        )

    # Skip if no manual GPU selection is specified
    if not self._selected_gpu_workers:
        return []

    # Not allow heterogeneous gpu types
    if (
        self._gpu_count == len(self._model.gpu_selector.gpu_ids)
        and len(self._selected_gpu_indexes_by_gpu_type_and_worker.keys()) > 1
    ):
        event_collector.add(
            EventLevelEnum.ERROR,
            EVENT_ACTION_MANUAL_MULTI,
            str(
                ListMessageBuilder(
                    "Deployment with heterogeneous GPU types is not supported, please select GPUs of the same type or update GPUs per replica.",
                )
            ),
        )
        return []

    logger.debug(
        f"Manual GPU selection: workers={self._selected_gpu_workers}, "
        f"worker_count={len(self._selected_gpu_workers)}, "
        f"gpu_indexes_by_worker_and_type={self._selected_gpu_indexes_by_gpu_type_and_worker}"
    )

    candidates = []

    # Filter and sort selected workers by resource.
    # NOTE(review): the loops below iterate worker.status.gpu_devices, so the
    # helper presumably narrows each returned worker's devices to the
    # selected GPUs of that type -- confirm against its implementation.
    selected_workers_by_gpu_type = sort_selected_workers_by_gpu_type_and_resource(
        workers,
        self._selected_gpu_indexes_by_gpu_type_and_worker,
        self.get_worker_allocatable_resource,
    )

    # Handle single-worker single gpu scenarios
    if self._gpu_count == 1:
        for gpu_type, workers_of_type in selected_workers_by_gpu_type.items():
            gpu_memory_utilization_for_type = utilization_for(gpu_type)
            for worker in workers_of_type:
                for gpu in worker.status.gpu_devices:
                    candidates.extend(
                        self._manual_select_single_worker_multi_gpu_candidates(
                            worker,
                            [gpu.index],
                            gpu_memory_utilization_for_type,
                            request,
                            gpu_type,
                        )
                    )
    # Handle single-worker multi-GPU and multi-worker scenarios
    elif self._gpu_count > 1:
        # Single-worker multi-GPU selection
        for gpu_type, workers_of_type in selected_workers_by_gpu_type.items():
            gpu_memory_utilization_for_type = utilization_for(gpu_type)
            for worker in workers_of_type:
                selected_gpu_indexes = [
                    gpu.index for gpu in worker.status.gpu_devices
                ]
                candidates.extend(
                    self._manual_select_single_worker_multi_gpu_candidates(
                        worker,
                        selected_gpu_indexes,
                        gpu_memory_utilization_for_type,
                        request,
                        gpu_type,
                    )
                )

        # Multi-worker multi-GPU selection, only when no single worker fits.
        if not candidates:
            for gpu_type, selected_workers in selected_workers_by_gpu_type.items():
                candidates.extend(
                    self._manual_select_multi_worker_multi_gpu_candidates(
                        selected_workers,
                        utilization_for(gpu_type),
                        request,
                        gpu_type,
                    )
                )

    if not candidates:
        return []

    # Separate non-overcommit and overcommit candidates
    non_overcommits_candidates, best_overcommit_candidate, overcommit_msg = (
        self._get_non_overcommit_and_best_overcommit_candidates(
            candidates, workers, request, gpu_memory_utilization
        )
    )

    # Return non-overcommit candidates if any
    if non_overcommits_candidates:
        return non_overcommits_candidates

    # Handle overcommit candidates
    if self._model.replicas > 1:
        event_collector.add(
            EventLevelEnum.INFO,
            EVENT_ACTION_MANUAL_MULTI,
            str(
                ListMessageBuilder(
                    f"Found {len(candidates) - len(non_overcommits_candidates)} candidate, manual scheduling for multi-replica model instances does not allow overcommit or heterogeneous deployment topologies.",
                )
            ),
        )
        return []

    if best_overcommit_candidate:
        event_collector.add(
            EventLevelEnum.INFO, EVENT_ACTION_MANUAL_MULTI, overcommit_msg
        )
        return [best_overcommit_candidate]

    return []
def _manual_select_single_worker_multi_gpu_candidates(
    self,
    worker: Worker,
    gpu_indexes: List[int],
    gpu_memory_utilization: float,
    request: RequestEstimateUsage,
    gpu_type: Optional[str] = None,
) -> List[ModelInstanceScheduleCandidate]:
    """Build the manual-selection candidate for GPUs on a single worker.

    GPUs that meet the required utilization are taken first. If there are
    too few of them, the remainder is filled with GPUs that do not meet it,
    so the caller can detect the overcommit.

    Returns an empty list when fewer GPUs are offered than the model needs,
    otherwise a list holding exactly one candidate.
    """
    needed = self._gpu_count

    # Not enough GPUs offered on this worker.
    if len(gpu_indexes) < needed:
        return []

    allocatable = self.get_worker_allocatable_resource(worker, gpu_type)
    totals_by_index = self._get_worker_vram_totals(worker, gpu_type)

    # Partition the offered GPUs by whether they meet the requirement.
    meeting = []
    not_meeting = []
    for index in gpu_indexes:
        free_vram = allocatable.vram.get(index, 0)
        total_vram = totals_by_index.get(index, 0)

        if gpu_memory_utilization > 0:
            # LLMs: the free share of the GPU must reach the required ratio.
            if total_vram > 0 and free_vram / total_vram >= gpu_memory_utilization:
                meeting.append(index)
            else:
                not_meeting.append(index)
        else:
            # non LLMs: every GPU qualifies.
            meeting.append(index)

        if len(meeting) >= needed:
            break

    # Top up with GPUs that miss the requirement when there are too few.
    chosen = list(meeting)
    chosen.extend(not_meeting[: needed - len(meeting)])

    # VRAM claims for the chosen GPUs.
    vram_claims = self._get_worker_resource_claim(
        worker, chosen, gpu_memory_utilization, request, gpu_type
    )
    resource_claim = ComputedResourceClaim(
        vram=vram_claims,
        ram=get_computed_ram_claim(self._model, vram_claims, request.ram),
        vram_utilization=gpu_memory_utilization,
    )
    candidate = ModelInstanceScheduleCandidate(
        worker=worker,
        gpu_type=gpu_type,
        gpu_indexes=chosen,
        gpu_addresses=self._get_worker_gpu_addresses(worker, chosen, gpu_type),
        computed_resource_claim=resource_claim,
    )
    return [candidate]
def _manual_select_multi_worker_multi_gpu_candidates(
    self,
    workers: List[Worker],
    gpu_memory_utilization: float,
    request: RequestEstimateUsage,
    gpu_type: Optional[str] = None,
) -> List[ModelInstanceScheduleCandidate]:
    """Manual select multi worker multi GPU candidates.

    The first worker becomes the main worker and keeps all of its selected
    GPUs. The remaining selected workers are added as subordinate workers
    until the required GPU count is reached.

    Args:
        workers: Selected workers of one GPU type; the first one is used as
            the main worker.
        gpu_memory_utilization: Required GPU memory utilization ratio.
        request: The estimated resource usage request.
        gpu_type: GPU type the workers were selected for.

    Returns:
        A list with one candidate, or an empty list when fewer than two
        workers are given or no subordinate worker could be used.
    """
    if len(workers) < 2:
        return []

    selected_indexes_by_worker = (
        self._selected_gpu_indexes_by_gpu_type_and_worker.get(gpu_type, {})
    )

    # Main worker is the first one (with most GPUs)
    main_worker = workers[0]
    main_worker_name = main_worker.name
    main_gpu_indexes = selected_indexes_by_worker.get(main_worker_name, [])
    main_vram_claim = self._get_worker_resource_claim(
        main_worker, main_gpu_indexes, gpu_memory_utilization, request, gpu_type
    )

    # Handle subordinate workers
    subordinate_workers: List[ModelInstanceSubordinateWorker] = []
    assigned_gpu_count = len(main_gpu_indexes)
    for worker in workers:
        gpu_indexes = selected_indexes_by_worker.get(worker.name)
        # Skip if the worker is not selected
        if gpu_indexes is None:
            continue
        if worker.name == main_worker_name:
            continue
        if not self._validate_distributed_limit_per_worker(worker):
            continue

        # Sort GPUs by allocatable rate
        vram_allocatable = self._get_worker_resource_claim(
            worker, gpu_indexes, gpu_memory_utilization, request, gpu_type
        )
        sorted_gpu_indexes = sort_gpu_indexes_by_allocatable_rate(
            worker, vram_allocatable, gpu_type
        )

        # Calculate how many GPUs can be assigned to this subordinate worker
        remaining_gpu_count = max(self._gpu_count - assigned_gpu_count, 0)
        assign_count = min(len(gpu_indexes), remaining_gpu_count)

        # Assign GPUs to the subordinate worker
        sw_gpu_indexes = sorted_gpu_indexes[:assign_count]
        vram_claim = self._get_worker_resource_claim(
            worker, sw_gpu_indexes, gpu_memory_utilization, request, gpu_type
        )
        subordinate_workers.append(
            ModelInstanceSubordinateWorker(
                worker_id=worker.id,
                worker_name=worker.name,
                worker_ip=worker.ip,
                worker_ifname=worker.ifname,
                total_gpus=len(worker.status.gpu_devices),
                gpu_type=gpu_type,
                gpu_indexes=sw_gpu_indexes,
                # Pass gpu_type so the address lookup uses the same filter
                # as the VRAM claim above.
                gpu_addresses=self._get_worker_gpu_addresses(
                    worker, sw_gpu_indexes, gpu_type
                ),
                computed_resource_claim=ComputedResourceClaim(
                    vram=vram_claim,
                    ram=get_computed_ram_claim(
                        self._model, vram_claim, request.ram
                    ),
                    vram_utilization=gpu_memory_utilization,
                ),
            )
        )

        assigned_gpu_count += len(sw_gpu_indexes)
        if self._gpu_count and assigned_gpu_count >= self._gpu_count:
            break

    if not subordinate_workers:
        return []

    return [
        ModelInstanceScheduleCandidate(
            worker=main_worker,
            gpu_type=gpu_type,
            gpu_indexes=main_gpu_indexes,
            gpu_addresses=self._get_worker_gpu_addresses(
                main_worker, main_gpu_indexes, gpu_type
            ),
            computed_resource_claim=ComputedResourceClaim(
                vram=main_vram_claim,
                ram=get_computed_ram_claim(
                    self._model, main_vram_claim, request.ram
                ),
                # Recorded for the main worker too, matching the subordinate
                # claims and the single-worker candidates.
                vram_utilization=gpu_memory_utilization,
            ),
            subordinate_workers=subordinate_workers,
        )
    ]
def _validate_distributed_limit_per_worker(self, worker: Worker) -> bool:
    """
    Check that the worker does not already host a distributed instance of a
    model that uses the same backend as the model being scheduled.

    On a conflict, ``self._messages`` is replaced with a single explanatory
    message and False is returned; otherwise True is returned.
    """
    for existing in get_worker_model_instances(self._model_instances, worker):
        servers = existing.distributed_servers
        if not (servers and servers.subordinate_workers):
            # Not a distributed instance.
            continue

        existing_model = existing.model
        if not (
            existing_model
            and existing_model.backend
            and existing_model.backend == self._model.backend
        ):
            # Different (or unknown) backend, so there is no conflict.
            continue

        conflict = ListMessageBuilder(
            f"Each worker can run only one distributed vLLM instance. Worker '{worker.name}' already has '{existing.name}'."
        )
        self._messages = [str(conflict)]
        return False

    return True
- def _is_tp_size_divisible(self, tp_size: int) -> bool:
- """
- Check whether InferenceBackend's constraint of parameter divisibility is satisfied.
- 1. num_attention_heads
- 2. vision_num_attention_heads
- 3. vocab_size
- Notes on `tp_size` (tensor parallel size) usage in auto scheduling:
- - Single-worker multi-GPU: `tp_size` is the number of GPUs currently selected
- in the traversal (i.e., `gpu_sum`). The scheduler grows the candidate set
- incrementally and validates divisibility at each step.
- - Multi-worker multi-GPU: selected workers are constrained to have the same
- number of GPUs, so `tp_size` equals the per-worker GPU count (i.e., `gpu_count`)
- """
- if not tp_size:
- return False
- if self._num_attention_heads and self._num_attention_heads % tp_size != 0:
- return False
- if (
- self._should_check_vision_tp_divisibility()
- and self._vision_num_attention_heads
- and self._vision_num_attention_heads % tp_size != 0
- ):
- return False
- if (
- self._model_params.vocab_size
- and self._model_params.vocab_size % tp_size != 0
- ):
- return False
- return True
- def _check_tp_size_divisibility(
- self,
- tp_size: int,
- ) -> Optional[str]:
- """
- Check whether InferenceBackend's constraint of parameter divisibility is satisfied.
- 1. num_attention_heads
- 2. vision_num_attention_heads
- 3. vocab_size
- Return:
- None if divisibility is satisfied, otherwise an error message.
- """
- if not tp_size:
- return None
- if self._num_attention_heads and self._num_attention_heads % tp_size != 0:
- return (
- f"Total number of attention heads ({self._num_attention_heads})"
- " must be divisible by tensor parallel size "
- f"({tp_size})."
- )
- if (
- self._should_check_vision_tp_divisibility()
- and self._vision_num_attention_heads
- and self._vision_num_attention_heads % tp_size != 0
- ):
- return (
- f"Total number of vision attention heads ({self._vision_num_attention_heads})"
- " must be divisible by tensor parallel size "
- f"({tp_size})."
- )
- if (
- self._model_params.vocab_size
- and self._model_params.vocab_size % tp_size != 0
- ):
- return (
- f"Vocabulary size ({self._model_params.vocab_size})"
- " must be divisible by tensor parallel size "
- f"({tp_size})."
- )
- return None
|