| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617 |
- import asyncio
- import hashlib
- import json
- import logging
- import os
- from collections import defaultdict
- from typing import List, Tuple, Optional, Dict
- from gpustack_runtime.detector import ManufacturerEnum
- from sqlmodel.ext.asyncio.session import AsyncSession
- from cachetools import TTLCache
- from aiolimiter import AsyncLimiter
- from gpustack.api.exceptions import HTTPException
- from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
- from gpustack.config.config import Config
- from gpustack.policies.base import ModelInstanceScheduleCandidate
- from gpustack import envs
- from gpustack.routes.models import validate_model_in
- from gpustack.scheduler import scheduler
- from gpustack.server.catalog import model_set_specs_by_key
- from gpustack.schemas.model_evaluations import (
- ModelEvaluationResult,
- ModelSpec,
- ResourceClaim,
- )
- from gpustack.schemas.models import (
- ModelInstance,
- BackendEnum,
- SourceEnum,
- get_backend,
- is_gguf_model,
- is_audio_model,
- )
- from gpustack.schemas.workers import Worker, WorkerStateEnum
- from gpustack.schemas.inference_backend import (
- is_built_in_backend_custom_version,
- is_custom_backend,
- )
- from gpustack.server.worker_selector import WorkerSelector
- from gpustack_runner import list_backend_runners
- from gpustack_runtime.deployer.__utils__ import compare_versions
- from gpustack.utils.gpu import (
- all_gpu_match,
- any_gpu_match,
- find_one_gpu,
- compare_compute_capability,
- )
- from gpustack.utils.hub import (
- auth_check,
- get_hugging_face_model_min_gguf_path,
- get_model_scope_model_min_gguf_path,
- is_repo_cached,
- )
- from gpustack.utils.task import run_in_thread
- from gpustack.utils.profiling import time_decorator
logger = logging.getLogger(__name__)

# Cache of successful evaluation results, keyed by make_hashable_key().
# Capacity and time-to-live are configured through envs.
evaluate_cache = TTLCache(
    maxsize=envs.MODEL_EVALUATION_CACHE_MAX_SIZE,
    ttl=envs.MODEL_EVALUATION_CACHE_TTL,
)

# Throttle uncached evaluations to make it less likely to hit the Hugging Face
# API rate limit (600 requests per minute). AsyncLimiter is a leaky-bucket
# rate limiter: it admits about 50 acquisitions per 10-second period
# (~300 per minute). It bounds how fast evaluations start, not how many
# run at the same time.
evaluate_model_limiter = AsyncLimiter(max_rate=50, time_period=10)
@time_decorator
async def evaluate_models(
    config: Config,
    session: AsyncSession,
    model_specs: List[ModelSpec],
    cluster_id: Optional[int] = None,
) -> List[ModelEvaluationResult]:
    """
    Evaluate the compatibility of a list of model specs with the available workers.

    Workers whose ``deleted_at`` is None and whose state is not PROVISIONING,
    DELETING or ERROR are loaded once (restricted to ``cluster_id`` when it is
    given). Model instances are loaded with the same field filters. Every spec
    is then evaluated against those workers/instances concurrently.

    Returns:
        One ModelEvaluationResult per entry of ``model_specs``, in the same order.
    """
    query_fields = {"deleted_at": None}
    if cluster_id is not None:
        query_fields["cluster_id"] = cluster_id

    excluded_states = [
        WorkerStateEnum.PROVISIONING,
        WorkerStateEnum.DELETING,
        WorkerStateEnum.ERROR,
    ]
    workers = await Worker.all_by_fields(
        session,
        fields=query_fields,
        extra_conditions=[~(Worker.state.in_(excluded_states))],
    )
    model_instances = await ModelInstance.all_by_fields(session, fields=query_fields)

    # Worker ordering only matters for a single-model evaluation; batch
    # evaluations skip the extra prioritization pass.
    if len(model_specs) == 1:
        workers = await scheduler.prioritize_workers_with_model_files(
            session, model_specs[0], workers
        )

    # NOTE(review): every evaluation below receives the same AsyncSession while
    # running under asyncio.gather; SQLAlchemy recommends one AsyncSession per
    # concurrent task — confirm the callees' DB access is safe here.
    return await asyncio.gather(
        *(
            evaluate_model_with_cache(
                config,
                session,
                spec,
                workers,
                model_instances,
                cluster_id=cluster_id,
            )
            for spec in model_specs
        )
    )
