# benchmark_manager.py
import asyncio
import logging
import multiprocessing
import os
import re
import time
from collections import Counter, deque
from typing import Callable, Dict, List, Optional, Set, Tuple

import setproctitle
from gpustack_runtime.deployer import (
    delete_workload,
    get_workload,
    WorkloadStatusStateEnum,
)
from gpustack_runtime.deployer import logs_workload

from gpustack.api.exceptions import raise_if_response_error
from gpustack.config.config import Config
from gpustack.config import registration
from gpustack.logging import RedirectStdoutStderr
from gpustack.schemas.benchmark import (
    Benchmark,
    BenchmarkStateEnum,
)
from gpustack.utils.process import terminate_process_tree, add_signal_handlers
from gpustack.worker.benchmark.runner import BenchmarkRunner
from gpustack.client import ClientSet
from gpustack.server.bus import Event, EventType
from gpustack.worker.schemas.benchmark_runner import (
    GenerativeBenchmarksReport,
    GenerativeRequestStats,
)
  32. logger = logging.getLogger(__name__)
  33. HTTP_ERROR_PATTERN = re.compile(
  34. r"^HTTP\s+(?P<status>\d+):\s+(?P<msg>.*)\s+\(type=(?P<type>[^,]+),\s*code=(?P<code>[^)]+)\)$"
  35. )
  36. TRUNCATION_SUFFIX = "..."
  37. BENCHMARK_STATE_MESSAGE_MAX_LEN = 1024
  38. BENCHMARK_FAILURE_REASON_MAX_LEN = 220
  39. class BenchmarkManager:
  40. @property
  41. def _worker_id(self) -> int:
  42. return self._worker_id_getter()
  43. """
  44. The ID of current worker.
  45. """
  46. _config: Config
  47. """
  48. Global configuration.
  49. """
  50. _benchmark_log_dir: str
  51. """
  52. The directory to store logs of benchmarks(in subprocess).
  53. """
  54. _benchmark_dir: str
  55. """
  56. The directory to store results of benchmarks(in subprocess).
  57. """
  58. @property
  59. def _clientset(self) -> ClientSet:
  60. return self._clientset_getter()
  61. """
  62. The clientset to access the API server.
  63. """
  64. _provisioning_processes: Dict[int, multiprocessing.Process]
  65. """
  66. The mapping of benchmark ID to provisioning (sub)process.
  67. When the (sub)process is alive, the benchmark is provisioning.
  68. If the (sub)process exited, the benchmark is either running or failed.
  69. """
  70. _benchmark_by_id: Dict[int, Benchmark]
  71. _benchmark_queue: deque
  72. _queue_lock: asyncio.Lock
  73. _worker_task: Optional[asyncio.Task]
  74. _active_benchmark_id: Optional[int]
  75. _active_benchmark_started_at: Optional[float]
  76. _clientset_getter: Callable[[], ClientSet]
  77. _worker_id_getter: Callable[[], int]
  78. def __init__(
  79. self,
  80. worker_id_getter: Callable[[], int],
  81. clientset_getter: Callable[[], ClientSet],
  82. cfg: Config,
  83. ):
  84. self._worker_id_getter = worker_id_getter
  85. self._config = cfg
  86. self._benchmark_log_dir = f"{cfg.log_dir}/benchmarks"
  87. self._benchmark_dir = f"{cfg.benchmark_dir}"
  88. self._clientset_getter = clientset_getter
  89. self._provisioning_processes = {}
  90. self._benchmark_by_id = {}
  91. self._benchmark_queue = deque()
  92. self._queue_lock = asyncio.Lock()
  93. self._worker_task = None
  94. self._active_benchmark_id = None
  95. self._active_benchmark_started_at = None
  96. os.makedirs(self._benchmark_log_dir, exist_ok=True)
  97. os.makedirs(self._benchmark_dir, exist_ok=True)
  98. async def watch_benchmarks_event(self):
  99. """
  100. Loop to watch benchmarks' event and handle.
  101. """
  102. logger.info("Watching benchmarks event.")
  103. if not self._worker_task or self._worker_task.done():
  104. self._worker_task = asyncio.create_task(self._benchmark_queue_worker())
  105. while True:
  106. try:
  107. await self._clientset.benchmarks.awatch(
  108. callback=self._handle_benchmark_event
  109. )
  110. except asyncio.CancelledError:
  111. break
  112. except Exception as e:
  113. logger.error(f"Error watching benchmarks: {e}")
  114. await asyncio.sleep(5)
  115. def _handle_benchmark_event(self, event: Event):
  116. """
  117. Handle benchmark events.
  118. Args:
  119. event: The benchmark event to handle.
  120. """
  121. benchmark = Benchmark.model_validate(event.data)
  122. logger.trace(
  123. f"Received event: {str(event.type)}, id: {benchmark.id}, name: {benchmark.name}, state: {str(benchmark.state)}"
  124. )
  125. is_pending = benchmark.state == BenchmarkStateEnum.PENDING
  126. is_stopped = benchmark.state == BenchmarkStateEnum.STOPPED
  127. is_current_worker = benchmark.worker_id == self._worker_id
  128. if not is_current_worker:
  129. return
  130. if event.type == EventType.DELETED:
  131. self._stop_benchmark(benchmark)
  132. logger.trace(
  133. f"DELETED event: stopped deleted benchmark {benchmark.name}(id={benchmark.id})."
  134. )
  135. return
  136. if is_pending:
  137. asyncio.create_task(self._enqueue_benchmark(benchmark))
  138. return
  139. if is_stopped:
  140. asyncio.create_task(self._handle_stop_benchmark_event(benchmark))
  141. async def _handle_stop_benchmark_event(self, benchmark: Benchmark):
  142. try:
  143. self._dump_benchmark_logs_to_file(benchmark)
  144. self._stop_benchmark(benchmark)
  145. self._clear_active_benchmark(benchmark.id)
  146. except Exception as e:
  147. logger.error(f"Failed to stop benchmark {benchmark.name}: {e}")
  148. async def _enqueue_benchmark(self, benchmark: Benchmark):
  149. async with self._queue_lock:
  150. if benchmark.id not in [b.id for b in self._benchmark_queue]:
  151. self._benchmark_queue.append(benchmark)
  152. patch_dict = {"state": BenchmarkStateEnum.QUEUED}
  153. await self._update_benchmark_state(benchmark.id, **patch_dict)
  154. logger.info(
  155. f"Enqueued benchmark {benchmark.name}(id={benchmark.id}) and set to QUEUED."
  156. )
  157. async def _benchmark_queue_worker(self):
  158. """
  159. Process benchmarks in the queue.
  160. """
  161. while True:
  162. benchmark = None
  163. async with self._queue_lock:
  164. if self._active_benchmark_id is not None:
  165. benchmark = None
  166. elif self._benchmark_queue:
  167. benchmark = self._benchmark_queue.popleft()
  168. if benchmark:
  169. try:
  170. await self._start_benchmark(benchmark)
  171. except Exception as e:
  172. logger.error(
  173. f"Failed to start benchmark {benchmark.name}(id={benchmark.id}): {e}"
  174. )
  175. else:
  176. await asyncio.sleep(1)
  177. async def _start_benchmark(self, benchmark: Benchmark):
  178. """
  179. Start benchmark through a subprocess.
  180. Args:
  181. benchmark: The benchmark to start.
  182. """
  183. if benchmark.id in self._provisioning_processes:
  184. logger.warning(
  185. f"Benchmark {benchmark.name}(id={benchmark.id}) is provisioning. Skipping start."
  186. )
  187. return
  188. log_file_path = f"{self._benchmark_log_dir}/{benchmark.id}.log"
  189. try:
  190. if os.path.exists(log_file_path):
  191. os.remove(log_file_path)
  192. except Exception as e:
  193. logger.warning(f"Failed to remove old log file {log_file_path}: {e}")
  194. try:
  195. fallback_registry = registration.determine_default_registry(
  196. self._config.system_default_container_registry
  197. )
  198. process = multiprocessing.Process(
  199. target=BenchmarkManager._launch_benchmark,
  200. args=(
  201. benchmark,
  202. self._clientset.headers,
  203. log_file_path,
  204. self._config,
  205. fallback_registry,
  206. ),
  207. )
  208. process.daemon = False
  209. process.start()
  210. self._provisioning_processes[benchmark.id] = process
  211. self._set_active_benchmark(benchmark.id)
  212. patch_dict = {
  213. "state": BenchmarkStateEnum.RUNNING,
  214. "pid": process.pid,
  215. }
  216. await self._update_benchmark_state(benchmark.id, **patch_dict)
  217. logger.info(f"Started benchmark {benchmark.name}(id={benchmark.id})")
  218. except Exception as e:
  219. # Clean up provisioning process if started.
  220. if benchmark.id in self._provisioning_processes:
  221. self._stop_benchmark(benchmark)
  222. patch_dict = {
  223. "state": BenchmarkStateEnum.ERROR,
  224. "state_message": f"Failed to start benchmark: {e}",
  225. }
  226. await self._update_benchmark_state(benchmark.id, **patch_dict)
  227. logger.error(
  228. f"Failed to start benchmark {benchmark.name}(id={benchmark.id}): {e}"
  229. )
  230. @staticmethod
  231. def _launch_benchmark(
  232. benchmark: Benchmark,
  233. client_headers: dict,
  234. log_file_path: str,
  235. cfg: Config,
  236. fallback_registry: Optional[str] = None,
  237. ):
  238. """
  239. Serve benchmark in a subprocess.
  240. Exits the subprocess when serving ends.
  241. Args:
  242. benchmark: The benchmark to serve.
  243. client_headers: The headers for the clientset.
  244. log_file_path: The path to the log file.
  245. cfg: The configuration.
  246. fallback_registry: The fallback container registry to use if needed.
  247. """
  248. setproctitle.setproctitle(f"gpustack_benchmark_{benchmark.id}")
  249. add_signal_handlers()
  250. clientset = ClientSet(
  251. base_url=cfg.get_server_url(),
  252. headers=client_headers,
  253. )
  254. with open(log_file_path, "w", buffering=1, encoding="utf-8") as log_file:
  255. with RedirectStdoutStderr(log_file):
  256. try:
  257. server_ins = BenchmarkRunner(
  258. clientset,
  259. benchmark,
  260. cfg,
  261. fallback_registry,
  262. )
  263. logger.info(
  264. f"Provisioning benchmark {benchmark.name}(id={benchmark.id})"
  265. )
  266. server_ins.start()
  267. logger.info(
  268. f"Finished provisioning benchmark {benchmark.name}(id={benchmark.id})"
  269. )
  270. except Exception as e:
  271. logger.exception(
  272. f"Error provisioning benchmark {benchmark.name}(id={benchmark.id}): {e}"
  273. )
  274. raise e
  275. async def _update_benchmark_state(self, id: int, **kwargs):
  276. client = self._clientset.http_client.get_async_httpx_client()
  277. resp = await client.patch(f"/benchmarks/{id}/state", json=kwargs)
  278. resp.raise_for_status()
  279. def _update_benchmark_state_sync(self, id: int, **kwargs):
  280. client = self._clientset.http_client.get_httpx_client()
  281. resp = client.patch(f"/benchmarks/{id}/state", json=kwargs)
  282. resp.raise_for_status()
  283. def _stop_benchmark(self, benchmark: Benchmark):
  284. """
  285. Stop benchmark and clean up.
  286. Args:
  287. benchmark: The benchmark to stop.
  288. """
  289. # Teardown provisioning process if still alive.
  290. if self._is_provisioning(benchmark):
  291. terminate_process_tree(self._provisioning_processes[benchmark.id].pid)
  292. # Delete workload.
  293. delete_workload(benchmark.name)
  294. # Cleanup internal states.
  295. self._provisioning_processes.pop(benchmark.id, None)
  296. self._benchmark_by_id.pop(benchmark.id, None)
  297. self._clear_active_benchmark(benchmark.id)
  298. logger.info(f"Stopped benchmark {benchmark.name}(id={benchmark.id})")
  299. def _is_provisioning(self, benchmark: Benchmark) -> bool:
  300. """
  301. Check if the benchmark is still provisioning.
  302. Args:
  303. benchmark: The benchmark to check.
  304. """
  305. if process := self._provisioning_processes.get(benchmark.id):
  306. if process.is_alive():
  307. process.join(timeout=0)
  308. return process.is_alive()
  309. return False
  310. def sync_benchmark_state(self):
  311. """
  312. Synchronize benchmarks' state.
  313. - If the provision process is still alive, skip.
  314. - If the workload is still launching, skip.
  315. - If the workload is not existed, unhealthy, failed, update the benchmark state to ERROR.
  316. - If the workload is inactive, update the benchmark state to COMPLETED.
  317. """
  318. benchmarks_page = self._clientset.benchmarks.list(
  319. params={"worker_id": self._worker_id, "state": BenchmarkStateEnum.RUNNING}
  320. )
  321. if not benchmarks_page.items:
  322. return
  323. for benchmark in benchmarks_page.items:
  324. self._sync_single_benchmark_state(benchmark)
  325. def _sync_single_benchmark_state(self, benchmark: Benchmark):
  326. """Synchronize a single benchmark's state."""
  327. # Check for timeout
  328. if self._is_benchmark_timed_out(benchmark):
  329. self._handle_benchmark_timeout(benchmark)
  330. return
  331. # Skip if still provisioning
  332. if self._is_provisioning(benchmark):
  333. logger.trace(
  334. f"Benchmark {benchmark.name}(id={benchmark.id}) is provisioning. Skipping sync."
  335. )
  336. return
  337. # Get workload and handle based on state
  338. workload = get_workload(benchmark.name)
  339. if self._should_skip_workload(benchmark, workload):
  340. return
  341. if self._is_workload_completed(workload):
  342. self._handle_benchmark_completion(benchmark)
  343. return
  344. if self._is_workload_failed(workload):
  345. self._handle_benchmark_failure(benchmark)
  346. return
  347. def _should_skip_workload(self, benchmark: Benchmark, workload) -> bool:
  348. """Check if workload should be skipped (still launching or running)."""
  349. if not workload:
  350. return False
  351. if workload.state in [
  352. WorkloadStatusStateEnum.PENDING,
  353. WorkloadStatusStateEnum.INITIALIZING,
  354. ]:
  355. logger.trace(
  356. f"Benchmark {benchmark.name}(id={benchmark.id}) workload is still launching. Skipping sync."
  357. )
  358. return True
  359. if workload.state == WorkloadStatusStateEnum.RUNNING:
  360. logger.trace(
  361. f"Benchmark {benchmark.name}(id={benchmark.id}) workload is running. Skipping sync."
  362. )
  363. return True
  364. return False
  365. def _is_workload_completed(self, workload) -> bool:
  366. """Check if workload has completed successfully."""
  367. return workload and workload.state == WorkloadStatusStateEnum.INACTIVE
  368. def _is_workload_failed(self, workload) -> bool:
  369. """Check if workload has failed or is unhealthy."""
  370. if not workload:
  371. return True
  372. return workload.state in [
  373. WorkloadStatusStateEnum.UNKNOWN,
  374. WorkloadStatusStateEnum.UNHEALTHY,
  375. WorkloadStatusStateEnum.FAILED,
  376. ]
  377. def _handle_benchmark_timeout(self, benchmark: Benchmark):
  378. """Handle benchmark timeout."""
  379. patch_dict = {
  380. "state": BenchmarkStateEnum.ERROR,
  381. "state_message": "Benchmark timed out.",
  382. }
  383. self._update_benchmark_state_sync(benchmark.id, **patch_dict)
  384. self._dump_benchmark_logs_to_file(benchmark)
  385. self._stop_benchmark(benchmark)
  386. def _handle_benchmark_completion(self, benchmark: Benchmark):
  387. """Handle successful benchmark completion."""
  388. patch_dict = {
  389. "state": BenchmarkStateEnum.COMPLETED,
  390. }
  391. self._update_benchmark_state_sync(benchmark.id, **patch_dict)
  392. logger.info(f"Benchmark {benchmark.name} finished.")
  393. self._dump_benchmark_logs_to_file(benchmark)
  394. self._sync_benchmark_metrics(benchmark)
  395. self._stop_benchmark(benchmark)
  396. def _handle_benchmark_failure(self, benchmark: Benchmark):
  397. """Handle benchmark failure."""
  398. patch_dict = {
  399. "state": BenchmarkStateEnum.ERROR,
  400. "state_message": "Benchmark exited or unhealthy.",
  401. }
  402. self._update_benchmark_state_sync(benchmark.id, **patch_dict)
  403. self._dump_benchmark_logs_to_file(benchmark)
  404. self._stop_benchmark(benchmark)
  405. def _sync_benchmark_metrics(self, benchmark):
  406. """
  407. Synchronize benchmarks' metrics.
  408. """
  409. metrics = None
  410. try:
  411. metrics_file_path = f"{self._benchmark_dir}/{benchmark.id}.json"
  412. report = GenerativeBenchmarksReport.load_file(metrics_file_path)
  413. metrics = report.to_metrics()
  414. except Exception as e:
  415. logger.error(
  416. f"Failed to load metrics for benchmark {benchmark.name}(id={benchmark.id}): {e}"
  417. )
  418. return
  419. if not metrics:
  420. logger.error(
  421. f"No metrics found for benchmark {benchmark.name}(id={benchmark.id})."
  422. )
  423. return
  424. total = metrics.request_total or 0
  425. successful = metrics.request_successful or 0
  426. errored = metrics.request_errored or 0
  427. incomplete = metrics.request_incomplete or 0
  428. try:
  429. errored_samples, incomplete_samples = self._load_request_samples(
  430. report, limit=None
  431. )
  432. except Exception as e:
  433. logger.error(
  434. "Failed to read request error samples for benchmark "
  435. f"{benchmark.name}(id={benchmark.id}): {e}"
  436. )
  437. errored_samples, incomplete_samples = [], []
  438. self._log_request_failures_if_any(
  439. benchmark=benchmark,
  440. total=total,
  441. successful=successful,
  442. errored=errored,
  443. incomplete=incomplete,
  444. errored_samples=errored_samples,
  445. incomplete_samples=incomplete_samples,
  446. )
  447. partial_failure_message = self._build_partial_failure_state_message(
  448. errored=errored,
  449. incomplete=incomplete,
  450. errored_samples=errored_samples,
  451. incomplete_samples=incomplete_samples,
  452. )
  453. resp = self._clientset.http_client.get_httpx_client().post(
  454. f"/benchmarks/{benchmark.id}/metrics", json=metrics.model_dump()
  455. )
  456. raise_if_response_error(resp)
  457. if partial_failure_message:
  458. self._update_benchmark_state_sync(
  459. benchmark.id,
  460. state_message=partial_failure_message,
  461. )
  462. def _log_request_failures_if_any(
  463. self,
  464. benchmark: Benchmark,
  465. total: int,
  466. successful: int,
  467. errored: int,
  468. incomplete: int,
  469. errored_samples: List[GenerativeRequestStats],
  470. incomplete_samples: List[GenerativeRequestStats],
  471. limit: int = 5,
  472. ) -> None:
  473. if errored <= 0 and incomplete <= 0:
  474. return
  475. errored_samples_to_show = errored_samples[:limit]
  476. incomplete_samples_to_show = incomplete_samples[:limit]
  477. if not errored_samples_to_show and not incomplete_samples_to_show:
  478. return
  479. lines: List[str] = [
  480. "",
  481. "=== BENCHMARK REQUEST FAILURES ===",
  482. "SUMMARY: "
  483. f"benchmark={benchmark.name}(id={benchmark.id}) "
  484. f"total={total} successful={successful} "
  485. f"errored={errored} incomplete={incomplete} "
  486. f"showing_up_to={limit}",
  487. ]
  488. if errored_samples_to_show:
  489. lines.append("")
  490. lines.append(f"---- ERRORED REQUESTS (SHOWING UP TO {limit}) ----")
  491. lines.extend(self._format_request_samples(errored_samples_to_show))
  492. if incomplete_samples_to_show:
  493. lines.append("")
  494. lines.append(f"---- INCOMPLETE REQUESTS (SHOWING UP TO {limit}) ----")
  495. lines.extend(self._format_request_samples(incomplete_samples_to_show))
  496. message = "\n".join(lines)
  497. self._append_benchmark_log(benchmark, message)
  498. def _load_request_samples(
  499. self, report: GenerativeBenchmarksReport, limit: Optional[int] = 5
  500. ) -> Tuple[List[GenerativeRequestStats], List[GenerativeRequestStats]]:
  501. if (
  502. not report.benchmarks
  503. or len(report.benchmarks) == 0
  504. or report.benchmarks[0] is None
  505. or report.benchmarks[0].requests_truncated is None
  506. ):
  507. return [], []
  508. requests = report.benchmarks[0].requests_truncated
  509. errored = requests.errored or []
  510. incomplete = requests.incomplete or []
  511. if limit is None:
  512. return errored, incomplete
  513. return errored[:limit], incomplete[:limit]
  514. def _format_request_samples(
  515. self, samples: List[GenerativeRequestStats]
  516. ) -> List[str]:
  517. lines: List[str] = []
  518. for idx, sample in enumerate(samples, start=1):
  519. request_id = sample.request_id or "unknown"
  520. request_type = sample.request_type or "unknown"
  521. status = sample.info.status or "unknown"
  522. error = sample.info.error
  523. traceback = sample.info.traceback
  524. base = (
  525. f"- [{idx}] request_id={request_id} type={request_type} "
  526. f"status={status}"
  527. )
  528. lines.append(base)
  529. if error:
  530. lines.append(f" ERROR: {error}")
  531. if traceback:
  532. lines.append(" TRACEBACK:")
  533. indented = "\n".join(f" {line}" for line in traceback.splitlines())
  534. lines.append(indented)
  535. lines.append("")
  536. return lines
  537. def _build_partial_failure_state_message(
  538. self,
  539. errored: int,
  540. incomplete: int,
  541. errored_samples: List[GenerativeRequestStats],
  542. incomplete_samples: List[GenerativeRequestStats],
  543. top_n: int = 3,
  544. ) -> Optional[str]:
  545. if errored <= 0 and incomplete <= 0:
  546. return None
  547. summary = (
  548. "Completed with partial success: "
  549. f"errored={errored}, incomplete={incomplete}."
  550. )
  551. errored_reasons = self._collect_failure_reasons(
  552. errored_samples, fallback="Errored"
  553. )
  554. incomplete_reasons = self._collect_failure_reasons(
  555. incomplete_samples, fallback="Incomplete"
  556. )
  557. reason_parts: List[str] = []
  558. if errored_reasons:
  559. top_errored = ", ".join(
  560. f"{reason} (x{count})"
  561. for reason, count in errored_reasons.most_common(top_n)
  562. )
  563. reason_parts.append(f"Top errored reasons: {top_errored}")
  564. if incomplete_reasons:
  565. top_incomplete = ", ".join(
  566. f"{reason} (x{count})"
  567. for reason, count in incomplete_reasons.most_common(top_n)
  568. )
  569. reason_parts.append(f"Top incomplete reasons: {top_incomplete}")
  570. if reason_parts:
  571. summary = f"{summary} {'; '.join(reason_parts)}"
  572. else:
  573. summary = f"{summary} See benchmark logs for details."
  574. return self._truncate_state_message(summary)
  575. def _collect_failure_reasons(
  576. self, samples: List[GenerativeRequestStats], fallback: str
  577. ) -> Counter[str]:
  578. reasons: Counter[str] = Counter()
  579. for sample in samples:
  580. error = sample.info.error
  581. if error:
  582. reason = self._normalize_error_message(error)
  583. else:
  584. status = sample.info.status or "unknown"
  585. reason = f"{fallback} request (status={status})"
  586. reasons[reason] += 1
  587. return reasons
  588. def _normalize_error_message(self, error: str) -> str:
  589. stripped = error.strip()
  590. if not stripped:
  591. return "Unknown error"
  592. first_line = stripped.splitlines()[0]
  593. match = HTTP_ERROR_PATTERN.match(first_line)
  594. if not match:
  595. return first_line
  596. status = match.group("status")
  597. msg = " ".join(match.group("msg").split())
  598. error_type = match.group("type").strip()
  599. code = match.group("code").strip()
  600. if code and code.lower() != "none":
  601. normalized = f"HTTP {status} {error_type}/{code}: {msg}"
  602. else:
  603. normalized = f"HTTP {status} {error_type}: {msg}"
  604. return self._truncate_with_ellipsis(
  605. normalized, BENCHMARK_FAILURE_REASON_MAX_LEN
  606. )
  607. def _truncate_state_message(self, message: str) -> str:
  608. return self._truncate_with_ellipsis(message, BENCHMARK_STATE_MESSAGE_MAX_LEN)
  609. def _truncate_with_ellipsis(self, text: str, max_len: int) -> str:
  610. if len(text) <= max_len:
  611. return text
  612. if max_len <= len(TRUNCATION_SUFFIX):
  613. return TRUNCATION_SUFFIX[:max_len]
  614. return text[: max_len - len(TRUNCATION_SUFFIX)] + TRUNCATION_SUFFIX
  615. def _append_benchmark_log(self, benchmark: Benchmark, message: str) -> None:
  616. log_file_path = f"{self._benchmark_log_dir}/{benchmark.id}.log"
  617. try:
  618. with open(log_file_path, "a", encoding="utf-8") as f:
  619. f.write(message)
  620. if not message.endswith("\n"):
  621. f.write("\n")
  622. except Exception as e:
  623. logger.error(
  624. f"Failed to append benchmark log for {benchmark.name}(id={benchmark.id}): {e}"
  625. )
  626. def _set_active_benchmark(self, benchmark_id: int):
  627. self._active_benchmark_id = benchmark_id
  628. self._active_benchmark_started_at = time.time()
  629. def _clear_active_benchmark(self, benchmark_id: int):
  630. if self._active_benchmark_id == benchmark_id:
  631. self._active_benchmark_id = None
  632. self._active_benchmark_started_at = None
  633. def _is_benchmark_timed_out(self, benchmark: Benchmark) -> bool:
  634. limit = self._config.benchmark_max_duration_seconds
  635. if not limit:
  636. return False
  637. if self._active_benchmark_id != benchmark.id:
  638. return False
  639. if self._active_benchmark_started_at is None:
  640. return False
  641. return (time.time() - self._active_benchmark_started_at) > limit
  642. def _dump_benchmark_logs_to_file(
  643. self,
  644. benchmark: Benchmark,
  645. ):
  646. try:
  647. logs = logs_workload(
  648. name=benchmark.name,
  649. )
  650. except Exception as e:
  651. logger.error(
  652. f"Failed to fetch workload logs for benchmark {benchmark.name}(id={benchmark.id}): {e}"
  653. )
  654. return
  655. log_file_path = f"{self._benchmark_log_dir}/{benchmark.id}.log"
  656. with open(log_file_path, "a", encoding="utf-8") as f:
  657. log_str = logs
  658. if isinstance(log_str, bytes):
  659. log_str = log_str.decode("utf-8", errors="replace")
  660. log_str = str(log_str)
  661. f.write(log_str)
  662. if not log_str.endswith("\n"):
  663. f.write("\n")