log_sources.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
"""
Log source strategies for unified log streaming.

This module combines the Strategy pattern and a chain-of-responsibility style
approach for handling different log sources (download logs, main logs,
container logs):

* ``LogSource`` is the abstract strategy; ``DownloadLogSource``,
  ``MainLogSource`` and ``ContainerLogSource`` implement it.
* ``LogSourceChain`` holds an ordered list of sources and streams log lines
  from them.
"""

import asyncio
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import AsyncGenerator, Callable, List, Optional

from gpustack.utils import file

# Module-level logger shared by the log sources and the chain below.
logger = logging.getLogger(__name__)
  13. async def has_log_content(log_file: Path) -> bool:
  14. """Check if log file has any actual content.
  15. Args:
  16. log_file: Path to log file
  17. Returns:
  18. True if file exists and has size > 0
  19. """
  20. return await asyncio.to_thread(
  21. lambda: log_file.exists() and log_file.stat().st_size > 0
  22. )
  23. class LogSource(ABC):
  24. """Abstract base class for log sources.
  25. Each log source knows how to get its files and wait for them if needed.
  26. This follows the Strategy pattern used in the project
  27. (similar to LoadBalancingStrategy).
  28. """
  29. @abstractmethod
  30. async def get_files(self) -> List[Path]:
  31. """Get log files for this source.
  32. Returns:
  33. List of log file paths (may be empty)
  34. """
  35. pass
  36. @abstractmethod
  37. def get_file_pattern(self) -> str:
  38. """Get the file pattern for this source (for logging purposes).
  39. Returns:
  40. Pattern string for identification
  41. """
  42. pass
  43. async def is_valid_source(self) -> bool:
  44. """Check if this source is valid and should be waited for.
  45. Override this method to add preconditions for waiting.
  46. For example, DownloadLogSource returns False if log_path is None.
  47. Returns:
  48. True if the source is valid and should be waited for
  49. """
  50. return True
  51. async def get_files_with_log(self) -> List[Path]:
  52. """Get files with debug logging.
  53. Returns:
  54. List of log file paths
  55. """
  56. files = await self.get_files()
  57. if files:
  58. logger.debug(f"Found files for {self.get_file_pattern()}: {files}")
  59. return files
  60. async def wait_for_files(self, timeout: int = 300, **kwargs) -> List[Path]:
  61. """Wait for log files to appear.
  62. Uses the project's check_with_retries utility for consistent retry behavior.
  63. Subclasses should override is_valid_source() instead of this method
  64. unless they need completely different waiting logic.
  65. Args:
  66. timeout: Maximum time to wait in seconds
  67. **kwargs: Additional arguments for subclass implementations
  68. Returns:
  69. List of log file paths
  70. """
  71. if not await self.is_valid_source():
  72. return []
  73. async def check():
  74. files = await self.get_files()
  75. if not files:
  76. raise FileNotFoundError(
  77. f"Log files not found for source: {self.get_file_pattern()}"
  78. )
  79. return files
  80. files = await file.check_with_retries(check, timeout=timeout)
  81. logger.debug(f"Found files after waiting: {self.get_file_pattern()}")
  82. return files
  83. async def wait_for_files_if_needed(
  84. self,
  85. follow: bool,
  86. timeout: int = 300,
  87. **kwargs,
  88. ) -> List[Path]:
  89. """Get files, waiting if necessary in follow mode.
  90. Args:
  91. follow: Whether in follow mode (triggers waiting if files not found)
  92. timeout: Maximum time to wait in seconds
  93. **kwargs: Additional arguments for wait_for_files
  94. Returns:
  95. List of log file paths
  96. """
  97. files = await self.get_files_with_log()
  98. if files:
  99. return files
  100. if follow and await self.is_valid_source():
  101. try:
  102. files = await self.wait_for_files(timeout=timeout, **kwargs)
  103. except Exception:
  104. pass
  105. return files
  106. class DownloadLogSource(LogSource):
  107. """Log source for download logs."""
  108. def __init__(self, log_path: Optional[str]):
  109. self.log_path = Path(log_path) if log_path else None
  110. async def get_files(self) -> List[Path]:
  111. if not self.log_path:
  112. return []
  113. if await asyncio.to_thread(self.log_path.exists):
  114. return [self.log_path]
  115. return []
  116. def get_file_pattern(self) -> str:
  117. return str(self.log_path) if self.log_path else "download_log"
  118. async def is_valid_source(self) -> bool:
  119. return self.log_path is not None
  120. class MainLogSource(LogSource):
  121. """Log source for main (historical) logs."""
  122. def __init__(
  123. self,
  124. log_dir: Path,
  125. model_instance_id: int,
  126. restart_count: Optional[int] = None,
  127. get_all_log_files_fn: Optional[Callable] = None,
  128. ):
  129. self.log_dir = log_dir
  130. self.model_instance_id = model_instance_id
  131. self.restart_count = restart_count
  132. self._get_all_log_files = get_all_log_files_fn
  133. async def get_files(self) -> List[Path]:
  134. if self._get_all_log_files:
  135. return await self._get_all_log_files(
  136. self.log_dir,
  137. self.model_instance_id,
  138. restart_count=self.restart_count,
  139. )
  140. return []
  141. def get_file_pattern(self) -> str:
  142. if self.restart_count is not None:
  143. return f"{self.model_instance_id}.*.{self.restart_count}.log"
  144. return f"{self.model_instance_id}.*.log"
  145. class ContainerLogSource(LogSource):
  146. """Log source for container logs."""
  147. def __init__(
  148. self,
  149. log_dir: Path,
  150. model_instance_id: int,
  151. restart_count: Optional[int] = None,
  152. get_all_log_files_fn: Optional[Callable] = None,
  153. extract_restart_count_fn: Optional[Callable] = None,
  154. ):
  155. self.log_dir = log_dir
  156. self.model_instance_id = model_instance_id
  157. self.restart_count = restart_count
  158. self._get_all_log_files = get_all_log_files_fn
  159. self._extract_restart_count = extract_restart_count_fn
  160. async def get_files(self) -> List[Path]:
  161. if self._get_all_log_files:
  162. return await self._get_all_log_files(
  163. self.log_dir,
  164. self.model_instance_id,
  165. container=True,
  166. restart_count=self.restart_count,
  167. )
  168. return []
  169. def get_file_pattern(self) -> str:
  170. if self.restart_count is not None:
  171. return f"{self.model_instance_id}.container.{self.restart_count}.log"
  172. return f"{self.model_instance_id}.container.*.log"
  173. def _get_expected_file(self, main_log_files: List[Path]) -> Optional[Path]:
  174. """Infer expected container log file from main log files.
  175. Args:
  176. main_log_files: Main log files to infer container log name from
  177. Returns:
  178. Expected container log file path, or None if cannot infer
  179. """
  180. if not main_log_files or not self._extract_restart_count:
  181. return None
  182. expected_restart = self._extract_restart_count(main_log_files[-1].name)
  183. return self.log_dir / (
  184. f"{self.model_instance_id}.container.{expected_restart}.log"
  185. )
  186. async def wait_for_files(
  187. self,
  188. timeout: int = 300,
  189. main_log_files: Optional[List[Path]] = None,
  190. ) -> List[Path]:
  191. """Wait for container log files to appear.
  192. Container logs need special handling because the expected file name
  193. depends on the main log's restart count.
  194. Args:
  195. timeout: Maximum time to wait in seconds
  196. main_log_files: Main log files to infer expected container log name
  197. Returns:
  198. List of container log file paths
  199. """
  200. # First check if files already exist
  201. files = await self.get_files()
  202. if files:
  203. return files
  204. # Try to infer expected file from main logs
  205. expected_file = self._get_expected_file(main_log_files)
  206. if not expected_file:
  207. return []
  208. # Reuse base class retry logic with custom check function
  209. async def check():
  210. if not await asyncio.to_thread(expected_file.exists):
  211. raise FileNotFoundError(
  212. f"Container log file not found: {expected_file}"
  213. )
  214. return [expected_file]
  215. files = await file.check_with_retries(check, timeout=timeout)
  216. logger.debug(f"Found container log after waiting: {expected_file}")
  217. return files
  218. async def wait_for_files_if_needed(
  219. self,
  220. follow: bool,
  221. timeout: int = 300,
  222. main_log_files: Optional[List[Path]] = None,
  223. ) -> List[Path]:
  224. """Get files, waiting if necessary in follow mode.
  225. Args:
  226. follow: Whether in follow mode (triggers waiting if files not found)
  227. timeout: Maximum time to wait in seconds
  228. main_log_files: Main log files to infer expected container log name
  229. Returns:
  230. List of log file paths
  231. """
  232. files = await self.get_files_with_log()
  233. if files:
  234. return files
  235. if follow:
  236. files = await self.wait_for_files(
  237. timeout=timeout, main_log_files=main_log_files
  238. )
  239. return files
  240. async def has_content(self) -> bool:
  241. """Check if any container log file has actual content."""
  242. files = await self.get_files()
  243. for f in files:
  244. if await has_log_content(f):
  245. return True
  246. return False
  247. class LogSourceChain:
  248. """Chain of log sources for sequential log streaming.
  249. Similar to WorkerFilterChain in the project, this applies sources
  250. in order and yields log lines from each.
  251. """
  252. def __init__(
  253. self,
  254. sources: List[LogSource],
  255. log_generator_fn: Optional[Callable] = None,
  256. ):
  257. self.sources = sources
  258. self._log_generator = log_generator_fn
  259. async def stream_source(
  260. self,
  261. source: LogSource,
  262. options,
  263. stop_event: Optional[asyncio.Event] = None,
  264. follow: bool = True,
  265. ) -> AsyncGenerator[str, None]:
  266. """Stream logs from a single source.
  267. Args:
  268. source: Log source to stream from
  269. options: Log options (tail, follow)
  270. stop_event: Event to signal stopping
  271. follow: Whether to follow the log file
  272. Yields:
  273. Log lines from the source
  274. """
  275. from gpustack.worker.logs import LogOptions
  276. files = await source.get_files()
  277. if not files and follow:
  278. # Wait for files to appear in follow mode
  279. if isinstance(source, ContainerLogSource):
  280. return # Container log waiting handled separately
  281. try:
  282. files = await source.wait_for_files()
  283. except Exception:
  284. logger.debug(f"Timeout waiting for files: {source.get_file_pattern()}")
  285. return
  286. if not files:
  287. return
  288. for log_file in files:
  289. if stop_event and stop_event.is_set():
  290. return
  291. file_options = LogOptions(
  292. tail=options.tail if files[-1] == log_file else -1,
  293. follow=options.follow if files[-1] == log_file else False,
  294. stop_event=stop_event,
  295. )
  296. if self._log_generator:
  297. async for line in self._log_generator(str(log_file), file_options):
  298. if stop_event and stop_event.is_set():
  299. return
  300. yield line
  301. async def monitor_container_content(
  302. self,
  303. container_source: ContainerLogSource,
  304. stop_event: asyncio.Event,
  305. check_interval: float = 1.0,
  306. ) -> None:
  307. """Monitor container logs for content and signal stop when found.
  308. Args:
  309. container_source: Container log source to monitor
  310. stop_event: Event to set when container log has content
  311. check_interval: Time between checks in seconds
  312. """
  313. logger.debug("Starting background task to monitor container logs for content")
  314. while not stop_event.is_set():
  315. await asyncio.sleep(check_interval)
  316. if stop_event.is_set():
  317. return
  318. if await container_source.has_content():
  319. logger.debug("Container log now has content, stopping main log follow")
  320. stop_event.set()
  321. return