def make_hashable_key(model: ModelSpec, workers: List[Worker]) -> str:
    """
    Build the evaluation-cache key for ``model`` evaluated against ``workers``.

    The model and each worker are dumped in JSON mode and serialized with
    sorted keys, so equal inputs always map to the same key. Worker timestamps
    (heartbeat/created/updated), the cpu/swap/filesystem/os/kernel/uptime
    status sections, memory usage figures and per-GPU temperature/utilization/
    memory usage are excluded. Changes to those fields therefore do not change
    the key.

    Returns:
        The hex MD5 digest of the serialized payload (used only as a cache key).
    """
    excluded_worker_fields = {
        "status": {
            "cpu": True,
            "swap": True,
            "filesystem": True,
            "os": True,
            "kernel": True,
            "uptime": True,
            "memory": {"utilization_rate", "used"},
            "gpu_devices": {
                "__all__": {
                    "temperature": True,
                    "core": {"utilization_rate"},
                    "memory": {"utilization_rate", "used"},
                },
            },
        },
        "heartbeat_time": True,
        "created_at": True,
        "updated_at": True,
    }

    payload = {
        "model": model.model_dump(mode="json"),
        "workers": [
            worker.model_dump(mode="json", exclude=excluded_worker_fields)
            for worker in workers
        ],
    }
    serialized = json.dumps(payload, sort_keys=True)
    return hashlib.md5(serialized.encode()).hexdigest()
async def evaluate_model_with_cache(
    config: Config,
    session: AsyncSession,
    model: ModelSpec,
    workers: List[Worker],
    model_instances: List[ModelInstance],
    cluster_id: Optional[int] = None,
) -> ModelEvaluationResult:
    """
    Evaluate ``model`` against ``workers``, reusing a cached result when available.

    The cache key comes from make_hashable_key(model, workers). On a miss the
    evaluation runs under ``evaluate_model_limiter``, and a successful result is
    stored in ``evaluate_cache``. Any exception is logged and turned into an
    incompatible result with ``error=True``; such error results are not cached.

    NOTE(review): ``model_instances`` and ``cluster_id`` influence evaluate_model
    but are not part of the cache key, so a cached result may not reflect changes
    to them until the entry expires — confirm this staleness is intended.
    """
    cache_key = make_hashable_key(model, workers)

    # Single EAFP lookup: with a TTL cache, an entry can expire between a
    # ``key in cache`` check and the following ``cache[key]`` read. That read
    # would then raise KeyError outside the error handling below.
    try:
        cached = evaluate_cache[cache_key]
    except KeyError:
        cached = None
    if cached is not None:
        logger.trace(
            f"Evaluation cache hit for model: {model.name or model.readable_source}"
        )
        return cached

    try:
        async with evaluate_model_limiter:
            result = await evaluate_model(
                config, session, model, workers, model_instances, cluster_id=cluster_id
            )
        evaluate_cache[cache_key] = result
    except Exception as e:
        logger.exception(
            f"Error evaluating model {model.name or model.readable_source}: {e}"
        )
        result = ModelEvaluationResult(
            compatible=False, error=True, error_message=str(e)
        )
    return result
