async_logger.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. """
  2. 异步日志队列模块
  3. 实现异步日志队列和批量写入逻辑,避免日志写入阻塞主请求处理。
  4. 需求引用: 6.1, 6.2, 6.3, 6.4
  5. """
  6. import asyncio
  7. from collections import deque
  8. import json
  9. from typing import Dict, Any, List, Optional, Callable
  10. from datetime import datetime
  11. import logging
  logger = logging.getLogger(__name__)


  class AsyncLogQueue:
      """Asynchronous, non-blocking API log queue.

      Request handlers call ``enqueue``, which only appends to an in-memory
      ``deque`` and never touches the database. A background task started by
      ``start`` wakes every ``flush_interval`` seconds and writes the entries
      queued at that moment to the database, ``batch_size`` rows per INSERT.

      Behaviour:
      - When the queue is full, the oldest entries are discarded (``deque``
        ``maxlen``) and a warning is logged on the 1st, 101st, 201st, ... drop.
      - A failed batch write is logged and that batch is discarded; it is never
        re-queued, so a persistent database failure cannot grow the queue.
      - ``stop`` lets an in-progress write finish instead of cancelling it
        mid-statement, then flushes whatever is left.

      Attributes:
          queue: Pending log entries (bounded ``deque``).
          batch_size: Maximum number of rows written per INSERT statement.
          flush_interval: Seconds between background flushes.
      """

      # Bind-parameter name prefixes, in the column order of the INSERT.
      _ROW_FIELDS = (
          "user_id",
          "username",
          "api_path",
          "request_method",
          "request_params",
          "response_status",
          "response_time",
      )

      def __init__(
          self,
          max_size: int = 10000,
          batch_size: int = 100,
          flush_interval: float = 5.0
      ):
          """Create the queue (the background task is not started here).

          Args:
              max_size: Queue capacity; the oldest entries are dropped beyond it.
              batch_size: Maximum rows per INSERT statement.
              flush_interval: Seconds between background flushes.

          Raises:
              ValueError: If ``max_size`` or ``batch_size`` is below 1, or
                  ``flush_interval`` is not positive (such a queue could never
                  write anything, or would busy-loop).
          """
          if max_size < 1:
              raise ValueError(f"max_size must be >= 1, got {max_size}")
          if batch_size < 1:
              raise ValueError(f"batch_size must be >= 1, got {batch_size}")
          if flush_interval <= 0:
              raise ValueError(f"flush_interval must be > 0, got {flush_interval}")
          self._max_size = max_size
          self.queue: deque = deque(maxlen=max_size)
          self.batch_size = batch_size
          self.flush_interval = flush_interval
          self._running = False
          self._task: Optional[asyncio.Task] = None
          self._db_factory: Optional[Callable] = None
          # Created in start(), inside the running loop: on older Python versions
          # asyncio primitives bind to the loop current at construction time, and
          # a module-level instance may be constructed at import time.
          self._stop_event: Optional[asyncio.Event] = None
          self._overflow_count = 0  # entries dropped because the queue was full

      async def start(self, db_session_factory: Callable):
          """Start the background flush task.

          Args:
              db_session_factory: Zero-argument callable returning an async context
                  manager that yields a session with ``execute`` and ``commit``
                  coroutines (e.g. an async SQLAlchemy session factory).
          """
          if self._running:
              logger.warning("异步日志队列已在运行中")
              return
          self._running = True
          self._db_factory = db_session_factory
          self._stop_event = asyncio.Event()
          self._task = asyncio.create_task(self._flush_loop())
          logger.info(
              "异步日志队列已启动 (max_size=%s, batch_size=%s, flush_interval=%ss)",
              self._max_size,
              self.batch_size,
              self.flush_interval,
          )

      async def stop(self):
          """Stop the background task gracefully and flush remaining entries.

          The flush task is signalled rather than cancelled, so a batch that is
          currently being written completes instead of being lost mid-write.
          """
          if not self._running:
              return
          self._running = False
          if self._stop_event is not None:
              self._stop_event.set()
          if self._task is not None:
              try:
                  await self._task
              except asyncio.CancelledError:
                  # The task may have been cancelled externally (loop shutdown).
                  pass
              self._task = None
          remaining = len(self.queue)
          if remaining > 0:
              logger.info("正在刷新剩余 %d 条日志...", remaining)
              await self._flush_all()
          logger.info("异步日志队列已停止")

      def enqueue(self, log_data: Dict[str, Any]):
          """Queue one log entry without blocking.

          The entry is shallow-copied before a ``queued_at`` timestamp is added,
          so the caller's dict is left unmodified. When the queue is full the
          oldest queued entry is discarded.

          Args:
              log_data: Log fields read by the writer: ``user_id``, ``username``,
                  ``api_path``, ``method``, ``status_code``, ``duration_ms`` and
                  optional ``request_params``. Other keys (e.g. ``request_ip``)
                  are kept in the queued entry but not written to the database.
          """
          if len(self.queue) >= self._max_size:
              self._overflow_count += 1
              # Warn on the 1st, 101st, 201st, ... drop to avoid log spam.
              if self._overflow_count % 100 == 1:
                  logger.warning(
                      "异步日志队列已满,正在丢弃旧日志 (累计丢弃: %d)",
                      self._overflow_count,
                  )
          entry = dict(log_data)
          # NOTE(review): queued_at is not persisted; created_at is NOW() at write time.
          entry["queued_at"] = datetime.now()
          # deque(maxlen=...) evicts the oldest entry when full.
          self.queue.append(entry)

      async def _flush_loop(self):
          """Background task: every ``flush_interval`` seconds, write pending logs.

          Exits when ``stop`` sets the stop event; ``stop`` performs the final flush.
          """
          stop_event = self._stop_event
          while self._running:
              try:
                  await asyncio.wait_for(stop_event.wait(), timeout=self.flush_interval)
                  break  # stop() was requested
              except asyncio.TimeoutError:
                  pass  # regular periodic wake-up
              try:
                  await self._flush_pending()
              except Exception as e:
                  logger.exception("日志刷新循环异常: %s", e)

      async def _flush_pending(self) -> int:
          """Write, batch by batch, at most as many entries as were queued on entry.

          Draining the whole backlog each cycle lifts the throughput ceiling of
          one batch per interval, while the bound keeps a steady stream of new
          entries from holding a single cycle open indefinitely.

          Returns:
              Number of entries removed from the queue (written or dropped).
          """
          target = len(self.queue)
          removed = 0
          while removed < target:
              flushed = await self._flush()
              if flushed == 0:
                  break
              removed += flushed
          return removed

      async def _flush(self) -> int:
          """Pop up to ``batch_size`` entries and write them in one transaction.

          A write failure is logged and the batch is discarded (not re-queued),
          so the request path is never affected.

          Returns:
              Number of entries removed from the queue; 0 when the queue is empty
              or no session factory is configured.
          """
          if not self.queue or not self._db_factory:
              return 0
          # No await between the length check and the pops, so nothing else on
          # the event loop can shrink the queue in between.
          count = min(self.batch_size, len(self.queue))
          batch: List[Dict[str, Any]] = [self.queue.popleft() for _ in range(count)]
          try:
              async with self._db_factory() as session:
                  await self._batch_insert(session, batch)
                  await session.commit()
                  logger.debug("成功写入 %d 条日志", len(batch))
          except Exception as e:
              logger.error("批量写入日志失败 (%d 条): %s", len(batch), e)
              # Not re-queued on purpose: avoids retrying a poisoned batch forever.
          return len(batch)

      async def _flush_all(self):
          """Write every remaining entry (used by ``stop``).

          Stops as soon as a flush makes no progress (e.g. no session factory was
          configured) instead of spinning forever on a non-empty queue.
          """
          while self.queue:
              if await self._flush() == 0:
                  break

      async def _batch_insert(self, session, logs: List[Dict[str, Any]]):
          """Insert ``logs`` with a single parameterized multi-row INSERT.

          Args:
              session: Async database session (must provide ``execute``).
              logs: Queued log entries.
          """
          if not logs:
              return
          from sqlalchemy import text

          sql, params = self._build_insert(logs)
          await session.execute(text(sql), params)

      @classmethod
      def _build_insert(cls, logs: List[Dict[str, Any]]) -> tuple:
          """Build the multi-row INSERT statement and its bind parameters.

          Every value is passed as a bind parameter (never interpolated into
          the SQL text). Each row adds seven parameters to one statement, so
          very large ``batch_size`` values may exceed the database driver's
          bind-parameter limit.

          NOTE(review): ``created_at`` is ``NOW()`` evaluated by the database at
          write time, so it may lag the request and may be shared by all rows of
          a batch; the entry's ``queued_at`` is not written.

          Returns:
              ``(sql, params)``: the SQL string and a dict of named parameters.
          """
          rows: List[str] = []
          params: Dict[str, Any] = {}
          for i, log in enumerate(logs):
              placeholders = ", ".join(f":{name}_{i}" for name in cls._ROW_FIELDS)
              rows.append(f"({placeholders}, NOW())")
              values = (
                  log.get("user_id"),
                  log.get("username"),
                  log.get("api_path", ""),
                  log.get("method", "GET"),
                  cls._serialize_request_params(log.get("request_params")),
                  log.get("status_code", 200),
                  log.get("duration_ms", 0),
              )
              for name, value in zip(cls._ROW_FIELDS, values):
                  params[f"{name}_{i}"] = value
          sql = (
              "INSERT INTO aigcspace.api_log "
              "(user_id, username, api_path, request_method, request_params, "
              "response_status, response_time, created_at) "
              "VALUES " + ", ".join(rows)
          )
          return sql, params

      @staticmethod
      def _serialize_request_params(value: Any) -> Optional[str]:
          """Encode request params as a JSON string for the JSONB column.

          Returns None for None. Non-JSON values (datetimes, UUIDs, ...) are
          stringified via ``default=str``; if encoding still fails (non-string
          dict keys, reference cycles) the value's ``repr`` is stored as a JSON
          string, so one odd entry cannot make the whole batch fail.
          """
          if value is None:
              return None
          try:
              return json.dumps(value, default=str)
          except (TypeError, ValueError):
              return json.dumps(repr(value))

      @property
      def pending_count(self) -> int:
          """Number of entries waiting to be written."""
          return len(self.queue)

      @property
      def overflow_count(self) -> int:
          """Number of entries dropped because the queue was full."""
          return self._overflow_count

      @property
      def is_running(self) -> bool:
          """Whether the background flush task has been started and not stopped."""
          return self._running
  # Global singleton, built with the default queue settings and shared by every importer of this module.
  async_log_queue: AsyncLogQueue = AsyncLogQueue()