| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- """
- Log source strategies for unified log streaming.
- This module provides a strategy pattern + chain of responsibility approach
- for handling different log sources (download logs, main logs, container logs).
- """
import asyncio
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import AsyncGenerator, Callable, List, Optional

from gpustack.utils import file

# Module-level logger used by the log sources and the source chain.
logger = logging.getLogger(__name__)
async def has_log_content(log_file: Path) -> bool:
    """Check whether a log file exists and contains at least one byte.

    The filesystem check runs in a worker thread so the event loop is not
    blocked. A single ``stat()`` call is used rather than ``exists()``
    followed by ``stat()``. With the two-step form, a file removed between
    the two calls (e.g. rotated or cleaned up) makes ``stat()`` raise
    ``FileNotFoundError`` instead of the check returning ``False``.

    Args:
        log_file: Path to the log file.

    Returns:
        True if the file exists and its size is greater than zero;
        False if it is missing, empty, or cannot be stat'ed.
    """

    def _is_non_empty() -> bool:
        try:
            return log_file.stat().st_size > 0
        except OSError:
            # Missing file, broken symlink, non-directory path component,
            # etc.: treat as "no content" rather than failing the caller.
            return False

    return await asyncio.to_thread(_is_non_empty)
class LogSource(ABC):
    """Abstract base class for log sources.

    Each log source knows how to list its files and, in follow mode, how to
    wait for them to appear. This follows the Strategy pattern used in the
    project (similar to LoadBalancingStrategy).

    Subclasses must implement :meth:`get_files` and :meth:`get_file_pattern`.
    They may override :meth:`is_valid_source` to add preconditions for
    waiting.
    """

    @abstractmethod
    async def get_files(self) -> List[Path]:
        """Get log files for this source.

        Returns:
            List of log file paths (may be empty).
        """

    @abstractmethod
    def get_file_pattern(self) -> str:
        """Get the file pattern for this source (for logging purposes).

        Returns:
            Pattern string for identification.
        """

    async def is_valid_source(self) -> bool:
        """Check if this source is valid and should be waited for.

        Override this method to add preconditions for waiting. For example,
        DownloadLogSource returns False if log_path is None.

        Returns:
            True if the source is valid and should be waited for.
        """
        return True

    async def get_files_with_log(self) -> List[Path]:
        """Get files, emitting a debug log line when any are found.

        Returns:
            List of log file paths.
        """
        files = await self.get_files()
        if files:
            logger.debug(f"Found files for {self.get_file_pattern()}: {files}")
        return files

    async def wait_for_files(self, timeout: int = 300, **kwargs) -> List[Path]:
        """Wait for log files to appear.

        Uses the project's ``file.check_with_retries`` utility for consistent
        retry behavior. Subclasses should override :meth:`is_valid_source`
        instead of this method unless they need completely different waiting
        logic.

        Args:
            timeout: Maximum time to wait in seconds.
            **kwargs: Accepted for subclass compatibility; unused here.

        Returns:
            List of log file paths; an empty list if the source is not valid.

        Raises:
            Whatever ``file.check_with_retries`` raises when files never
            appear (presumably a timeout or the last FileNotFoundError --
            confirm against gpustack.utils.file).
        """
        if not await self.is_valid_source():
            return []

        async def check():
            files = await self.get_files()
            if not files:
                # Raising signals "not yet" to the retry helper.
                raise FileNotFoundError(
                    f"Log files not found for source: {self.get_file_pattern()}"
                )
            return files

        files = await file.check_with_retries(check, timeout=timeout)
        logger.debug(f"Found files after waiting: {self.get_file_pattern()}")
        return files

    async def wait_for_files_if_needed(
        self,
        follow: bool,
        timeout: int = 300,
        **kwargs,
    ) -> List[Path]:
        """Get files, waiting for them if necessary in follow mode.

        This is best-effort: if waiting fails (e.g. times out), the failure
        is logged at debug level and the empty result is returned instead of
        raising.

        Args:
            follow: Whether in follow mode (triggers waiting if files are
                not found).
            timeout: Maximum time to wait in seconds.
            **kwargs: Additional arguments forwarded to wait_for_files.

        Returns:
            List of log file paths (may be empty).
        """
        files = await self.get_files_with_log()
        if files or not follow:
            return files
        if not await self.is_valid_source():
            return files
        try:
            return await self.wait_for_files(timeout=timeout, **kwargs)
        except Exception as e:
            # Deliberately best-effort, but record why instead of hiding it.
            logger.debug(
                f"Gave up waiting for files for {self.get_file_pattern()}: {e}"
            )
            return files