@time_decorator
async def evaluate_model(
    config: Config,
    session: AsyncSession,
    model: ModelSpec,
    workers: List[Worker],
    model_instances: List[ModelInstance],
    cluster_id: Optional[int] = None,
) -> ModelEvaluationResult:
    """
    Evaluate whether ``model`` can be deployed on ``workers``.

    Steps:
      1. Apply catalog defaults via set_default_spec(). If that reports a change,
         a copy of the updated spec is returned as ``default_spec``.
      2. Resolve a GGUF file path for the model when needed.
      3. Run the input, metadata, environment and runtime-version checks in
         that order. The first failing check marks the result incompatible and
         its messages are returned immediately.
      4. Look for a schedule candidate separately in each cluster that has
         workers. Clusters without a candidate, or whose candidate would
         overcommit, contribute their scheduling messages and are skipped.
         Every other cluster's summarized resource claim is recorded in
         ``resource_claim_by_cluster_id``, and ``resource_claim`` is the
         first recorded one.

    If no cluster yields a usable candidate, the result is marked incompatible.
    """
    result = ModelEvaluationResult()

    if set_default_spec(model):
        result.default_spec = model.model_copy()

    await set_gguf_model_file_path(config, model)

    evaluations = [
        (evaluate_model_input, (session, model, cluster_id)),
        (evaluate_model_metadata, (config, model, workers)),
        (evaluate_environment, (model, workers)),
        (evaluate_runtime_version, (model, workers)),
    ]
    for evaluation, args in evaluations:
        compatible, messages = await evaluation(*args)
        if not compatible:
            result.compatible = False
            result.compatibility_messages = messages
            return result

    workers_by_cluster: Dict[int, List[Worker]] = defaultdict(list)
    for worker in workers:
        workers_by_cluster[worker.cluster_id].append(worker)

    result.resource_claim_by_cluster_id = {}
    # A distinct loop variable keeps the ``cluster_id`` argument from being shadowed.
    for worker_cluster_id, cluster_workers in workers_by_cluster.items():
        cluster_model_instances = [
            inst for inst in model_instances if inst.cluster_id == worker_cluster_id
        ]
        candidate, schedule_messages = await scheduler.find_candidate(
            config, model, cluster_workers, cluster_model_instances
        )
        if not candidate or candidate.overcommit:
            # No candidate, or only one that would overcommit resources:
            # report why and move on to the next cluster.
            result.scheduling_messages.extend(schedule_messages)
            continue
        result.resource_claim_by_cluster_id[worker_cluster_id] = (
            summarize_candidate_resource_claim(candidate)
        )

    if result.resource_claim_by_cluster_id:
        result.resource_claim = next(iter(result.resource_claim_by_cluster_id.values()))
    else:
        result.resource_claim = None
        result.compatible = False
        result.compatibility_messages.append(
            "Unable to find a schedulable worker for the model."
        )
    return result
def summarize_candidate_resource_claim(
    candidate: ModelInstanceScheduleCandidate,
) -> ResourceClaim:
    """
    Summarize the computed resource claim for a schedule candidate.

    RAM and VRAM are summed over the candidate's own computed claim and the
    computed claims of its subordinate workers. Missing claims, missing RAM
    values and ``None`` VRAM entries count as 0.

    Returns:
        A ResourceClaim holding the total ``ram`` and ``vram``.
    """
    claims = [candidate.computed_resource_claim]
    if candidate.subordinate_workers:
        claims.extend(sw.computed_resource_claim for sw in candidate.subordinate_workers)

    ram, vram = 0, 0
    for claim in claims:
        # Treat the primary claim like subordinate ones: a missing claim
        # contributes nothing instead of raising AttributeError.
        if claim is None:
            continue
        ram += claim.ram or 0
        if claim.vram:
            vram += sum(v for v in claim.vram.values() if v is not None)
    return ResourceClaim(ram=ram, vram=vram)
async def set_gguf_model_file_path(config: Config, model: ModelSpec):
    """
    Pick a GGUF file for a GGUF repo that was given without one.

    For a Hugging Face (or ModelScope) model whose repo/model id contains
    "gguf" (case-insensitive) and that has no file set, the path returned by
    get_hugging_face_model_min_gguf_path (or
    get_model_scope_model_min_gguf_path) is stored on ``model`` in place. The
    lookup goes through run_in_thread with timeout=15. All other models are
    left unchanged.
    """
    if model.source == SourceEnum.HUGGING_FACE:
        # The id may be unset on an invalid spec; input validation runs after
        # this step, so do not fail here with an AttributeError.
        repo_id = model.huggingface_repo_id or ""
        if "gguf" in repo_id.lower() and not model.huggingface_filename:
            model.huggingface_filename = await run_in_thread(
                get_hugging_face_model_min_gguf_path,
                timeout=15,
                model_id=model.huggingface_repo_id,
                token=config.huggingface_token,
            )
    elif model.source == SourceEnum.MODEL_SCOPE:
        model_id = model.model_scope_model_id or ""
        if "gguf" in model_id.lower() and not model.model_scope_file_path:
            model.model_scope_file_path = await run_in_thread(
                get_model_scope_model_min_gguf_path,
                timeout=15,
                model_id=model.model_scope_model_id,
            )
