| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import asyncio
- from dataclasses import dataclass
- import os
- import logging
- from typing import Annotated, Optional
- import aiofiles
- from aiofiles.threadpool.text import AsyncTextIOWrapper
- from fastapi import Depends, Query
- logger = logging.getLogger(__name__)
- @dataclass
- class LogOptions:
- tail: int = -1 # -1 by default means read all logs
- follow: bool = False
- stop_event: Optional[asyncio.Event] = None
- previous: bool = False
- def url_encode(self):
- params = f"tail={self.tail}&follow={self.follow}"
- if self.previous:
- params += "&previous=true"
- return params
- default_tail = Query(
- default=-1, description="Number of lines to read from the end of the log"
- )
- default_follow = Query(default=False, description="Whether to follow the log output")
- default_previous = Query(
- default=False, description="Whether to fetch logs from the previous restart"
- )
- def get_log_options(
- tail: int = default_tail,
- follow: bool = default_follow,
- previous: bool = default_previous,
- ) -> LogOptions:
- return LogOptions(tail=tail, follow=follow, previous=previous)
- LogOptionsDep = Annotated[LogOptions, Depends(get_log_options)]
- async def log_generator(path: str, options: LogOptions):
- logger.debug(f"Reading logs from {path} with options {options}")
- try:
- # By default, universal newline mode is used, which means that all of
- # \n, \r, or \r\n are recognized as end-of-line characters.
- # We use os.linesep to ensure that \r is reserved. It's useful for showing progress bars.
- async with aiofiles.open(
- path, "r", encoding="utf-8", errors="ignore", newline=os.linesep
- ) as file:
- if options.tail > 0:
- # Move to the end of the file and read the last 'tail' lines
- await file.seek(0, os.SEEK_END)
- file_size = await file.tell()
- buffer = []
- BLOCK_SIZE = 2**16 # 64KB
- while file_size > 0 and len(buffer) <= options.tail:
- await file.seek(max(0, file_size - BLOCK_SIZE), os.SEEK_SET)
- buffer = await file.readlines()
- file_size -= BLOCK_SIZE
- for line in buffer[-options.tail :]:
- yield line
- else:
- async for line in read_all_lines(file):
- yield line
- if options.follow:
- async for line in follow_file(file, options.stop_event):
- yield line
- except Exception as e:
- logger.error(f"Failed to read logs from {path}. {e}")
- async def read_all_lines(file: AsyncTextIOWrapper):
- """Read all lines from the file."""
- while True:
- line = await file.readline()
- if not line:
- break
- yield line
- async def follow_file(
- file: AsyncTextIOWrapper, stop_event: Optional[asyncio.Event] = None
- ):
- """Follow the file and yield new lines as they are written."""
- while True:
- if stop_event and stop_event.is_set():
- return
- line = await file.readline()
- if not line:
- await asyncio.sleep(0.1) # wait before retrying
- continue
- yield line
|