evaluator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. import asyncio
  2. import hashlib
  3. import json
  4. import logging
  5. import os
  6. from collections import defaultdict
  7. from typing import List, Tuple, Optional, Dict
  8. from gpustack_runtime.detector import ManufacturerEnum
  9. from sqlmodel.ext.asyncio.session import AsyncSession
  10. from cachetools import TTLCache
  11. from aiolimiter import AsyncLimiter
  12. from gpustack.api.exceptions import HTTPException
  13. from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
  14. from gpustack.config.config import Config
  15. from gpustack.policies.base import ModelInstanceScheduleCandidate
  16. from gpustack import envs
  17. from gpustack.routes.models import validate_model_in
  18. from gpustack.scheduler import scheduler
  19. from gpustack.server.catalog import model_set_specs_by_key
  20. from gpustack.schemas.model_evaluations import (
  21. ModelEvaluationResult,
  22. ModelSpec,
  23. ResourceClaim,
  24. )
  25. from gpustack.schemas.models import (
  26. ModelInstance,
  27. BackendEnum,
  28. SourceEnum,
  29. get_backend,
  30. is_gguf_model,
  31. is_audio_model,
  32. )
  33. from gpustack.schemas.workers import Worker, WorkerStateEnum
  34. from gpustack.schemas.inference_backend import (
  35. is_built_in_backend_custom_version,
  36. is_custom_backend,
  37. )
  38. from gpustack.server.worker_selector import WorkerSelector
  39. from gpustack_runner import list_backend_runners
  40. from gpustack_runtime.deployer.__utils__ import compare_versions
  41. from gpustack.utils.gpu import (
  42. all_gpu_match,
  43. any_gpu_match,
  44. find_one_gpu,
  45. compare_compute_capability,
  46. )
  47. from gpustack.utils.hub import (
  48. auth_check,
  49. get_hugging_face_model_min_gguf_path,
  50. get_model_scope_model_min_gguf_path,
  51. is_repo_cached,
  52. )
  53. from gpustack.utils.task import run_in_thread
  54. from gpustack.utils.profiling import time_decorator
# Module-level logger used by every evaluation helper in this file.
logger = logging.getLogger(__name__)

# Cache of evaluation results, keyed by the digest from make_hashable_key().
# Its size bound and entry lifetime come from the two envs settings below.
evaluate_cache = TTLCache(
    maxsize=envs.MODEL_EVALUATION_CACHE_MAX_SIZE, ttl=envs.MODEL_EVALUATION_CACHE_TTL
)