async def evaluate_environment(
    model: ModelSpec,
    workers: List[Worker],
) -> Tuple[bool, List[str]]:
    """
    Check backend-specific hardware requirements against ``workers``.

    - Ascend MindIE needs at least one GPU whose vendor is Ascend.
    - SGLang, when every GPU is NVIDIA, needs at least one GPU with compute
      capability 8.0 or higher.

    Returns:
        ``(True, [])`` when the requirements are met, otherwise
        ``(False, [message])``.
    """
    backend = get_backend(model)

    if backend == BackendEnum.ASCEND_MINDIE:
        has_ascend = any_gpu_match(
            workers, lambda gpu: gpu.vendor == ManufacturerEnum.ASCEND.value
        )
        if not has_ascend:
            return False, [
                "The Ascend MindIE backend requires Ascend NPUs but none are available."
            ]

    if backend != BackendEnum.SGLANG:
        return True, []

    all_nvidia = all_gpu_match(
        workers, lambda gpu: gpu.vendor == ManufacturerEnum.NVIDIA.value
    )
    if not all_nvidia:
        return True, []

    has_sm80_or_newer = any_gpu_match(
        workers,
        lambda gpu: compare_compute_capability(gpu.compute_capability, "8.0") >= 0,
    )
    if has_sm80_or_newer:
        return True, []

    # Ref: https://github.com/sgl-project/sglang/issues/6006
    sample_gpu = find_one_gpu(workers)
    gpu_detail = (
        f"Available GPU: {sample_gpu.name} (compute capability: {sample_gpu.compute_capability})"
        if sample_gpu
        else ""
    )
    return False, [
        "The SGLang backend requires NVIDIA GPUs with compute capability 8.0 or higher "
        "(e.g., A100/SM80, H100/SM90, RTX 3090/SM86). " + gpu_detail
    ]
async def evaluate_runtime_version(
    model: ModelSpec, workers: List[Worker]
) -> Tuple[bool, List[str]]:
    """
    Check that the highest GPU runtime version found on ``workers`` satisfies
    the backend runner's minimum requirement.

    Custom backends and custom versions of built-in backends are not checked.
    Each worker is represented by its first GPU that reports both a type and a
    runtime version. The highest of those versions (first one wins on ties) is
    passed to _check_runtime_version together with its GPU type. If no worker
    reports a runtime version, the check passes.

    Args:
        model: Model specification to evaluate.
        workers: Workers whose GPUs are inspected.

    Returns:
        ``(True, [])`` when the runtime version is sufficient, otherwise
        ``(False, [upgrade message])``.
    """
    backend_name = get_backend(model)
    if is_custom_backend(backend_name) or is_built_in_backend_custom_version(
        backend_name, model.backend_version, model.image_name
    ):
        return True, []

    best_version: Optional[str] = None
    best_type: Optional[str] = None
    for worker in workers:
        devices = worker.status.gpu_devices if worker.status else None
        if not devices:
            continue
        first_gpu = next(
            (device for device in devices if device.type and device.runtime_version),
            None,
        )
        if not first_gpu:
            continue
        if (
            best_version is None
            or compare_versions(first_gpu.runtime_version, best_version) > 0
        ):
            best_version, best_type = first_gpu.runtime_version, first_gpu.type

    if best_version is None:
        return True, []

    is_supported, version_list = await _check_runtime_version(
        backend_name, model.backend_version, best_type, best_version
    )
    if is_supported:
        return True, []

    return False, [
        _format_runtime_upgrade_message(
            backend_name,
            model.backend_version,
            best_type,
            best_version,
            version_list,
        )
    ]
