logs.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import asyncio
  2. from dataclasses import dataclass
  3. import os
  4. import logging
  5. from typing import Annotated, Optional
  6. import aiofiles
  7. from aiofiles.threadpool.text import AsyncTextIOWrapper
  8. from fastapi import Depends, Query
  9. logger = logging.getLogger(__name__)
  10. @dataclass
  11. class LogOptions:
  12. tail: int = -1 # -1 by default means read all logs
  13. follow: bool = False
  14. stop_event: Optional[asyncio.Event] = None
  15. previous: bool = False
  16. def url_encode(self):
  17. params = f"tail={self.tail}&follow={self.follow}"
  18. if self.previous:
  19. params += "&previous=true"
  20. return params
  21. default_tail = Query(
  22. default=-1, description="Number of lines to read from the end of the log"
  23. )
  24. default_follow = Query(default=False, description="Whether to follow the log output")
  25. default_previous = Query(
  26. default=False, description="Whether to fetch logs from the previous restart"
  27. )
  28. def get_log_options(
  29. tail: int = default_tail,
  30. follow: bool = default_follow,
  31. previous: bool = default_previous,
  32. ) -> LogOptions:
  33. return LogOptions(tail=tail, follow=follow, previous=previous)
  34. LogOptionsDep = Annotated[LogOptions, Depends(get_log_options)]
  35. async def log_generator(path: str, options: LogOptions):
  36. logger.debug(f"Reading logs from {path} with options {options}")
  37. try:
  38. # By default, universal newline mode is used, which means that all of
  39. # \n, \r, or \r\n are recognized as end-of-line characters.
  40. # We use os.linesep to ensure that \r is reserved. It's useful for showing progress bars.
  41. async with aiofiles.open(
  42. path, "r", encoding="utf-8", errors="ignore", newline=os.linesep
  43. ) as file:
  44. if options.tail > 0:
  45. # Move to the end of the file and read the last 'tail' lines
  46. await file.seek(0, os.SEEK_END)
  47. file_size = await file.tell()
  48. buffer = []
  49. BLOCK_SIZE = 2**16 # 64KB
  50. while file_size > 0 and len(buffer) <= options.tail:
  51. await file.seek(max(0, file_size - BLOCK_SIZE), os.SEEK_SET)
  52. buffer = await file.readlines()
  53. file_size -= BLOCK_SIZE
  54. for line in buffer[-options.tail :]:
  55. yield line
  56. else:
  57. async for line in read_all_lines(file):
  58. yield line
  59. if options.follow:
  60. async for line in follow_file(file, options.stop_event):
  61. yield line
  62. except Exception as e:
  63. logger.error(f"Failed to read logs from {path}. {e}")
  64. async def read_all_lines(file: AsyncTextIOWrapper):
  65. """Read all lines from the file."""
  66. while True:
  67. line = await file.readline()
  68. if not line:
  69. break
  70. yield line
  71. async def follow_file(
  72. file: AsyncTextIOWrapper, stop_event: Optional[asyncio.Event] = None
  73. ):
  74. """Follow the file and yield new lines as they are written."""
  75. while True:
  76. if stop_event and stop_event.is_set():
  77. return
  78. line = await file.readline()
  79. if not line:
  80. await asyncio.sleep(0.1) # wait before retrying
  81. continue
  82. yield line