logs.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. import asyncio
  2. import logging
  3. import re
  4. from collections import defaultdict
  5. from datetime import datetime, timezone
  6. from pathlib import Path
  7. from typing import Dict, List, Optional
  8. from fastapi import APIRouter, Request, Query
  9. from fastapi.responses import StreamingResponse
  10. from gpustack.api.exceptions import NotFoundException
  11. from gpustack.schemas.models import (
  12. ModelInstanceLogRestartEntry,
  13. ServeLogOptionsResponse,
  14. )
  15. from gpustack.worker.logs import LogOptions, LogOptionsDep, log_generator
  16. from gpustack.worker.log_sources import (
  17. ContainerLogSource,
  18. DownloadLogSource,
  19. LogSourceChain,
  20. MainLogSource,
  21. )
  22. from gpustack.server.deps import SessionDep
# Router that the log endpoints defined in this module are registered on.
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  25. def extract_restart_count(filename: str) -> int:
  26. """Extract restart count from filename like '123.5.log'.
  27. Args:
  28. filename: Log filename in format {id}.{restart_count}.log
  29. Returns:
  30. Restart count as integer, or 0 if pattern doesn't match
  31. """
  32. match = re.match(r'\d+\.(\d+)\.log', filename)
  33. return int(match.group(1)) if match else 0
  34. def extract_container_restart_count(filename: str) -> int:
  35. """Extract restart count from container log filename.
  36. Args:
  37. filename: Log filename in format {id}.container.{restart_count}.log
  38. Returns:
  39. Restart count as integer, or 0 if pattern doesn't match
  40. """
  41. match = re.match(r'\d+\.container\.(\d+)\.log', filename)
  42. return int(match.group(1)) if match else 0
  43. def extract_sidecar_container_restart_count(filename: str) -> int:
  44. """Extract restart count from sidecar container log filename.
  45. Args:
  46. filename: Log filename in format {id}.container.{name}.{restart_count}.log
  47. Returns:
  48. Restart count as integer, or 0 if pattern doesn't match
  49. """
  50. match = re.match(r'\d+\.container\.[^.]+\.(\d+)\.log', filename)
  51. return int(match.group(1)) if match else 0
  52. def extract_sidecar_container_name(filename: str) -> str:
  53. """Extract container name from sidecar container log filename.
  54. Args:
  55. filename: Log filename in format {id}.container.{name}.{restart_count}.log
  56. Returns:
  57. Container name as string, or empty string if pattern doesn't match
  58. """
  59. match = re.match(r'\d+\.container\.([^.]+)\.\d+\.log', filename)
  60. if not match:
  61. return ""
  62. name = match.group(1)
  63. # Exclude pure numeric names (those are default container restart counts)
  64. if name.isdigit():
  65. return ""
  66. return name
  67. async def get_all_log_files(
  68. log_dir: Path,
  69. model_instance_id: int,
  70. container: bool = False,
  71. restart_count: Optional[int] = None,
  72. container_name: Optional[str] = None,
  73. ) -> List[Path]:
  74. """Get all log files sorted by restart count.
  75. Args:
  76. log_dir: Directory containing log files
  77. model_instance_id: Model instance ID
  78. container: If True, get container logs; if False, get main logs
  79. restart_count: If specified, only return logs for this restart count
  80. container_name: If specified with container=True, get sidecar container
  81. logs for this container name (e.g., "ray-head").
  82. Pattern: {id}.container.{name}.{restart_count}.log
  83. Returns:
  84. List of log file paths sorted by restart count (oldest first)
  85. """
  86. if container and container_name:
  87. # Sidecar container logs: {id}.container.{name}.{restart_count}.log
  88. pattern = f"{model_instance_id}.container.{container_name}.*.log"
  89. extract_fn = extract_sidecar_container_restart_count
  90. elif container:
  91. # Default container logs: {id}.container.{restart_count}.log
  92. pattern = f"{model_instance_id}.container.*.log"
  93. extract_fn = extract_container_restart_count
  94. else:
  95. pattern = f"{model_instance_id}.*.log"
  96. extract_fn = extract_restart_count
  97. files = await asyncio.to_thread(lambda: list(log_dir.glob(pattern)))
  98. # Exclude container log files when getting main logs
  99. if not container:
  100. files = [f for f in files if '.container.' not in f.name]
  101. # When getting default container logs (no container_name),
  102. # exclude sidecar container logs (those with non-numeric segment after "container.").
  103. if container and not container_name:
  104. files = [f for f in files if not extract_sidecar_container_name(f.name)]
  105. # Filter by restart_count if specified
  106. if restart_count is not None:
  107. files = [f for f in files if extract_fn(f.name) == restart_count]
  108. return sorted(files, key=lambda p: extract_fn(p.name))
  109. async def resolve_restart_count(
  110. log_dir: Path, model_instance_id: int, previous: bool
  111. ) -> Optional[int]:
  112. """Resolve ``previous`` flag to an actual restart_count from disk files.
  113. Returns:
  114. The restart_count integer for the target log set, or ``None`` when
  115. no log files exist on disk yet.
  116. """
  117. files = await get_all_log_files(log_dir, model_instance_id, container=False)
  118. if not files:
  119. return None
  120. counts = sorted(set(extract_restart_count(f.name) for f in files))
  121. if previous and len(counts) >= 2:
  122. return counts[-2]
  123. return counts[-1]
  124. def _path_started_at_utc(path: Path) -> datetime:
  125. st = path.stat()
  126. ts = getattr(st, "st_birthtime", None)
  127. if ts is None or ts <= 0:
  128. ts = st.st_mtime
  129. return datetime.fromtimestamp(ts, tz=timezone.utc)
  130. def restart_entries_from_main_log_files(
  131. files: List[Path],
  132. sidecar_names_by_restart: Optional[Dict[int, List[str]]] = None,
  133. ) -> List[ModelInstanceLogRestartEntry]:
  134. """Build restart entries from main log paths; one entry per restart_count.
  135. Entries are ordered by restart_count descending (newest first).
  136. The highest restart_count maps to ``previous=False`` (current);
  137. the second highest maps to ``previous=True``.
  138. If multiple files share a restart_count, use the lexicographically smallest
  139. name as the representative path for stat.
  140. Args:
  141. files: Main log file paths.
  142. sidecar_names_by_restart: Mapping from restart_count to sidecar
  143. container names found on disk for that restart.
  144. """
  145. by_count: Dict[int, List[Path]] = defaultdict(list)
  146. for f in files:
  147. by_count[extract_restart_count(f.name)].append(f)
  148. sorted_counts = sorted(by_count.keys(), reverse=True)
  149. entries: List[ModelInstanceLogRestartEntry] = []
  150. for i, rc in enumerate(sorted_counts):
  151. paths = sorted(by_count[rc], key=lambda p: p.name)
  152. path = paths[0]
  153. try:
  154. started_at = _path_started_at_utc(path)
  155. except OSError:
  156. started_at = None
  157. containers = (
  158. sidecar_names_by_restart.get(rc, []) if sidecar_names_by_restart else []
  159. )
  160. entries.append(
  161. ModelInstanceLogRestartEntry(
  162. previous=i > 0, started_at=started_at, containers=containers
  163. )
  164. )
  165. return entries
  166. def restart_entries_from_sidecar_log_files(
  167. files: List[Path],
  168. ) -> List[ModelInstanceLogRestartEntry]:
  169. """Build restart entries from sidecar container log paths.
  170. Same logic as restart_entries_from_main_log_files but uses
  171. extract_sidecar_container_restart_count for the file name pattern.
  172. """
  173. by_count: Dict[int, List[Path]] = defaultdict(list)
  174. for f in files:
  175. by_count[extract_sidecar_container_restart_count(f.name)].append(f)
  176. sorted_counts = sorted(by_count.keys(), reverse=True)
  177. entries: List[ModelInstanceLogRestartEntry] = []
  178. for i, rc in enumerate(sorted_counts):
  179. paths = sorted(by_count[rc], key=lambda p: p.name)
  180. path = paths[0]
  181. try:
  182. started_at = _path_started_at_utc(path)
  183. except OSError:
  184. started_at = None
  185. entries.append(
  186. ModelInstanceLogRestartEntry(previous=i > 0, started_at=started_at)
  187. )
  188. return entries
  189. async def historical_log_generator(
  190. log_dir: Path,
  191. model_instance_id: int,
  192. options: LogOptions,
  193. stop_event: Optional[asyncio.Event] = None,
  194. container: bool = False,
  195. restart_count: Optional[int] = None,
  196. container_name: Optional[str] = None,
  197. ):
  198. """Generate logs from historical log files.
  199. Args:
  200. log_dir: Directory containing log files
  201. model_instance_id: Model instance ID
  202. options: Log options (tail, follow)
  203. stop_event: Event to signal stopping
  204. container: If True, read container logs; if False, read main logs
  205. restart_count: Resolved restart count to filter log files
  206. container_name: If specified with container=True, read sidecar container logs.
  207. Yields:
  208. Log lines from log files
  209. """
  210. log_files = await get_all_log_files(
  211. log_dir,
  212. model_instance_id,
  213. container=container,
  214. restart_count=restart_count,
  215. container_name=container_name,
  216. )
  217. if not log_files:
  218. if container:
  219. logger.debug(
  220. f"No container log files found for model instance "
  221. f"{model_instance_id}"
  222. )
  223. return
  224. if options.tail > 0:
  225. # Only read the last N lines from the most recent log file
  226. if log_files:
  227. file_options = LogOptions(
  228. tail=options.tail, follow=options.follow, stop_event=stop_event
  229. )
  230. async for line in log_generator(str(log_files[-1]), file_options):
  231. if stop_event and stop_event.is_set():
  232. logger.debug(
  233. "Historical log generator stopping due to stop event 1"
  234. )
  235. return
  236. yield line
  237. else:
  238. # Read all logs in order
  239. for i, log_file in enumerate(log_files):
  240. # For all files except the last one, don't follow
  241. is_last_file = i == len(log_files) - 1
  242. file_options = LogOptions(
  243. tail=-1,
  244. follow=options.follow if is_last_file else False,
  245. stop_event=stop_event,
  246. )
  247. async for line in log_generator(str(log_file), file_options):
  248. if stop_event and stop_event.is_set():
  249. logger.debug(
  250. "Historical log generator stopping due to stop event 2"
  251. )
  252. return
  253. yield line
