evaluator.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. import asyncio
  2. import hashlib
  3. import json
  4. import logging
  5. import os
  6. from collections import defaultdict
  7. from typing import List, Tuple, Optional, Dict
  8. from gpustack_runtime.detector import ManufacturerEnum
  9. from sqlmodel.ext.asyncio.session import AsyncSession
  10. from cachetools import TTLCache
  11. from aiolimiter import AsyncLimiter
  12. from gpustack.api.exceptions import HTTPException
  13. from gpustack.client.worker_filesystem_client import WorkerFilesystemClient
  14. from gpustack.config.config import Config
  15. from gpustack.policies.base import ModelInstanceScheduleCandidate
  16. from gpustack import envs
  17. from gpustack.routes.models import validate_model_in
  18. from gpustack.scheduler import scheduler
  19. from gpustack.server.catalog import model_set_specs_by_key
  20. from gpustack.schemas.model_evaluations import (
  21. ModelEvaluationResult,
  22. ModelSpec,
  23. ResourceClaim,
  24. )
  25. from gpustack.schemas.models import (
  26. ModelInstance,
  27. BackendEnum,
  28. SourceEnum,
  29. get_backend,
  30. is_gguf_model,
  31. is_audio_model,
  32. )
  33. from gpustack.schemas.workers import Worker, WorkerStateEnum
  34. from gpustack.schemas.inference_backend import (
  35. is_built_in_backend_custom_version,
  36. is_custom_backend,
  37. )
  38. from gpustack.server.worker_selector import WorkerSelector
  39. from gpustack_runner import list_backend_runners
  40. from gpustack_runtime.deployer.__utils__ import compare_versions
  41. from gpustack.utils.gpu import (
  42. all_gpu_match,
  43. any_gpu_match,
  44. find_one_gpu,
  45. compare_compute_capability,
  46. )
  47. from gpustack.utils.hub import (
  48. auth_check,
  49. get_hugging_face_model_min_gguf_path,
  50. get_model_scope_model_min_gguf_path,
  51. is_repo_cached,
  52. )
  53. from gpustack.utils.task import run_in_thread
  54. from gpustack.utils.profiling import time_decorator
  logger = logging.getLogger(__name__)

  # Process-wide TTL cache of evaluation results, keyed by the digest returned by
  # make_hashable_key(model, workers). Capacity and entry lifetime come from the
  # MODEL_EVALUATION_CACHE_MAX_SIZE / MODEL_EVALUATION_CACHE_TTL settings.
  evaluate_cache = TTLCache(
      maxsize=envs.MODEL_EVALUATION_CACHE_MAX_SIZE,
      ttl=envs.MODEL_EVALUATION_CACHE_TTL,
  )

  # Throttle evaluations to reduce the chance of hitting the Hugging Face API
  # rate limit (600 requests per minute). AsyncLimiter is a *rate* limiter
  # (leaky bucket): it admits at most 50 acquisitions per 10-second window
  # (~300/min). It does not cap how many evaluations run concurrently.
  # NOTE(review): one evaluation may issue several Hub requests, so the real
  # request rate can exceed 300/min — confirm the safety margin is sufficient.
  evaluate_model_limiter = AsyncLimiter(max_rate=50, time_period=10)
  @time_decorator
  async def evaluate_models(
      config: Config,
      session: AsyncSession,
      model_specs: List[ModelSpec],
      cluster_id: Optional[int] = None,
  ) -> List[ModelEvaluationResult]:
      """
      Evaluate the compatibility of a list of model specs with the available workers.

      Workers that are soft-deleted or in the PROVISIONING, DELETING or ERROR
      state are ignored. When ``cluster_id`` is given, both the workers and the
      model instances are restricted to that cluster. Every spec is evaluated
      concurrently (through the evaluation cache); results come back in the
      same order as ``model_specs``.
      """
      query_fields = {"deleted_at": None}
      if cluster_id is not None:
          query_fields["cluster_id"] = cluster_id

      unusable_states = [
          WorkerStateEnum.PROVISIONING,
          WorkerStateEnum.DELETING,
          WorkerStateEnum.ERROR,
      ]
      workers = await Worker.all_by_fields(
          session,
          fields=query_fields,
          extra_conditions=[~(Worker.state.in_(unusable_states))],
      )
      model_instances = await ModelInstance.all_by_fields(session, fields=query_fields)

      if len(model_specs) == 1:
          # Worker ordering only matters when a single model is evaluated; batch
          # evaluations skip the extra prioritization work.
          workers = await scheduler.prioritize_workers_with_model_files(
              session, model_specs[0], workers
          )

      # NOTE(review): all coroutines below share the same AsyncSession, which
      # SQLAlchemy does not support for concurrent use — confirm the DB calls
      # made during evaluation cannot overlap.
      return await asyncio.gather(
          *(
              evaluate_model_with_cache(
                  config,
                  session,
                  spec,
                  workers,
                  model_instances,
                  cluster_id=cluster_id,
              )
              for spec in model_specs
          )
      )
  def make_hashable_key(model: ModelSpec, workers: List[Worker]) -> str:
      """
      Build a stable cache key for evaluating ``model`` against ``workers``.

      The model spec and each worker are dumped in JSON mode. Volatile worker
      fields are left out so routine telemetry updates do not change the key.
      These are the CPU/swap/filesystem/OS/kernel/uptime status, memory and GPU
      usage, GPU temperature, heartbeat time, and timestamps. The key is the MD5
      hex digest of the key-sorted JSON document. MD5 is used only as a cache
      key here, not for security.
      """
      volatile_worker_fields = {
          "status": {
              "cpu": True,
              "swap": True,
              "filesystem": True,
              "os": True,
              "kernel": True,
              "uptime": True,
              "memory": {"utilization_rate", "used"},
              "gpu_devices": {
                  "__all__": {
                      "temperature": True,
                      "core": {"utilization_rate"},
                      "memory": {"utilization_rate", "used"},
                  },
              },
          },
          "heartbeat_time": True,
          "created_at": True,
          "updated_at": True,
      }
      payload = {
          "model": model.model_dump(mode="json"),
          "workers": [
              worker.model_dump(mode="json", exclude=volatile_worker_fields)
              for worker in workers
          ],
      }
      serialized = json.dumps(payload, sort_keys=True)
      return hashlib.md5(serialized.encode()).hexdigest()
  async def evaluate_model_with_cache(
      config: Config,
      session: AsyncSession,
      model: ModelSpec,
      workers: List[Worker],
      model_instances: List[ModelInstance],
      cluster_id: Optional[int] = None,
  ) -> ModelEvaluationResult:
      """
      Evaluate ``model`` against ``workers``, reusing a cached result when possible.

      The cache key is computed from the spec *before* evaluation, which may
      fill defaults into ``model``. Cache misses are throttled by
      ``evaluate_model_limiter``, and successful results are stored in
      ``evaluate_cache``. Any exception raised during evaluation is logged and
      turned into an incompatible result with ``error=True``. Such error results
      are not cached, so a later call retries the evaluation.
      """
      cache_key = make_hashable_key(model, workers)
      # Single lookup: checking ``in`` and then indexing can raise KeyError if the
      # TTL entry expires between the two calls, and that KeyError would escape
      # the try below. Evaluation results are never None, so None means "miss".
      cached_result = evaluate_cache.get(cache_key)
      if cached_result is not None:
          logger.trace(
              f"Evaluation cache hit for model: {model.name or model.readable_source}"
          )
          return cached_result

      try:
          async with evaluate_model_limiter:
              result = await evaluate_model(
                  config, session, model, workers, model_instances, cluster_id=cluster_id
              )
          evaluate_cache[cache_key] = result
      except Exception as e:
          logger.exception(
              f"Error evaluating model {model.name or model.readable_source}: {e}"
          )
          result = ModelEvaluationResult(
              compatible=False, error=True, error_message=str(e)
          )
      return result
  @time_decorator
  async def evaluate_model(
      config: Config,
      session: AsyncSession,
      model: ModelSpec,
      workers: List[Worker],
      model_instances: List[ModelInstance],
      cluster_id: Optional[int] = None,
  ) -> ModelEvaluationResult:
      """
      Run all compatibility checks for one model spec.

      1. Fill catalog defaults into ``model``; when anything changed, a copy is
         recorded as ``default_spec``. Then resolve a GGUF file path for
         Hugging Face / ModelScope sources. Both steps mutate ``model`` in place.
      2. Run the input, metadata, environment and runtime-version checks in
         order. The first failing check marks the result incompatible, and its
         messages become ``compatibility_messages``.
      3. Group workers by cluster and ask the scheduler for a candidate in each
         cluster. A non-overcommitted candidate contributes a summarized
         resource claim to ``resource_claim_by_cluster_id``. A missing or
         overcommitted candidate contributes its scheduling messages instead.
      4. ``resource_claim`` is the first cluster's claim, in worker order. If no
         cluster produced a claim, the result is marked incompatible.

      ``cluster_id`` is only forwarded to input validation; ``workers`` and
      ``model_instances`` are expected to be pre-filtered by the caller.
      """
      result = ModelEvaluationResult()
      if set_default_spec(model):
          result.default_spec = model.model_copy()
      await set_gguf_model_file_path(config, model)

      checks = [
          (evaluate_model_input, (session, model, cluster_id)),
          (evaluate_model_metadata, (config, model, workers)),
          (evaluate_environment, (model, workers)),
          (evaluate_runtime_version, (model, workers)),
      ]
      for check, args in checks:
          compatible, messages = await check(*args)
          if not compatible:
              result.compatible = False
              result.compatibility_messages = messages
              return result

      workers_by_cluster: Dict[int, List[Worker]] = defaultdict(list)
      for worker in workers:
          workers_by_cluster[worker.cluster_id].append(worker)

      result.resource_claim_by_cluster_id = {}
      # A distinct loop variable avoids shadowing the ``cluster_id`` parameter.
      for worker_cluster_id, cluster_workers in workers_by_cluster.items():
          cluster_model_instances = [
              inst for inst in model_instances if inst.cluster_id == worker_cluster_id
          ]
          candidate, schedule_messages = await scheduler.find_candidate(
              config, model, cluster_workers, cluster_model_instances
          )
          if not candidate or candidate.overcommit:
              # Nothing schedulable here, or only by overcommitting resources:
              # report why and claim nothing for this cluster.
              result.scheduling_messages.extend(schedule_messages)
              continue
          result.resource_claim_by_cluster_id[worker_cluster_id] = (
              summarize_candidate_resource_claim(candidate)
          )

      if result.resource_claim_by_cluster_id:
          # Dicts keep insertion order, so this is the first cluster that fits.
          result.resource_claim = next(iter(result.resource_claim_by_cluster_id.values()))
      else:
          result.resource_claim = None
          result.compatible = False
          result.compatibility_messages.append(
              "Unable to find a schedulable worker for the model."
          )
      return result
  def summarize_candidate_resource_claim(
      candidate: ModelInstanceScheduleCandidate,
  ) -> ResourceClaim:
      """
      Summarize the computed resource claim for a schedule candidate.

      The claims of the main worker and of every subordinate worker are added
      together. ``ram`` values are summed, and ``vram`` is the sum of all
      non-None values in each claim's ``vram`` mapping. A missing claim, or a
      None/empty ``ram``/``vram``, contributes zero.

      Returns:
          ResourceClaim with the aggregated ``ram`` and ``vram``.
      """
      claims = [candidate.computed_resource_claim]
      if candidate.subordinate_workers:
          claims.extend(sw.computed_resource_claim for sw in candidate.subordinate_workers)

      ram, vram = 0, 0
      for claim in claims:
          if claim is None:
              # Skip unset claims uniformly, so a None main claim no longer
              # raises AttributeError (subordinate None claims were already skipped).
              continue
          ram += claim.ram or 0
          if claim.vram:
              vram += sum(v for v in claim.vram.values() if v is not None)
      return ResourceClaim(ram=ram, vram=vram)
  async def set_gguf_model_file_path(config: Config, model: ModelSpec):
      """
      Pick a GGUF file for Hub models whose repo looks like a GGUF repo.

      Applies when the source is Hugging Face or ModelScope, the repo id
      contains "gguf" (case-insensitive) and no file was specified. The file is
      then set to the path returned by ``get_hugging_face_model_min_gguf_path``
      or ``get_model_scope_model_min_gguf_path``. Going by the names, that is
      presumably the smallest GGUF file in the repo — confirm in gpustack.utils.hub.
      Lookups run in a worker thread with a 15 s timeout. ``model`` is mutated
      in place.
      """
      # The repo id is checked for truthiness first: this may run on a spec that
      # has not been validated yet, and ``None.lower()`` would raise
      # AttributeError instead of letting validation report the missing id.
      if (
          model.source == SourceEnum.HUGGING_FACE
          and model.huggingface_repo_id
          and "gguf" in model.huggingface_repo_id.lower()
          and not model.huggingface_filename
      ):
          model.huggingface_filename = await run_in_thread(
              get_hugging_face_model_min_gguf_path,
              timeout=15,
              model_id=model.huggingface_repo_id,
              token=config.huggingface_token,
          )
      elif (
          model.source == SourceEnum.MODEL_SCOPE
          and model.model_scope_model_id
          and "gguf" in model.model_scope_model_id.lower()
          and not model.model_scope_file_path
      ):
          model.model_scope_file_path = await run_in_thread(
              get_model_scope_model_min_gguf_path,
              timeout=15,
              model_id=model.model_scope_model_id,
          )
  async def evaluate_environment(
      model: ModelSpec,
      workers: List[Worker],
  ) -> Tuple[bool, List[str]]:
      """
      Check that the workers offer hardware the model's backend can run on.

      - Ascend MindIE: at least one Ascend NPU must be present.
      - SGLang: when every GPU is NVIDIA, at least one must have compute
        capability 8.0 or higher
        (ref: https://github.com/sgl-project/sglang/issues/6006).

      Returns:
          ``(True, [])`` when no rule is violated, otherwise ``(False, [message])``.
      """
      backend = get_backend(model)

      if backend == BackendEnum.ASCEND_MINDIE:
          ascend_available = any_gpu_match(
              workers, lambda gpu: gpu.vendor == ManufacturerEnum.ASCEND.value
          )
          if not ascend_available:
              return False, [
                  "The Ascend MindIE backend requires Ascend NPUs but none are available."
              ]

      if backend == BackendEnum.SGLANG:
          nvidia_only = all_gpu_match(
              workers, lambda gpu: gpu.vendor == ManufacturerEnum.NVIDIA.value
          )
          if nvidia_only and not any_gpu_match(
              workers,
              lambda gpu: compare_compute_capability(gpu.compute_capability, "8.0") >= 0,
          ):
              sample_gpu = find_one_gpu(workers)
              message = (
                  "The SGLang backend requires NVIDIA GPUs with compute capability 8.0 or higher "
                  "(e.g., A100/SM80, H100/SM90, RTX 3090/SM86). "
              )
              if sample_gpu:
                  message += (
                      f"Available GPU: {sample_gpu.name} "
                      f"(compute capability: {sample_gpu.compute_capability})"
                  )
              return False, [message]

      return True, []
  async def evaluate_runtime_version(
      model: ModelSpec, workers: List[Worker]
  ) -> Tuple[bool, List[str]]:
      """
      Check the newest GPU runtime on the workers against the backend's runners.

      Custom backends, and built-in backends pinned to a custom version or
      image, are not checked. Otherwise each worker contributes the runtime of
      its first GPU that reports both a type and a runtime version. The highest
      of these, per ``compare_versions`` with the first one winning ties, is
      checked by ``_check_runtime_version``. Workers without such a GPU are
      skipped; if no worker qualifies, the check passes.

      NOTE(review): versions are compared across workers regardless of GPU
      type, so a mixed CUDA/ROCm fleet compares unrelated version numbers and
      only one GPU type gets checked — confirm this is intended.

      Args:
          model: Model specification to evaluate.
          workers: Workers whose GPUs are inspected.

      Returns:
          ``(True, [])`` when compatible, otherwise ``(False, [message])`` where the
          message suggests a driver upgrade.
      """
      backend_name = get_backend(model)
      if is_custom_backend(backend_name) or is_built_in_backend_custom_version(
          backend_name, model.backend_version, model.image_name
      ):
          return True, []

      best_version = None
      best_gpu_type = None
      for worker in workers:
          status = worker.status
          if not status or not status.gpu_devices:
              continue
          device = next(
              (d for d in status.gpu_devices if d.type and d.runtime_version),
              None,
          )
          if not device:
              continue
          if (
              best_version is None
              or compare_versions(device.runtime_version, best_version) > 0
          ):
              best_version = device.runtime_version
              best_gpu_type = device.type

      if best_version is None:
          return True, []

      is_supported, supported_versions = await _check_runtime_version(
          backend_name, model.backend_version, best_gpu_type, best_version
      )
      if is_supported:
          return True, []
      return False, [
          _format_runtime_upgrade_message(
              backend_name,
              model.backend_version,
              best_gpu_type,
              best_version,
              supported_versions,
          )
      ]
  async def _check_runtime_version(
      backend_name: str,
      model_backend_version: Optional[str],
      gpu_type: str,
      runtime_version: str,
  ) -> Tuple[bool, List[str]]:
      """
      Check if the runtime version meets the minimum requirements.

      First, runners matching the GPU type, backend name and exact runtime
      version are looked up. Deprecated runners are excluded unless a specific
      backend version was requested. If none match, the lookup is repeated
      without the runtime version. The runtime versions offered by the first
      returned runner are then compared with ``runtime_version``, which is
      accepted if it is not older than the oldest offered version.

      Args:
          backend_name: Name of the backend (e.g., 'vLLM', 'llama-box')
          model_backend_version: Specific backend version requested by model
          gpu_type: Type of GPU (e.g., 'CUDA', 'ROCm')
          runtime_version: Current runtime version to check

      Returns:
          Tuple of (is_supported, version_list):
          - is_supported: True if version is supported
          - version_list: when unsupported, the offered runtime versions sorted
            oldest to newest (empty if no runner information is available)
      """
      from functools import cmp_to_key

      query = {
          "backend": gpu_type,
          "service": backend_name.lower(),
          "backend_version": runtime_version,
          "service_version": model_backend_version,
      }
      if not model_backend_version:
          query["with_deprecated"] = False

      if list_backend_runners(**query):
          return True, []

      # Repeat the lookup without the runtime version to learn what is offered.
      query.pop("backend_version")
      all_runners = list_backend_runners(**query)
      if not all_runners:
          return False, []

      offered_versions = {
          entry.version for entry in all_runners[0].versions if entry.version
      }
      if not offered_versions:
          return False, []

      # Order with compare_versions rather than as strings: plain string order
      # would put "12.10" before "12.4", which breaks both the minimum below and
      # the "latest" upgrade target shown to users. The inner string sort only
      # makes the order of equal-comparing spellings deterministic.
      sorted_versions = sorted(
          sorted(offered_versions), key=cmp_to_key(compare_versions)
      )

      if compare_versions(runtime_version, sorted_versions[0]) < 0:
          return False, sorted_versions
      return True, []
  409. def _format_runtime_upgrade_message(
  410. backend_name: str,
  411. backend_version: Optional[str],
  412. gpu_type: str,
  413. current_version: str,
  414. all_versions: List[str],
  415. ) -> str:
  416. """
  417. Format an upgrade message for runtime version incompatibility.
  418. Args:
  419. backend_name: Name of the backend
  420. gpu_type: Type of GPU
  421. current_version: Current runtime version
  422. all_versions: List of all supported versions
  423. Returns:
  424. Formatted error message
  425. """
  426. msg = (
  427. f"The highest supported GPU runtime version ({gpu_type} {current_version}) "
  428. f"does not meet the requirements for backend {backend_name if not backend_version else backend_name + ' ' + backend_version}. "
  429. )
  430. if all_versions:
  431. versions_str = ", ".join(all_versions)
  432. msg += f"Supported versions: {versions_str}. "
  433. msg += f"It is recommended to upgrade your GPU driver to a version compatible with {gpu_type} {all_versions[-1]} or later."
  434. return msg
  async def evaluate_model_metadata(
      config: Config,
      model: ModelSpec,
      workers: List[Worker],
  ) -> Tuple[bool, List[str]]:
      """
      Check that the model's files and metadata can be reached and evaluated.

      - LOCAL_PATH: the path must exist on the server or, failing that, on at
        least one of ``workers``, probed via WorkerFilesystemClient. If the
        worker probe itself fails, the model is reported as incompatible.
      - HUGGING_FACE / MODEL_SCOPE: if the repo is not cached locally, an
        access check (``auth_check``) runs in a thread with a 15 s timeout.
      - GGUF models are then evaluated with ``scheduler.evaluate_gguf_model``,
        and other non-audio models with ``scheduler.evaluate_pretrained_config``.

      Any exception makes the model incompatible, with the exception text as
      the message. The exception is when ``model.env`` has a truthy
      GPUSTACK_SKIP_MODEL_EVALUATION entry; then the error is logged and ignored.
      NOTE(review): if env values are strings, even "false" or "0" enable the skip.

      Returns:
          ``(True, [])`` when compatible, otherwise ``(False, [message])``.
      """
      try:
          if model.source == SourceEnum.LOCAL_PATH:
              # Check if local path exists on server
              path_exists_on_server = os.path.exists(model.local_path)
              if not path_exists_on_server:
                  # Try to check if path exists on any worker
                  try:
                      async with WorkerFilesystemClient() as filesystem_client:
                          selector = WorkerSelector(filesystem_client)
                          found_worker = await selector.find_worker_with_path(
                              workers, path=model.local_path
                          )
                      if found_worker:
                          logger.info(
                              f"Found path {model.local_path} on worker {found_worker.id}"
                          )
                      else:
                          # Path not found on any worker
                          return False, [
                              "The model file path you specified does not exist. "
                              "Please ensure the model file is accessible from at least one node."
                          ]
                  except Exception as e:
                      logger.warning(
                          f"Failed to check path on workers: {e}, falling back to local check"
                      )
                      # The server-side check already failed, so report the path
                      # as missing; no further check is attempted.
                      return False, [
                          "Failed to get model metadata. The model file path you specified does not exist."
                      ]

          if model.source in [
              SourceEnum.HUGGING_FACE,
              SourceEnum.MODEL_SCOPE,
          ]:
              repo_id = model.huggingface_repo_id
              if model.source == SourceEnum.MODEL_SCOPE:
                  repo_id = model.model_scope_model_id
              if not is_repo_cached(repo_id, model.source):
                  await run_in_thread(
                      auth_check,
                      timeout=15,
                      model=model,
                      huggingface_token=config.huggingface_token,
                  )

          if is_gguf_model(model):
              await scheduler.evaluate_gguf_model(model, workers=workers)
          elif not is_audio_model(model):
              await scheduler.evaluate_pretrained_config(model, workers=workers)
      except Exception as e:
          if model.env and model.env.get("GPUSTACK_SKIP_MODEL_EVALUATION"):
              logger.warning(f"Ignore model evaluation error for model {model.name}: {e}")
              return True, []
          return False, [str(e)]
      return True, []
  async def evaluate_model_input(
      session: AsyncSession,
      model: ModelSpec,
      cluster_id: Optional[int] = None,
  ) -> Tuple[bool, List[str]]:
      """
      Validate ``model`` with ``validate_model_in`` from gpustack.routes.models.

      Returns:
          ``(True, [])`` on success. Otherwise ``(False, [reason])``, where the
          reason is the HTTPException's ``message`` or, for any other
          exception, its string form.
      """
      try:
          await validate_model_in(session, model, cluster_id=cluster_id)
      except HTTPException as http_error:
          return False, [http_error.message]
      except Exception as error:  # any other validation failure, surfaced as text
          return False, [str(error)]
      else:
          return True, []
  def set_default_spec(model: ModelSpec) -> bool:
      """
      Fill unset fields of ``model`` from its catalog entry, if it has one.

      ``backend_parameters`` and ``env`` are copied when the model's value is
      None; ``categories`` is copied when the model's value is falsy. A field
      is copied only when the catalog value is truthy. Afterwards the scheduler
      always sets the model's GPUs per replica.

      NOTE(review): catalog values are assigned by reference, so any later
      in-place mutation of these fields on ``model`` would also change the
      shared catalog entry — confirm nothing mutates them.

      Returns:
          True if any field was filled or the GPUs-per-replica setting changed.
      """
      catalog_spec = model_set_specs_by_key.get(model.model_source_key)
      changed = False
      if catalog_spec:
          fill_rules = (
              ("backend_parameters", lambda current: current is None),
              ("env", lambda current: current is None),
              ("categories", lambda current: not current),
          )
          for field_name, is_unset in fill_rules:
              default_value = getattr(catalog_spec, field_name)
              if default_value and is_unset(getattr(model, field_name)):
                  setattr(model, field_name, default_value)
                  changed = True
      gpus_changed = scheduler.set_model_gpus_per_replica(model)
      return changed or gpus_changed