class DownloadLogSource(LogSource):
    """Log source backed by a single, optionally configured, download log."""

    def __init__(self, log_path: Optional[str]):
        # None and "" both mean that no download log is configured.
        self.log_path = None if not log_path else Path(log_path)

    async def get_files(self) -> List[Path]:
        path = self.log_path
        if path is None:
            return []
        # Run the filesystem probe off the event loop.
        present = await asyncio.to_thread(path.exists)
        return [path] if present else []

    def get_file_pattern(self) -> str:
        if self.log_path is None:
            return "download_log"
        return str(self.log_path)

    async def is_valid_source(self) -> bool:
        # Only worth waiting for when a path was configured.
        return self.log_path is not None
class MainLogSource(LogSource):
    """Log source for a model instance's main (historical) log files."""

    def __init__(
        self,
        log_dir: Path,
        model_instance_id: int,
        restart_count: Optional[int] = None,
        get_all_log_files_fn: Optional[Callable] = None,
    ):
        self.log_dir = log_dir
        self.model_instance_id = model_instance_id
        self.restart_count = restart_count
        # Injected async lookup; without it this source never reports files.
        self._get_all_log_files = get_all_log_files_fn

    async def get_files(self) -> List[Path]:
        lookup = self._get_all_log_files
        if not lookup:
            return []
        return await lookup(
            self.log_dir,
            self.model_instance_id,
            restart_count=self.restart_count,
        )

    def get_file_pattern(self) -> str:
        instance_id = self.model_instance_id
        if self.restart_count is None:
            return f"{instance_id}.*.log"
        return f"{instance_id}.*.{self.restart_count}.log"
class ContainerLogSource(LogSource):
    """Log source for container logs.

    Container log files are named
    ``<model_instance_id>.container.<restart>.log`` (see get_file_pattern).
    When none exist yet, the expected file name is inferred from the main log
    files, which is why this class overrides the waiting helpers of
    LogSource.
    """

    def __init__(
        self,
        log_dir: Path,
        model_instance_id: int,
        restart_count: Optional[int] = None,
        get_all_log_files_fn: Optional[Callable] = None,
        extract_restart_count_fn: Optional[Callable] = None,
    ):
        self.log_dir = log_dir
        self.model_instance_id = model_instance_id
        self.restart_count = restart_count
        # Injected async lookup, called with container=True in get_files.
        self._get_all_log_files = get_all_log_files_fn
        # Injected helper that maps a main log file name to a restart count.
        self._extract_restart_count = extract_restart_count_fn

    async def get_files(self) -> List[Path]:
        """Return existing container log files via the injected lookup.

        Returns an empty list when no lookup function was provided.
        """
        if self._get_all_log_files:
            return await self._get_all_log_files(
                self.log_dir,
                self.model_instance_id,
                container=True,
                restart_count=self.restart_count,
            )
        return []

    def get_file_pattern(self) -> str:
        if self.restart_count is not None:
            return f"{self.model_instance_id}.container.{self.restart_count}.log"
        return f"{self.model_instance_id}.container.*.log"

    def _get_expected_file(
        self, main_log_files: Optional[List[Path]]
    ) -> Optional[Path]:
        """Infer the expected container log file from main log files.

        The restart count is extracted from the last entry of
        ``main_log_files`` (presumably the newest -- confirm ordering with the
        injected lookup).

        Args:
            main_log_files: Main log files to infer the container log name
                from.

        Returns:
            Expected container log file path, or None if it cannot be
            inferred (no main files, no extractor, or the extractor returned
            None).
        """
        if not main_log_files or not self._extract_restart_count:
            return None
        expected_restart = self._extract_restart_count(main_log_files[-1].name)
        if expected_restart is None:
            # Without this guard we would wait on "<id>.container.None.log",
            # a name nothing is expected to write.
            return None
        return self.log_dir / (
            f"{self.model_instance_id}.container.{expected_restart}.log"
        )

    async def wait_for_files(
        self,
        timeout: int = 300,
        main_log_files: Optional[List[Path]] = None,
        **kwargs,
    ) -> List[Path]:
        """Wait for container log files to appear.

        Container logs need special handling because the expected file name
        depends on the main log's restart count.

        Args:
            timeout: Maximum time to wait in seconds.
            main_log_files: Main log files to infer the expected container
                log name.
            **kwargs: Accepted for signature compatibility with
                LogSource.wait_for_files; unused here.

        Returns:
            List of container log file paths. Empty if none exist and the
            expected file name cannot be inferred.

        Raises:
            Whatever ``file.check_with_retries`` raises if the expected file
            never appears (presumably on timeout -- confirm).
        """
        # First check whether files already exist.
        files = await self.get_files()
        if files:
            return files

        # Try to infer the expected file from the main logs.
        expected_file = self._get_expected_file(main_log_files)
        if not expected_file:
            return []

        async def check():
            if not await asyncio.to_thread(expected_file.exists):
                # Raising signals "not yet" to the retry helper.
                raise FileNotFoundError(
                    f"Container log file not found: {expected_file}"
                )
            return [expected_file]

        files = await file.check_with_retries(check, timeout=timeout)
        logger.debug(f"Found container log after waiting: {expected_file}")
        return files

    async def wait_for_files_if_needed(
        self,
        follow: bool,
        timeout: int = 300,
        main_log_files: Optional[List[Path]] = None,
        **kwargs,
    ) -> List[Path]:
        """Get files, waiting for them if necessary in follow mode.

        Unlike LogSource.wait_for_files_if_needed, failures while waiting are
        not caught here and propagate to the caller.

        Args:
            follow: Whether in follow mode (triggers waiting if files are
                not found).
            timeout: Maximum time to wait in seconds.
            main_log_files: Main log files to infer the expected container
                log name.
            **kwargs: Additional arguments forwarded to wait_for_files.

        Returns:
            List of log file paths (may be empty).
        """
        files = await self.get_files_with_log()
        if files:
            return files
        if follow:
            files = await self.wait_for_files(
                timeout=timeout, main_log_files=main_log_files, **kwargs
            )
        return files

    async def has_content(self) -> bool:
        """Return True if any existing container log file is non-empty."""
        files = await self.get_files()
        for f in files:
            if await has_log_content(f):
                return True
        return False