async def merged_log_generator(  # noqa: C901
    log_paths: List[str],
    options: LogOptions,
    stop_event: Optional[asyncio.Event] = None,
):
    """Merge multiple log sources and yield lines as they become available.

    One reader task per path pushes lines into its own queue; this generator
    waits on one pending ``queue.get()`` per queue and yields whatever
    arrives first. A ``None`` item marks the end of a source. Reader errors
    are logged by the reader and do not stop the other sources.

    Args:
        log_paths: List of log file paths to read
        options: Log options (tail, follow); the same options object is
            passed to every reader
        stop_event: Event to signal stopping

    Yields:
        Log lines from all sources in the order they become available
    """
    if not log_paths:
        return
    queues: List[asyncio.Queue] = []

    async def read_to_queue(queue: asyncio.Queue, log_path: str, opts: LogOptions):
        # Pump one log file into its queue as ("data", line) tuples.
        try:
            async for line in log_generator(log_path, opts):
                if stop_event and stop_event.is_set():
                    return
                await queue.put(("data", line))
        except Exception as e:
            logger.error(f"Error reading log {log_path}: {e}")
            await queue.put(("error", str(e)))
        finally:
            await queue.put(None)  # Signal end of this source

    # Create tasks for all log generators
    tasks = []
    for path in log_paths:
        queue = asyncio.Queue()
        queues.append(queue)
        task = asyncio.create_task(read_to_queue(queue, path, options))
        tasks.append(task)
    # Map each pending queue.get() task back to the queue it reads from.
    get_tasks = {}
    for q in queues:
        task = asyncio.create_task(q.get())
        get_tasks[task] = q
    # Yield lines as they become available from any source
    active_count = len(queues)
    try:
        while active_count > 0:
            if stop_event and stop_event.is_set():
                break
            # Wait for any queue to have data
            # (with timeout to check stop_event periodically)
            done, _ = await asyncio.wait(
                get_tasks.keys(),
                return_when=asyncio.FIRST_COMPLETED,
                timeout=0.5,
            )
            # Check stop_event after timeout
            if stop_event and stop_event.is_set():
                break
            for future in done:
                queue = get_tasks.pop(future)
                try:
                    result = future.result()
                except asyncio.CancelledError:
                    continue
                if result is None:
                    # End-of-source marker: one fewer source to wait for.
                    active_count -= 1
                else:
                    msg_type, content = result
                    if msg_type == "data":
                        yield content
                    # error type is logged in read_to_queue, continue streaming
                    # Only recreate the task for the completed queue
                    new_task = asyncio.create_task(queue.get())
                    get_tasks[new_task] = queue
    finally:
        # Cancel all background tasks to prevent leaks when the generator
        # is closed early (e.g. client disconnect).
        all_tasks = list(tasks) + list(get_tasks.keys())
        for t in all_tasks:
            if not t.done():
                t.cancel()
        # return_exceptions=True keeps the resulting CancelledError (or any
        # reader failure) from propagating out of the cleanup.
        await asyncio.gather(*all_tasks, return_exceptions=True)
  332. async def combined_log_generator(
  333. log_dir: Path | str,
  334. model_instance_id: int,
  335. download_log_path: str,
  336. options: LogOptionsDep,
  337. model_instance_name: str,
  338. container_name: Optional[str] = None,
  339. ):
  340. """Unified log streaming from three file sources using LogSourceChain.
  341. Reads logs in order:
  342. 1) Download logs (if exists)
  343. 2) Historical main logs (all restart_count files)
  344. 3) Container logs (from persisted files)
  345. When container_name is specified, only sidecar container logs are streamed
  346. (download and main logs are skipped).
  347. Args:
  348. log_dir: Directory containing log files (Path or str)
  349. model_instance_id: Model instance ID
  350. download_log_path: Path to download log file
  351. options: Log options (tail, follow)
  352. model_instance_name: Model instance name (unused, kept for API compatibility)
  353. container_name: If specified, stream only this sidecar container's logs.
  354. """
  355. log_dir = Path(log_dir)
  356. restart_count = await resolve_restart_count(
  357. log_dir, model_instance_id, options.previous
  358. )
  359. # When a specific sidecar container is requested, stream only its logs.
  360. # "default" means the main container — fall through to the normal path.
  361. if container_name and container_name != "default":
  362. async for line in historical_log_generator(
  363. log_dir,
  364. model_instance_id,
  365. options,
  366. container=True,
  367. restart_count=restart_count,
  368. container_name=container_name,
  369. ):
  370. yield line
  371. return
  372. download_source = DownloadLogSource(download_log_path)
  373. main_source = MainLogSource(
  374. log_dir,
  375. model_instance_id,
  376. restart_count,
  377. get_all_log_files_fn=get_all_log_files,
  378. )
  379. container_source = ContainerLogSource(
  380. log_dir,
  381. model_instance_id,
  382. restart_count,
  383. get_all_log_files_fn=get_all_log_files,
  384. extract_restart_count_fn=extract_restart_count,
  385. )
  386. chain = LogSourceChain(
  387. [download_source, main_source],
  388. log_generator_fn=log_generator,
  389. )
  390. has_any_logs = False
  391. log_paths = []
  392. # Download log
  393. download_files = await download_source.wait_for_files_if_needed(
  394. follow=options.follow
  395. )
  396. if download_files:
  397. log_paths.append(str(download_files[0]))
  398. has_any_logs = True
  399. # Main logs
  400. main_log_files = await main_source.wait_for_files_if_needed(follow=options.follow)
  401. if main_log_files:
  402. log_paths.extend(str(f) for f in main_log_files)
  403. has_any_logs = True
  404. # Stream download + main logs (merged)
  405. if log_paths:
  406. stop_event = asyncio.Event()
  407. monitor_task = None
  408. container_has_content = await container_source.has_content()
  409. if not container_has_content and options.follow:
  410. monitor_task = asyncio.create_task(
  411. chain.monitor_container_content(container_source, stop_event)
  412. )
  413. merge_options = (
  414. LogOptions(tail=-1, follow=False)
  415. if container_has_content or not options.follow
  416. else options
  417. )
  418. try:
  419. async for line in merged_log_generator(
  420. log_paths, merge_options, stop_event
  421. ):
  422. yield line
  423. finally:
  424. if monitor_task and not monitor_task.done():
  425. monitor_task.cancel()
  426. try:
  427. await monitor_task
  428. except asyncio.CancelledError:
  429. pass
  430. # Container logs
  431. container_log_files = await container_source.wait_for_files_if_needed(
  432. follow=options.follow,
  433. main_log_files=main_log_files,
  434. )
  435. if container_log_files:
  436. has_any_logs = True
  437. async for line in historical_log_generator(
  438. log_dir,
  439. model_instance_id,
  440. options,
  441. container=True,
  442. restart_count=restart_count,
  443. ):
  444. yield line
  445. if not has_any_logs:
  446. raise NotFoundException(message="Log file not found")
  447. @router.get("/serveLogOptions/{id}", response_model=ServeLogOptionsResponse)
  448. async def get_serve_log_options(request: Request, id: int):
  449. """List restart_count values for which main serve log files exist locally."""
  450. log_dir = request.app.state.config.log_dir
  451. serve_log_dir = Path(log_dir) / "serve"
  452. files = await get_all_log_files(serve_log_dir, id, container=False)
  453. # Discover sidecar container names grouped by restart_count.
  454. container_pattern = f"{id}.container.*.*.log"
  455. all_sidecar_files = await asyncio.to_thread(
  456. lambda: list(serve_log_dir.glob(container_pattern))
  457. )
  458. sidecar_names_by_restart: Dict[int, List[str]] = defaultdict(list)
  459. seen: set = set()
  460. for f in all_sidecar_files:
  461. cname = extract_sidecar_container_name(f.name)
  462. if not cname:
  463. continue
  464. rc = extract_sidecar_container_restart_count(f.name)
  465. key = (rc, cname)
  466. if key not in seen:
  467. sidecar_names_by_restart[rc].append(cname)
  468. seen.add(key)
  469. # Build per-restart container lists: "default" (main) + sidecar names.
  470. # "default" is always included when container log files exist for that restart.
  471. container_log_pattern = f"{id}.container.*.log"
  472. all_container_files = await asyncio.to_thread(
  473. lambda: list(serve_log_dir.glob(container_log_pattern))
  474. )
  475. container_names_by_restart: Dict[int, List[str]] = defaultdict(list)
  476. for rc in sidecar_names_by_restart:
  477. container_names_by_restart[rc] = ["default"] + sorted(
  478. sidecar_names_by_restart[rc]
  479. )
  480. # Also add "default" for restarts that have container logs but no sidecars.
  481. default_container_rcs = {
  482. extract_container_restart_count(f.name)
  483. for f in all_container_files
  484. if not extract_sidecar_container_name(f.name)
  485. }
  486. for rc in default_container_rcs:
  487. if rc not in container_names_by_restart:
  488. container_names_by_restart[rc] = ["default"]
  489. restarts = await asyncio.to_thread(
  490. restart_entries_from_main_log_files, files, container_names_by_restart
  491. )
  492. return ServeLogOptionsResponse(restarts=restarts)
  493. @router.get("/serveLogs/{id}")
  494. async def get_serve_logs(
  495. request: Request,
  496. session: SessionDep,
  497. id: int,
  498. log_options: LogOptionsDep,
  499. model_instance_name: str = Query(default=""),
  500. model_file_id: Optional[int] = Query(default=None),
  501. container_name: Optional[str] = Query(default=None),
  502. ):
  503. log_dir = request.app.state.config.log_dir
  504. serve_log_dir = Path(log_dir) / "serve"
  505. download_log_path = ""
  506. # Use model file ID for shared download logs if provided
  507. if model_file_id is not None:
  508. download_log_path = str(
  509. serve_log_dir / f"model_file_{model_file_id}.download.log"
  510. )
  511. return StreamingResponse(
  512. combined_log_generator(
  513. serve_log_dir,
  514. id,
  515. download_log_path,
  516. log_options,
  517. model_instance_name,
  518. container_name=container_name,
  519. ),
  520. media_type="application/octet-stream",
  521. )
  522. @router.get("/benchmark_logs/{id}")
  523. async def get_benchmark_logs(
  524. request: Request,
  525. session: SessionDep,
  526. id: int,
  527. log_options: LogOptionsDep,
  528. benchmark_name: str = Query(default=""),
  529. ):
  530. log_dir = request.app.state.config.log_dir
  531. benchmark_log_path = Path(log_dir) / "benchmarks" / f"{id}.log"
  532. return StreamingResponse(
  533. log_generator(str(benchmark_log_path), log_options),
  534. media_type="application/octet-stream",
  535. )