async def _check_runtime_version(
    backend_name: str,
    model_backend_version: Optional[str],
    gpu_type: str,
    runtime_version: str,
) -> Tuple[bool, List[str]]:
    """
    Check if the runtime version meets the minimum requirements.

    First, backend runners are listed for the GPU type, the lower-cased
    backend name, the runtime version and the requested backend version. Any
    match means the runtime is supported. Otherwise, runners are listed again
    without the runtime-version filter. The runtime version is then compared
    against the lowest runtime version offered by the first runner returned.

    Args:
        backend_name: Name of the backend (e.g., 'vLLM', 'llama-box').
        model_backend_version: Specific backend version requested by the model.
            When empty, the lookups pass ``with_deprecated=False``.
        gpu_type: Type of GPU (e.g., 'CUDA', 'ROCm').
        runtime_version: Current runtime version to check.

    Returns:
        Tuple of (is_supported, version_list):
        - is_supported: True if the version is supported.
        - version_list: when the runtime is older than every supported version,
          all supported versions in ascending version order; otherwise empty.
    """
    from functools import cmp_to_key

    query = {
        "backend": gpu_type,
        "service": backend_name.lower(),
        "backend_version": runtime_version,
        "service_version": model_backend_version,
    }
    if not model_backend_version:
        query["with_deprecated"] = False

    if list_backend_runners(**query):
        return True, []

    # Drop the runtime-version filter to see every runtime version on offer.
    query.pop("backend_version")
    all_runners = list_backend_runners(**query)
    if not all_runners:
        return False, []

    supported_versions = {runner.version for runner in all_runners[0].versions}
    if not supported_versions:
        return False, []

    # Order by version semantics (compare_versions is cmp-style: <0, 0, >0).
    # A plain string sort would misorder e.g. "9.0" and "10.0", which picks
    # the wrong minimum and the wrong upgrade target. The inner string sort
    # keeps the order of equal-comparing versions deterministic.
    sorted_versions = sorted(
        sorted(supported_versions), key=cmp_to_key(compare_versions)
    )

    min_version = sorted_versions[0]
    if compare_versions(runtime_version, min_version) < 0:
        return False, sorted_versions
    return True, []
def _format_runtime_upgrade_message(
    backend_name: str,
    backend_version: Optional[str],
    gpu_type: str,
    current_version: str,
    all_versions: List[str],
) -> str:
    """
    Format an upgrade message for runtime version incompatibility.

    Args:
        backend_name: Name of the backend.
        backend_version: Requested backend version. When non-empty it is
            appended to the backend name in the message.
        gpu_type: Type of GPU.
        current_version: Current runtime version.
        all_versions: Supported runtime versions. When non-empty they are
            listed, and the last entry is recommended as the upgrade target.

    Returns:
        The formatted message. It ends with a trailing space when
        ``all_versions`` is empty.
    """
    if backend_version:
        backend_label = backend_name + " " + backend_version
    else:
        backend_label = backend_name

    parts = [
        f"The highest supported GPU runtime version ({gpu_type} {current_version}) ",
        f"does not meet the requirements for backend {backend_label}. ",
    ]
    if all_versions:
        parts.append(f"Supported versions: {', '.join(all_versions)}. ")
        parts.append(
            "It is recommended to upgrade your GPU driver to a version compatible "
            f"with {gpu_type} {all_versions[-1]} or later."
        )
    return "".join(parts)
