| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- """
- 异步日志队列模块
- 实现异步日志队列和批量写入逻辑,避免日志写入阻塞主请求处理。
- 需求引用: 6.1, 6.2, 6.3, 6.4
- """
- import asyncio
- from collections import deque
- import json
- from typing import Dict, Any, List, Optional, Callable
- from datetime import datetime
- import logging
# Module-scoped logger, named after this module so output can be filtered per module.
logger: logging.Logger = logging.getLogger(name=__name__)
class AsyncLogQueue:
    """Asynchronous, non-blocking API-log queue.

    Request handlers call :meth:`enqueue`, which only appends to an in-memory
    bounded ``deque`` and never awaits. A background task started by
    :meth:`start` wakes every ``flush_interval`` seconds and writes everything
    queued so far to the database in batches of at most ``batch_size`` rows.

    Behavior:
    - When the queue is full, the oldest entry is evicted (``deque(maxlen=...)``)
      and a warning is logged on the 1st, 101st, 201st, ... eviction.
    - A failed batch write is logged and that batch is dropped (not re-queued),
      so database errors never propagate to request handlers.
    - :meth:`stop` lets an in-progress write finish, then flushes whatever is
      still queued.

    Attributes:
        queue: Pending log entries (bounded ``deque``).
        batch_size: Maximum number of rows per INSERT statement.
        flush_interval: Seconds between background flushes.
    """

    def __init__(
        self,
        max_size: int = 10000,
        batch_size: int = 100,
        flush_interval: float = 5.0
    ):
        """Create an idle queue; call :meth:`start` to begin flushing.

        Args:
            max_size: Queue capacity; once reached, the oldest entries are dropped.
            batch_size: Maximum number of entries written per INSERT; must be >= 1.
            flush_interval: Seconds between background flushes.

        Raises:
            ValueError: If ``batch_size`` is less than 1. A non-positive batch
                size would make the flush loops spin forever without writing.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self._max_size = max_size
        self.queue: deque = deque(maxlen=max_size)
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._db_factory: Optional[Callable] = None
        # Created in start() so it belongs to the running event loop rather
        # than to whatever loop exists when a module-level instance is built.
        self._stop_event: Optional[asyncio.Event] = None
        self._overflow_count = 0  # entries evicted because the queue was full

    async def start(self, db_session_factory: Callable):
        """Start the background flush task (no-op with a warning if already running).

        Args:
            db_session_factory: Zero-argument callable returning an async context
                manager that yields a session with awaitable ``execute`` and
                ``commit`` (presumably an SQLAlchemy async session factory --
                confirm against the caller).
        """
        if self._running:
            logger.warning("异步日志队列已在运行中")
            return

        self._running = True
        self._db_factory = db_session_factory
        self._stop_event = asyncio.Event()
        self._task = asyncio.create_task(self._flush_loop())
        logger.info(f"异步日志队列已启动 (max_size={self._max_size}, batch_size={self.batch_size}, flush_interval={self.flush_interval}s)")

    async def stop(self):
        """Stop background flushing and write out everything still queued.

        The background task is woken through the stop event instead of being
        cancelled. A batch that is mid-write therefore commits instead of being
        popped from the queue and then lost to the cancellation.
        """
        if not self._running:
            return

        self._running = False

        # Wake the flush loop immediately instead of waiting out its interval.
        if self._stop_event is not None:
            self._stop_event.set()
        if self._task:
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        # Final flush so entries queued since the last tick are not lost.
        remaining = len(self.queue)
        if remaining > 0:
            logger.info(f"正在刷新剩余 {remaining} 条日志...")
            await self._flush_all()

        logger.info("异步日志队列已停止")

    def enqueue(self, log_data: Dict[str, Any]):
        """Queue one log entry without blocking.

        A shallow copy of ``log_data`` (plus a ``queued_at`` timestamp) is
        stored. The caller's dict is not modified, and changes the caller makes
        to it afterwards do not alter the queued entry. When the queue is full,
        the deque evicts the oldest entry.

        Args:
            log_data: Entry fields read by the INSERT: ``user_id``, ``username``,
                ``api_path``, ``method``, ``request_params`` (optional; stored
                as JSON), ``status_code``, ``duration_ms``. Other keys (e.g.
                ``request_ip``) are accepted but not persisted.
        """
        if len(self.queue) >= self._max_size:
            self._overflow_count += 1
            # Warn on the 1st, 101st, 201st ... eviction to avoid flooding the log.
            if self._overflow_count % 100 == 1:
                logger.warning(f"异步日志队列已满,正在丢弃旧日志 (累计丢弃: {self._overflow_count})")

        # NOTE: queued_at is not written to the database; created_at comes from
        # NOW() at flush time.
        self.queue.append({**log_data, "queued_at": datetime.now()})

    async def _flush_loop(self):
        """Background task: every ``flush_interval`` seconds, drain the queue.

        Exits when :meth:`stop` clears ``_running`` and sets the stop event, or
        when the task is cancelled. Unexpected errors are logged with a
        traceback, and the loop keeps running.
        """
        while self._running:
            try:
                await self._wait_for_tick()
                if not self._running:
                    break  # stop() was requested; it flushes the remainder itself
                await self._drain()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"日志刷新循环异常: {e}")

    async def _wait_for_tick(self):
        """Wait ``flush_interval`` seconds, returning early once stop() sets the stop event."""
        if self._stop_event is None:
            await asyncio.sleep(self.flush_interval)
            return
        try:
            await asyncio.wait_for(self._stop_event.wait(), timeout=self.flush_interval)
        except asyncio.TimeoutError:
            pass  # normal case: the interval elapsed without a stop request

    async def _drain(self):
        """Write queued entries batch by batch until the queue is empty.

        Stops early if a batch fails, so a database outage drops at most one
        batch per tick instead of the whole queue. Also stops early once
        stop() has been requested.
        """
        while self._running and self.queue:
            if not await self._flush():
                break

    async def _flush(self) -> bool:
        """Pop up to ``batch_size`` entries and write them in one transaction.

        On failure the error is logged and the popped batch is dropped (not
        re-queued), so a persistent database error cannot cause an endless
        retry loop.

        Returns:
            True if a batch was written. False if there was nothing to write,
            no session factory is configured, or the write failed.
        """
        if not self.queue or not self._db_factory:
            return False

        batch: List[Dict[str, Any]] = []
        while self.queue and len(batch) < self.batch_size:
            try:
                batch.append(self.queue.popleft())
            except IndexError:
                # Defensive: popleft() raises if the deque empties between the
                # emptiness check and the pop (only possible across threads).
                break

        if not batch:
            return False

        try:
            async with self._db_factory() as session:
                await self._batch_insert(session, batch)
                await session.commit()
        except Exception as e:
            logger.error(f"批量写入日志失败 ({len(batch)} 条): {e}")
            return False
        logger.debug("成功写入 %d 条日志", len(batch))
        return True

    async def _flush_all(self):
        """Write every remaining entry, batch by batch (used during shutdown).

        Unlike the periodic drain, a failed batch does not stop this loop.
        Each _flush() call pops its batch even when the write fails, so the
        loop always makes progress.
        """
        if not self._db_factory:
            # Without a factory _flush() pops nothing; looping would never end.
            return
        while self.queue:
            await self._flush()

    async def _batch_insert(self, session, logs: List[Dict[str, Any]]):
        """Insert ``logs`` into ``aigcspace.api_log`` with one multi-row INSERT.

        All values are bound as named parameters (``:user_id_0``, ...) and never
        interpolated into the SQL text. Only the generated placeholder names
        are formatted into the statement. ``created_at`` is set by the
        database via ``NOW()``.

        Args:
            session: Async session whose ``execute`` accepts an SQLAlchemy
                ``text()`` clause plus a parameter dict.
            logs: Queued entries; missing fields fall back to the defaults below.
        """
        if not logs:
            return

        from sqlalchemy import text

        placeholders: List[str] = []
        params: Dict[str, Any] = {}

        for i, log in enumerate(logs):
            placeholders.append(
                f"(:user_id_{i}, :username_{i}, :api_path_{i}, :request_method_{i}, "
                f":request_params_{i}, :response_status_{i}, :response_time_{i}, NOW())"
            )
            raw_params = log.get("request_params")
            params[f"user_id_{i}"] = log.get("user_id")
            params[f"username_{i}"] = log.get("username")
            params[f"api_path_{i}"] = log.get("api_path", "")
            params[f"request_method_{i}"] = log.get("method", "GET")
            # request_params is stored as JSONB, so it is bound as a JSON string
            # (or NULL). default=str stringifies values json cannot encode
            # (datetime, UUID, bytes, ...) instead of raising TypeError, which
            # would otherwise fail -- and drop -- the entire batch.
            params[f"request_params_{i}"] = (
                json.dumps(raw_params, default=str) if raw_params is not None else None
            )
            params[f"response_status_{i}"] = log.get("status_code", 200)
            params[f"response_time_{i}"] = log.get("duration_ms", 0)

        sql = f"""
        INSERT INTO aigcspace.api_log
        (user_id, username, api_path, request_method, request_params, response_status, response_time, created_at)
        VALUES {', '.join(placeholders)}
        """

        await session.execute(text(sql), params)

    @property
    def pending_count(self) -> int:
        """Number of entries waiting to be written."""
        return len(self.queue)

    @property
    def overflow_count(self) -> int:
        """Number of entries evicted because the queue was full."""
        return self._overflow_count

    @property
    def is_running(self) -> bool:
        """Whether start() has been called without a matching stop()."""
        return self._running
# Module-wide shared instance built with the default settings; nothing is
# flushed to the database until its start() coroutine has been awaited.
async_log_queue: AsyncLogQueue = AsyncLogQueue()
|