class LogSourceChain:
    """Chain of log sources for sequential log streaming.

    Similar to WorkerFilterChain in the project, this applies sources in
    order and yields log lines from each.
    """

    def __init__(
        self,
        sources: List[LogSource],
        log_generator_fn: Optional[Callable] = None,
    ):
        self.sources = sources
        # Async-generator factory called as fn(str(path), LogOptions). Without
        # it, stream_source yields nothing.
        self._log_generator = log_generator_fn

    async def stream_source(
        self,
        source: LogSource,
        options,
        stop_event: Optional[asyncio.Event] = None,
        follow: bool = True,
    ) -> AsyncGenerator[str, None]:
        """Stream logs from a single source.

        Files are streamed in the order returned by the source. Only the last
        file uses the caller's ``tail``/``follow``. Earlier files get
        ``tail=-1`` (presumably "all lines" -- confirm against LogOptions)
        and ``follow=False``, so streaming advances to the next file.

        Args:
            source: Log source to stream from.
            options: Log options (tail, follow).
            stop_event: Event to signal stopping; checked before each file
                and before each yielded line.
            follow: Whether to wait for files to appear if none exist yet.
                Container sources are never waited for here.

        Yields:
            Log lines from the source.
        """
        # Local import, presumably to avoid a circular import with
        # gpustack.worker.logs.
        from gpustack.worker.logs import LogOptions

        files = await source.get_files()
        if not files and follow:
            # Wait for files to appear in follow mode.
            if isinstance(source, ContainerLogSource):
                return  # Container log waiting handled separately
            try:
                files = await source.wait_for_files()
            except Exception:
                logger.debug(f"Timeout waiting for files: {source.get_file_pattern()}")
                return

        if not files or not self._log_generator:
            return

        last_index = len(files) - 1
        for index, log_file in enumerate(files):
            if stop_event and stop_event.is_set():
                return
            # Compare by position, not value, so duplicate paths in the list
            # cannot make an earlier entry follow forever.
            is_last = index == last_index
            file_options = LogOptions(
                tail=options.tail if is_last else -1,
                follow=options.follow if is_last else False,
                stop_event=stop_event,
            )
            async for line in self._log_generator(str(log_file), file_options):
                if stop_event and stop_event.is_set():
                    return
                yield line

    async def monitor_container_content(
        self,
        container_source: ContainerLogSource,
        stop_event: asyncio.Event,
        check_interval: float = 1.0,
    ) -> None:
        """Poll container logs for content and set ``stop_event`` when found.

        Sleeps ``check_interval`` seconds before each check. Returns once
        ``stop_event`` is set, either by someone else or by this task after
        finding a non-empty container log.

        Args:
            container_source: Container log source to monitor.
            stop_event: Event to set when a container log has content.
            check_interval: Time between checks in seconds.
        """
        logger.debug("Starting background task to monitor container logs for content")
        while not stop_event.is_set():
            await asyncio.sleep(check_interval)
            # Re-check after sleeping: another task may have stopped us.
            if stop_event.is_set():
                return
            if await container_source.has_content():
                logger.debug("Container log now has content, stopping main log follow")
                stop_event.set()
                return
|