# To reduce the likelihood of hitting the Hugging Face API rate limit (600 RPM),
# throttle evaluations to at most 50 per 10-second window. This is a rate limit
# on how often evaluations may start, not a cap on how many run concurrently.
evaluate_model_limiter = AsyncLimiter(50, 10)
  62. @time_decorator
  63. async def evaluate_models(
  64. config: Config,
  65. session: AsyncSession,
  66. model_specs: List[ModelSpec],
  67. cluster_id: Optional[int] = None,
  68. ) -> List[ModelEvaluationResult]:
  69. """
  70. Evaluate the compatibility of a list of model specs with the available workers.
  71. """
  72. fields = {
  73. "deleted_at": None,
  74. }
  75. if cluster_id is not None:
  76. fields["cluster_id"] = cluster_id
  77. extra_conditions = [
  78. ~(
  79. Worker.state.in_(
  80. [
  81. WorkerStateEnum.PROVISIONING,
  82. WorkerStateEnum.DELETING,
  83. WorkerStateEnum.ERROR,
  84. ]
  85. )
  86. )
  87. ]
  88. workers = await Worker.all_by_fields(
  89. session, fields=fields, extra_conditions=extra_conditions
  90. )
  91. model_instances = await ModelInstance.all_by_fields(session, fields=fields)
  92. if len(model_specs) == 1:
  93. # Sort worker for single-model evaluation only. No need for batch evaluation.
  94. workers = await scheduler.prioritize_workers_with_model_files(
  95. session, model_specs[0], workers
  96. )
  97. async def evaluate(model: ModelSpec):
  98. return await evaluate_model_with_cache(
  99. config,
  100. session,
  101. model,
  102. workers,
  103. model_instances,
  104. cluster_id=cluster_id,
  105. )
  106. tasks = [evaluate(model) for model in model_specs]
  107. results = await asyncio.gather(*tasks)
  108. return results
  109. def make_hashable_key(model: ModelSpec, workers: List[Worker]) -> str:
  110. key_data = json.dumps(
  111. {
  112. "model": model.model_dump(mode="json"),
  113. "workers": [
  114. w.model_dump(
  115. mode="json",
  116. exclude={
  117. "status": {
  118. "cpu": True,
  119. "swap": True,
  120. "filesystem": True,
  121. "os": True,
  122. "kernel": True,
  123. "uptime": True,
  124. "memory": {"utilization_rate", "used"},
  125. "gpu_devices": {
  126. "__all__": {
  127. "temperature": True,
  128. "core": {"utilization_rate"},
  129. "memory": {"utilization_rate", "used"},
  130. },
  131. },
  132. },
  133. "heartbeat_time": True,
  134. "created_at": True,
  135. "updated_at": True,
  136. },
  137. )
  138. for w in workers
  139. ],
  140. },
  141. sort_keys=True,
  142. )
  143. return hashlib.md5(key_data.encode()).hexdigest()
  144. async def evaluate_model_with_cache(
  145. config: Config,
  146. session: AsyncSession,
  147. model: ModelSpec,
  148. workers: List[Worker],
  149. model_instances: List[ModelInstance],
  150. cluster_id: Optional[int] = None,
  151. ) -> ModelEvaluationResult:
  152. cache_key = make_hashable_key(model, workers)
  153. if cache_key in evaluate_cache:
  154. logger.trace(
  155. f"Evaluation cache hit for model: {model.name or model.readable_source}"
  156. )
  157. return evaluate_cache[cache_key]
  158. try:
  159. async with evaluate_model_limiter:
  160. result = await evaluate_model(
  161. config, session, model, workers, model_instances, cluster_id=cluster_id
  162. )
  163. evaluate_cache[cache_key] = result
  164. except Exception as e:
  165. logger.exception(
  166. f"Error evaluating model {model.name or model.readable_source}: {e}"
  167. )
  168. result = ModelEvaluationResult(
  169. compatible=False, error=True, error_message=str(e)
  170. )
  171. return result
  172. @time_decorator
  173. async def evaluate_model(
  174. config: Config,
  175. session: AsyncSession,
  176. model: ModelSpec,
  177. workers: List[Worker],
  178. model_instances: List[ModelInstance],
  179. cluster_id: Optional[int] = None,
  180. ) -> ModelEvaluationResult:
  181. result = ModelEvaluationResult()
  182. if set_default_spec(model):
  183. result.default_spec = model.model_copy()
  184. await set_gguf_model_file_path(config, model)
  185. evaluations = [
  186. (evaluate_model_input, (session, model, cluster_id)),
  187. (evaluate_model_metadata, (config, model, workers)),
  188. (evaluate_environment, (model, workers)),
  189. (evaluate_runtime_version, (model, workers)),
  190. ]
  191. for evaluation, args in evaluations:
  192. compatible, messages = await evaluation(*args)
  193. if not compatible:
  194. result.compatible = False
  195. result.compatibility_messages = messages
  196. return result
  197. workers_by_cluster: Dict[int, List[Worker]] = defaultdict(list)
  198. for worker in workers:
  199. workers_by_cluster[worker.cluster_id].append(worker)
  200. overcommit_clusters = []
  201. result.resource_claim_by_cluster_id = {}
  202. for cluster_id, cluster_workers in workers_by_cluster.items():
  203. cluster_model_instances = [
  204. inst for inst in model_instances if inst.cluster_id == cluster_id
  205. ]
  206. candidate, schedule_messages = await scheduler.find_candidate(
  207. config, model, cluster_workers, cluster_model_instances
  208. )
  209. if not candidate:
  210. result.scheduling_messages.extend(schedule_messages)
  211. continue
  212. if candidate.overcommit:
  213. overcommit_clusters.append(cluster_id)
  214. result.scheduling_messages.extend(schedule_messages)
  215. continue
  216. result.resource_claim_by_cluster_id[cluster_id] = (
  217. summarize_candidate_resource_claim(candidate)
  218. )
  219. if result.resource_claim_by_cluster_id:
  220. result.resource_claim = next(iter(result.resource_claim_by_cluster_id.values()))
  221. else:
  222. result.resource_claim = None
  223. result.compatible = False
  224. result.compatibility_messages.append(
  225. "Unable to find a schedulable worker for the model."
  226. )
  227. return result
  228. def summarize_candidate_resource_claim(
  229. candidate: ModelInstanceScheduleCandidate,
  230. ) -> ResourceClaim:
  231. """
  232. Summarize the computed resource claim for a schedule candidate.
  233. For VRAM, prefer `estimated_vram` (the model's actual estimated requirement)
  234. over the allocated `vram` (total_gpu_memory * utilization_rate), so the
  235. reported value reflects the model's actual need rather than the GPU allocation.
  236. """
  237. computed_resource_claims = [candidate.computed_resource_claim]
  238. if candidate.subordinate_workers:
  239. computed_resource_claims.extend(
  240. sw.computed_resource_claim
  241. for sw in candidate.subordinate_workers
  242. if sw.computed_resource_claim is not None
  243. )
  244. ram, vram = 0, 0
  245. has_estimated_vram = False
  246. for computed_resource_claim in computed_resource_claims:
  247. ram += computed_resource_claim.ram or 0
  248. if computed_resource_claim.estimated_vram is not None:
  249. has_estimated_vram = True
  250. vram += computed_resource_claim.estimated_vram
  251. elif computed_resource_claim.vram:
  252. vram += sum(
  253. v for v in computed_resource_claim.vram.values() if v is not None
  254. )
  255. # If any claim has estimated_vram, use it for all (ignore allocated vram from
  256. # claims that don't have estimated_vram to avoid mixing semantics).
  257. if has_estimated_vram:
  258. vram = sum(
  259. c.estimated_vram
  260. for c in computed_resource_claims
  261. if c.estimated_vram is not None
  262. )
  263. return ResourceClaim(ram=ram, vram=vram)
  264. async def set_gguf_model_file_path(config: Config, model: ModelSpec):
  265. if (
  266. model.source == SourceEnum.HUGGING_FACE
  267. and "gguf" in model.huggingface_repo_id.lower()
  268. and not model.huggingface_filename
  269. ):
  270. model.huggingface_filename = await run_in_thread(
  271. get_hugging_face_model_min_gguf_path,
  272. timeout=15,
  273. model_id=model.huggingface_repo_id,
  274. token=config.huggingface_token,
  275. )
  276. elif (
  277. model.source == SourceEnum.MODEL_SCOPE
  278. and "gguf" in model.model_scope_model_id.lower()
  279. and not model.model_scope_file_path
  280. ):
  281. model.model_scope_file_path = await run_in_thread(
  282. get_model_scope_model_min_gguf_path,
  283. timeout=15,
  284. model_id=model.model_scope_model_id,
  285. )
  286. async def evaluate_environment(
  287. model: ModelSpec,
  288. workers: List[Worker],
  289. ) -> Tuple[bool, List[str]]:
  290. backend = get_backend(model)
  291. if backend == BackendEnum.ASCEND_MINDIE and not any_gpu_match(
  292. workers, lambda gpu: gpu.vendor == ManufacturerEnum.ASCEND.value
  293. ):
  294. return False, [
  295. "The Ascend MindIE backend requires Ascend NPUs but none are available."
  296. ]
  297. if (
  298. backend == BackendEnum.SGLANG
  299. and all_gpu_match(
  300. workers, lambda gpu: gpu.vendor == ManufacturerEnum.NVIDIA.value
  301. )
  302. and not any_gpu_match(
  303. workers,
  304. lambda gpu: compare_compute_capability(gpu.compute_capability, "8.0") >= 0,
  305. )
  306. ):
  307. # Ref: https://github.com/sgl-project/sglang/issues/6006
  308. gpu = find_one_gpu(workers)
  309. return False, [
  310. "The SGLang backend requires NVIDIA GPUs with compute capability 8.0 or higher "
  311. "(e.g., A100/SM80, H100/SM90, RTX 3090/SM86). "
  312. + (
  313. f"Available GPU: {gpu.name} (compute capability: {gpu.compute_capability})"
  314. if gpu
  315. else ""
  316. )
  317. ]
  318. return True, []
  319. async def evaluate_runtime_version(
  320. model: ModelSpec, workers: List[Worker]
  321. ) -> Tuple[bool, List[str]]:
  322. """
  323. Evaluate if the highest GPU runtime version across all workers
  324. meets the minimum requirements for the backend runner.
  325. Args:
  326. model: Model specification to evaluate
  327. workers: List of workers to check
  328. Returns:
  329. Tuple of (compatible, messages):
  330. - compatible: True if runtime version is sufficient, False otherwise
  331. - messages: List of error messages if incompatible, empty list otherwise
  332. """
  333. backend_name = get_backend(model)
  334. if is_custom_backend(backend_name):
  335. return True, []
  336. if is_built_in_backend_custom_version(
  337. backend_name, model.backend_version, model.image_name
  338. ):
  339. return True, []
  340. max_runtime_version = None
  341. max_gpu_type = None
  342. for worker in workers:
  343. if not worker.status or not worker.status.gpu_devices:
  344. continue
  345. gpu = next(
  346. (
  347. gpu
  348. for gpu in worker.status.gpu_devices
  349. if gpu.type and gpu.runtime_version
  350. ),
  351. None,
  352. )
  353. if not gpu:
  354. continue
  355. if max_runtime_version is None:
  356. max_runtime_version = gpu.runtime_version
  357. max_gpu_type = gpu.type
  358. elif compare_versions(gpu.runtime_version, max_runtime_version) > 0:
  359. max_runtime_version = gpu.runtime_version
  360. max_gpu_type = gpu.type
  361. if max_runtime_version is None:
  362. return True, []
  363. is_supported, version_list = await _check_runtime_version(
  364. backend_name, model.backend_version, max_gpu_type, max_runtime_version
  365. )
  366. if is_supported:
  367. return True, []
  368. msg = _format_runtime_upgrade_message(
  369. backend_name,
  370. model.backend_version,
  371. max_gpu_type,
  372. max_runtime_version,
  373. version_list,
  374. )
  375. return False, [msg]
  376. async def _check_runtime_version(
  377. backend_name: str,
  378. model_backend_version: Optional[str],
  379. gpu_type: str,
  380. runtime_version: str,
  381. ) -> Tuple[bool, List[str]]:
  382. """
  383. Check if the runtime version meets the minimum requirements.
  384. Args:
  385. backend_name: Name of the backend (e.g., 'vLLM', 'llama-box')
  386. model_backend_version: Specific backend version requested by model
  387. gpu_type: Type of GPU (e.g., 'CUDA', 'ROCm')
  388. runtime_version: Current runtime version to check
  389. Returns:
  390. Tuple of (is_supported, version_list):
  391. - is_supported: True if version is supported
  392. - version_list: List of all supported versions if not supported
  393. """
  394. kwargs = {
  395. "backend": gpu_type,
  396. "service": backend_name.lower(),
  397. "backend_version": runtime_version,
  398. "service_version": model_backend_version,
  399. }
  400. if not model_backend_version:
  401. kwargs["with_deprecated"] = False
  402. runners_list = list_backend_runners(**kwargs)
  403. if runners_list and len(runners_list) > 0:
  404. return True, []
  405. kwargs.pop("backend_version")
  406. all_runners = list_backend_runners(**kwargs)
  407. if not all_runners or len(all_runners) == 0:
  408. return False, []
  409. supported_versions = []
  410. for runner in all_runners[0].versions:
  411. runner_version = runner.version
  412. supported_versions.append(runner_version)
  413. if not supported_versions:
  414. return False, []
  415. # Remove duplicates and sort versions
  416. unique_versions = list(set(supported_versions))
  417. sorted_versions = sorted(
  418. unique_versions, key=lambda v: (compare_versions(v, "0.0.0"), v)
  419. )
  420. min_version = sorted_versions[0]
  421. if compare_versions(runtime_version, min_version) < 0:
  422. return False, sorted_versions
  423. return True, []
  424. def _format_runtime_upgrade_message(
  425. backend_name: str,
  426. backend_version: Optional[str],
  427. gpu_type: str,
  428. current_version: str,
  429. all_versions: List[str],
  430. ) -> str:
  431. """
  432. Format an upgrade message for runtime version incompatibility.
  433. Args:
  434. backend_name: Name of the backend
  435. gpu_type: Type of GPU
  436. current_version: Current runtime version
  437. all_versions: List of all supported versions
  438. Returns:
  439. Formatted error message
  440. """
  441. msg = (
  442. f"The highest supported GPU runtime version ({gpu_type} {current_version}) "
  443. f"does not meet the requirements for backend {backend_name if not backend_version else backend_name + ' ' + backend_version}. "
  444. )
  445. if all_versions:
  446. versions_str = ", ".join(all_versions)
  447. msg += f"Supported versions: {versions_str}. "
  448. msg += f"It is recommended to upgrade your GPU driver to a version compatible with {gpu_type} {all_versions[-1]} or later."
  449. return msg
  450. async def evaluate_model_metadata(
  451. config: Config,
  452. model: ModelSpec,
  453. workers: List[Worker],
  454. ) -> Tuple[bool, List[str]]:
  455. try:
  456. if model.source == SourceEnum.LOCAL_PATH:
  457. # Check if local path exists on server
  458. path_exists_on_server = os.path.exists(model.local_path)
  459. if not path_exists_on_server:
  460. # Try to check if path exists on any worker
  461. try:
  462. async with WorkerFilesystemClient() as filesystem_client:
  463. selector = WorkerSelector(filesystem_client)
  464. found_worker = await selector.find_worker_with_path(
  465. workers, path=model.local_path
  466. )
  467. if found_worker:
  468. logger.info(
  469. f"Found path {model.local_path} on worker {found_worker.id}"
  470. )
  471. else:
  472. # Path not found on any worker
  473. return False, [
  474. "The model file path you specified does not exist."
  475. "Please ensure the model file is accessible from at least one node."
  476. ]
  477. except Exception as e:
  478. logger.warning(
  479. f"Failed to check path on workers: {e}, falling back to local check"
  480. )
  481. # Fallback to original warning
  482. return False, [
  483. "Failed to get model metadata. The model file path you specified does not exist."
  484. ]
  485. if model.source in [
  486. SourceEnum.HUGGING_FACE,
  487. SourceEnum.MODEL_SCOPE,
  488. ]:
  489. repo_id = model.huggingface_repo_id
  490. if model.source == SourceEnum.MODEL_SCOPE:
  491. repo_id = model.model_scope_model_id
  492. if not is_repo_cached(repo_id, model.source):
  493. await run_in_thread(
  494. auth_check,
  495. timeout=15,
  496. model=model,
  497. huggingface_token=config.huggingface_token,
  498. )
  499. if is_gguf_model(model):
  500. await scheduler.evaluate_gguf_model(model, workers=workers)
  501. elif not is_audio_model(model):
  502. await scheduler.evaluate_pretrained_config(model, workers=workers)
  503. except Exception as e:
  504. if model.env and model.env.get("GPUSTACK_SKIP_MODEL_EVALUATION"):
  505. logger.warning(f"Ignore model evaluation error for model {model.name}: {e}")
  506. return True, []
  507. return False, [str(e)]
  508. return True, []
  509. async def evaluate_model_input(
  510. session: AsyncSession,
  511. model: ModelSpec,
  512. cluster_id: Optional[int] = None,
  513. ) -> Tuple[bool, List[str]]:
  514. try:
  515. await validate_model_in(session, model, cluster_id=cluster_id)
  516. except HTTPException as e:
  517. return False, [e.message]
  518. except Exception as e:
  519. return False, [str(e)]
  520. return True, []
  521. def set_default_spec(model: ModelSpec) -> bool:
  522. """
  523. Set the default spec for the model if it matches the catalog spec.
  524. """
  525. model_spec_in_catalog = model_set_specs_by_key.get(model.model_source_key)
  526. modified = False
  527. if model_spec_in_catalog:
  528. if (
  529. model_spec_in_catalog.backend_parameters
  530. and model.backend_parameters is None
  531. ):
  532. model.backend_parameters = model_spec_in_catalog.backend_parameters
  533. modified = True
  534. if model_spec_in_catalog.env and model.env is None:
  535. model.env = model_spec_in_catalog.env
  536. modified = True
  537. if model_spec_in_catalog.categories and not model.categories:
  538. model.categories = model_spec_in_catalog.categories
  539. modified = True
  540. gpus_per_replica_modified = scheduler.set_model_gpus_per_replica(model)
  541. return modified or gpus_per_replica_modified