async def evaluate_model_metadata(
    config: Config,
    model: ModelSpec,
    workers: List[Worker],
) -> Tuple[bool, List[str]]:
    """
    Check that the model's files and metadata can be reached and evaluated.

    - Local-path models: the path must exist on the server or, failing that,
      on at least one of ``workers`` (looked up through WorkerSelector).
    - Hugging Face / ModelScope models: when the repo is not cached, access
      is checked with auth_check (via run_in_thread, timeout=15).
    - GGUF models are then evaluated with scheduler.evaluate_gguf_model, and
      other non-audio models with scheduler.evaluate_pretrained_config.

    Any exception is reported as an incompatibility. The exception is when the
    model's env has a truthy GPUSTACK_SKIP_MODEL_EVALUATION; then the error is
    logged and the model is treated as compatible.

    Returns:
        ``(compatible, messages)``.
    """
    try:
        if model.source == SourceEnum.LOCAL_PATH:
            # Check if local path exists on server
            path_exists_on_server = os.path.exists(model.local_path)
            if not path_exists_on_server:
                # Try to check if path exists on any worker
                try:
                    async with WorkerFilesystemClient() as filesystem_client:
                        selector = WorkerSelector(filesystem_client)
                        found_worker = await selector.find_worker_with_path(
                            workers, path=model.local_path
                        )
                        if found_worker:
                            logger.info(
                                f"Found path {model.local_path} on worker {found_worker.id}"
                            )
                        else:
                            # Path not found on any worker. Note the trailing
                            # space: the two literals are concatenated.
                            return False, [
                                "The model file path you specified does not exist. "
                                "Please ensure the model file is accessible from at least one node."
                            ]
                except Exception as e:
                    logger.warning(
                        f"Failed to check path on workers: {e}, falling back to local check"
                    )
                    # The server-side check already failed, so report the path as missing.
                    return False, [
                        "Failed to get model metadata. The model file path you specified does not exist."
                    ]

        if model.source in [
            SourceEnum.HUGGING_FACE,
            SourceEnum.MODEL_SCOPE,
        ]:
            repo_id = model.huggingface_repo_id
            if model.source == SourceEnum.MODEL_SCOPE:
                repo_id = model.model_scope_model_id
            if not is_repo_cached(repo_id, model.source):
                await run_in_thread(
                    auth_check,
                    timeout=15,
                    model=model,
                    huggingface_token=config.huggingface_token,
                )

        if is_gguf_model(model):
            await scheduler.evaluate_gguf_model(model, workers=workers)
        elif not is_audio_model(model):
            await scheduler.evaluate_pretrained_config(model, workers=workers)

    except Exception as e:
        if model.env and model.env.get("GPUSTACK_SKIP_MODEL_EVALUATION"):
            logger.warning(f"Ignore model evaluation error for model {model.name}: {e}")
            return True, []
        return False, [str(e)]

    return True, []
async def evaluate_model_input(
    session: AsyncSession,
    model: ModelSpec,
    cluster_id: Optional[int] = None,
) -> Tuple[bool, List[str]]:
    """
    Validate the model spec with validate_model_in.

    Returns:
        ``(True, [])`` when validation succeeds. On failure, ``(False, [msg])``,
        where ``msg`` is the ``message`` attribute of an HTTPException or
        ``str(exc)`` for any other exception.
    """
    try:
        await validate_model_in(session, model, cluster_id=cluster_id)
    except HTTPException as exc:
        error_message = exc.message
    except Exception as exc:
        error_message = str(exc)
    else:
        return True, []
    return False, [error_message]
def set_default_spec(model: ModelSpec) -> bool:
    """
    Set the default spec for the model if it matches the catalog spec.

    If model_set_specs_by_key has an entry for the model's source key:
    - its backend_parameters are copied when the model's are None;
    - its env is copied when the model's is None;
    - its categories are copied when the model has none.
    In every case, scheduler.set_model_gpus_per_replica(model) is then called.

    Returns:
        Whether any of the steps above modified the model.
    """
    catalog_spec = model_set_specs_by_key.get(model.model_source_key)
    changed = False

    if catalog_spec:
        if catalog_spec.backend_parameters and model.backend_parameters is None:
            model.backend_parameters = catalog_spec.backend_parameters
            changed = True
        if catalog_spec.env and model.env is None:
            model.env = catalog_spec.env
            changed = True
        if catalog_spec.categories and not model.categories:
            model.categories = catalog_spec.categories
            changed = True

    gpus_changed = scheduler.set_model_gpus_per_replica(model)
    return changed or gpus_changed
|