"""Asynchronous log queue module.

Implements an asynchronous log queue with batched database writes so that
persisting request logs never blocks main request handling.

Requirement refs: 6.1, 6.2, 6.3, 6.4
"""
import asyncio
from collections import deque
import json
from typing import Dict, Any, List, Optional, Callable
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


class AsyncLogQueue:
    """Asynchronous, non-blocking log queue.

    - ``enqueue`` appends a log entry to an in-memory queue without awaiting.
    - A background task wakes every ``flush_interval`` seconds and writes the
      backlog to the database in batches of at most ``batch_size`` entries.
    - When the queue is full, the oldest entry is dropped (deque ``maxlen``)
      and a warning is logged once per 100 drops.
    - Write failures are logged and the failed batch is dropped; they never
      propagate to request handlers.

    Attributes:
        queue: Bounded deque of pending log entries (``maxlen=max_size``).
        batch_size: Maximum number of entries written per INSERT statement.
        flush_interval: Seconds between background flushes.
    """

    def __init__(
        self,
        max_size: int = 10000,
        batch_size: int = 100,
        flush_interval: float = 5.0
    ):
        """Create the queue (no background task is started here).

        Args:
            max_size: Queue capacity; beyond it the oldest entries are dropped.
            batch_size: Maximum number of entries written per batch.
            flush_interval: Seconds between background flushes.
        """
        self._max_size = max_size
        self.queue: deque = deque(maxlen=max_size)
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._db_factory: Optional[Callable] = None
        self._overflow_count = 0  # number of entries dropped due to overflow
        # Created in start() so it is bound to the running event loop; used by
        # stop() to wake the background loop without cancelling a write.
        self._stop_event: Optional[asyncio.Event] = None

    async def start(self, db_session_factory: Callable):
        """Start the background flush task.

        Args:
            db_session_factory: Zero-argument callable returning an async
                context manager that yields a session with ``execute`` and
                ``commit`` coroutines.
        """
        if self._running:
            logger.warning("异步日志队列已在运行中")
            return

        self._running = True
        self._db_factory = db_session_factory
        self._stop_event = asyncio.Event()
        self._task = asyncio.create_task(self._flush_loop())
        logger.info(f"异步日志队列已启动 (max_size={self._max_size}, batch_size={self.batch_size}, flush_interval={self.flush_interval}s)")

    async def stop(self):
        """Stop the background task gracefully and write any remaining entries.

        The background loop is woken via an event rather than cancelled, so a
        batch that is mid-write is allowed to finish instead of being lost.
        """
        if not self._running:
            return

        self._running = False
        if self._stop_event is not None:
            self._stop_event.set()

        if self._task:
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        # Final drain so nothing still queued is lost on shutdown.
        remaining = len(self.queue)
        if remaining > 0:
            logger.info(f"正在刷新剩余 {remaining} 条日志...")
            await self._flush_all()

        logger.info("异步日志队列已停止")

    def enqueue(self, log_data: Dict[str, Any]):
        """Add a log entry to the queue (synchronous, never blocks on I/O).

        A shallow copy of ``log_data`` is queued with an added ``queued_at``
        timestamp, so the caller's dict is not modified. When the queue is
        full, the deque silently drops its oldest entry.

        Args:
            log_data: Log fields read by the writer: ``user_id``, ``username``,
                ``api_path``, ``method``, ``status_code``, ``duration_ms`` and
                optional ``request_params``. Other keys (e.g. ``request_ip``)
                are kept in the queued entry but not written to the table.
        """
        # The append below will evict the oldest entry when at capacity.
        if len(self.queue) >= self._max_size:
            self._overflow_count += 1
            # Warn on the 1st, 101st, 201st, ... drop to avoid log flooding.
            if self._overflow_count % 100 == 1:
                logger.warning(f"异步日志队列已满,正在丢弃旧日志 (累计丢弃: {self._overflow_count})")

        entry = dict(log_data)
        entry["queued_at"] = datetime.now()
        self.queue.append(entry)

    async def _wait_for_tick(self) -> None:
        """Sleep ``flush_interval`` seconds, returning early once stop() is requested."""
        if self._stop_event is None:
            await asyncio.sleep(self.flush_interval)
            return
        try:
            await asyncio.wait_for(self._stop_event.wait(), timeout=self.flush_interval)
        except asyncio.TimeoutError:
            pass

    async def _flush_loop(self):
        """Background loop: every tick, write the current backlog to the database."""
        while self._running:
            try:
                await self._wait_for_tick()
                if not self._running:
                    # stop() will drain whatever is left.
                    break
                await self._flush_pending()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"日志刷新循环异常: {e}")

    async def _flush_pending(self) -> None:
        """Write the backlog present at call time, one batch at a time.

        Draining the whole backlog (not just one batch) per tick keeps the
        write rate from being capped at ``batch_size / flush_interval``.
        Only entries present at the start are targeted, so a steady stream of
        new enqueues cannot keep a single tick running indefinitely.
        """
        remaining = len(self.queue)
        while remaining > 0:
            taken = await self._flush()
            if taken == 0:
                break
            remaining -= taken

    async def _flush(self) -> int:
        """Take one batch (at most ``batch_size`` entries) off the queue and write it.

        Write failures are logged, not raised; the failed batch is dropped
        rather than re-queued so a broken database cannot cause a retry loop.

        Returns:
            Number of entries removed from the queue (including a dropped
            failed batch). 0 means no progress was possible: empty queue, no
            session factory, or ``batch_size <= 0``.
        """
        if not self.queue or not self._db_factory:
            return 0

        batch: List[Dict[str, Any]] = []
        while self.queue and len(batch) < self.batch_size:
            batch.append(self.queue.popleft())

        if not batch:
            return 0

        try:
            async with self._db_factory() as session:
                await self._batch_insert(session, batch)
                await session.commit()
                logger.debug(f"成功写入 {len(batch)} 条日志")
        except Exception as e:
            logger.error(f"批量写入日志失败 ({len(batch)} 条): {e}")
        return len(batch)

    async def _flush_all(self):
        """Write every remaining entry; used during graceful shutdown."""
        while self.queue:
            if await self._flush() == 0:
                # No progress is possible (e.g. no session factory); stop
                # instead of spinning forever.
                break

    async def _batch_insert(self, session, logs: List[Dict[str, Any]]):
        """Insert ``logs`` into ``aigcspace.api_log`` with one multi-row INSERT.

        All values are passed as bound parameters, never interpolated into
        the SQL text.

        Args:
            session: Async database session exposing ``execute``.
            logs: Log entries to insert.
        """
        if not logs:
            return

        from sqlalchemy import text

        row_placeholders: List[str] = []
        params: Dict[str, Any] = {}

        for i, log in enumerate(logs):
            row_placeholders.append(
                f"(:user_id_{i}, :username_{i}, :api_path_{i}, :request_method_{i}, "
                f":request_params_{i}, :response_status_{i}, :response_time_{i}, NOW())"
            )
            params[f"user_id_{i}"] = log.get("user_id")
            params[f"username_{i}"] = log.get("username")
            params[f"api_path_{i}"] = log.get("api_path", "")
            params[f"request_method_{i}"] = log.get("method", "GET")
            # request_params goes to a JSONB column as a JSON string (or NULL).
            # default=str deliberately stringifies non-JSON values so that one
            # unserializable parameter cannot make the whole batch fail.
            request_params = log.get("request_params")
            params[f"request_params_{i}"] = (
                json.dumps(request_params, default=str)
                if request_params is not None
                else None
            )
            params[f"response_status_{i}"] = log.get("status_code", 200)
            params[f"response_time_{i}"] = log.get("duration_ms", 0)

        sql = f"""
                INSERT INTO aigcspace.api_log
                (user_id, username, api_path, request_method, request_params,
                 response_status, response_time, created_at)
                VALUES {', '.join(row_placeholders)}
            """

        await session.execute(text(sql), params)

    @property
    def pending_count(self) -> int:
        """Number of entries waiting to be written."""
        return len(self.queue)

    @property
    def overflow_count(self) -> int:
        """Number of entries dropped because the queue was full."""
        return self._overflow_count

    @property
    def is_running(self) -> bool:
        """Whether the background task has been started and not yet stopped."""
        return self._running


# Module-level singleton
async_log_queue = AsyncLogQueue()