# coding=utf-8
"""
Application API rate-limiting middleware.

Throttles application API calls with an in-memory sliding-window algorithm.
"""
import threading
import time
from collections import defaultdict, deque
from typing import Optional

from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin

from common.utils.logger import maxkb_logger


class RateLimitMiddleware(MiddlewareMixin):
    """
    Rate-limiting middleware for application APIs.

    Request timestamps are kept in process memory, one sliding window per
    application ID, so the limit is only enforced within a single process
    (single-instance deployments).

    All access to the shared state goes through ``self._lock``.
    NOTE(review): this assumes one middleware instance may serve several
    threads at once, which is how Django runs under threaded servers.
    """

    # Minimum number of seconds between two full sweeps of the store.
    CLEANUP_INTERVAL_SECONDS = 300
    # Timestamps are retained at least this long before a sweep drops them.
    MIN_RETENTION_SECONDS = 3600

    def __init__(self, get_response=None):
        super().__init__(get_response)
        # In-memory store: {app_id: deque of request timestamps, oldest first}.
        # Timestamps come from time.monotonic(), so they are only meaningful
        # relative to each other inside this process.
        self._requests = defaultdict(deque)
        # Re-entrant so process_request can hold it across check + record
        # while those helpers acquire it again themselves.
        self._lock = threading.RLock()
        # Sweep retention; raised to the largest window seen so that a sweep
        # never discards timestamps that are still inside an active window.
        self._retention_seconds = self.MIN_RETENTION_SECONDS
        self._last_cleanup = time.monotonic()

    def process_request(self, request):
        """
        Reject the request with HTTP 429 when its application is over limit.

        Returns ``None`` (let the request through) when the path is not an
        application API, no application ID can be extracted, no enabled rate
        configuration exists, or the application is still within its limit.
        Otherwise returns a ``JsonResponse`` with status 429.
        """
        # Only application APIs are rate limited.
        path = request.path
        if not self._is_app_api(path):
            return None

        app_id = self._extract_app_id(path, request)
        if not app_id:
            return None

        rate_config = self._get_rate_config(app_id)
        if not rate_config or not rate_config.get('is_enabled'):
            return None

        client_ip = self._get_client_ip(request)
        window_seconds = rate_config['window_seconds']

        # Check and record atomically; otherwise concurrent requests could all
        # pass the check before any of them is recorded.
        with self._lock:
            is_limited = self._check_rate_limit(
                app_id,
                rate_config['max_requests'],
                window_seconds,
            )
            if not is_limited:
                self._record_request(app_id, client_ip, path)

        if not is_limited:
            return None

        maxkb_logger.warning(
            f"Rate limit exceeded for app {app_id} from {client_ip}"
        )
        response = JsonResponse({
            'code': 429,
            'message': '请求过于频繁,请稍后再试',
            'data': {
                'retry_after': window_seconds
            }
        }, status=429)
        # Mirror the payload hint in the standard header. The full window
        # length is a conservative upper bound on the wait.
        response['Retry-After'] = str(int(window_seconds))
        return response

    def _is_app_api(self, path: str) -> bool:
        """Return True when the path contains both application API markers."""
        return '/api/workspace/' in path and '/application/' in path

    def _extract_app_id(self, path: str, request) -> Optional[str]:
        """
        Return the path segment that follows the first ``application`` segment.

        ``request`` is accepted for interface compatibility but is not used.
        Returns ``None`` when there is no such segment or it is empty.
        """
        parts = path.split('/')
        for part, following in zip(parts, parts[1:]):
            if part == 'application':
                return following or None
        return None

    def _get_client_ip(self, request) -> str:
        """
        Return the client IP for logging.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR`` (empty string if that is missing too). The header is
        client-supplied, so the value must not be used for access decisions.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR', '')

    def _get_rate_config(self, app_id: str) -> Optional[dict]:
        """
        Load the rate-limit configuration of an application from the database.

        Returns a dict with ``is_enabled``, ``max_requests`` and
        ``window_seconds``, or ``None`` when the application or its rate-limit
        row does not exist, or when the lookup fails for any other reason
        (the failure is logged and the request is not limited).

        NOTE: this queries the database on every call; production use should
        put a cache in front of it.
        """
        try:
            # Imported lazily, as in the original code.
            # NOTE(review): presumably to avoid import-time cycles — confirm.
            from application.models.rate_limit import RateLimit
            from application.models.application import Application

            try:
                application = Application.objects.get(id=app_id)
                rate_limit = RateLimit.objects.get(application=application)
            except (Application.DoesNotExist, RateLimit.DoesNotExist):
                return None

            return {
                'is_enabled': rate_limit.is_enabled,
                'max_requests': rate_limit.max_requests,
                'window_seconds': rate_limit.window_seconds,
            }
        except Exception as e:
            # Best effort: a broken lookup must not take the API down.
            maxkb_logger.error(f"Failed to get rate config: {e}")
            return None

    def _check_rate_limit(self, app_id: str, max_requests: int, window_seconds: int) -> bool:
        """
        Return True when the application has used up its sliding window.

        Drops timestamps that have left the window, then compares the number
        of remaining requests against ``max_requests``. Does not record the
        current request; see ``_record_request``.
        """
        with self._lock:
            now = time.monotonic()
            window_start = now - window_seconds

            # Keep sweep retention at least as long as the longest window.
            if window_seconds > self._retention_seconds:
                self._retention_seconds = window_seconds

            # Timestamps are appended in non-decreasing order under the lock,
            # so expired ones are always at the left end.
            timestamps = self._requests[app_id]
            while timestamps and timestamps[0] <= window_start:
                timestamps.popleft()

            return len(timestamps) >= max_requests

    def _record_request(self, app_id: str, client_ip: str, path: str):
        """
        Record one request for the application at the current time.

        ``client_ip`` and ``path`` are accepted for interface compatibility
        but are not stored. Also triggers a full sweep of the store when the
        last one is older than ``CLEANUP_INTERVAL_SECONDS``.
        """
        with self._lock:
            # Taken inside the lock so append order matches timestamp order.
            now = time.monotonic()
            self._requests[app_id].append(now)

            if now - self._last_cleanup > self.CLEANUP_INTERVAL_SECONDS:
                self._cleanup_old_requests()
                self._last_cleanup = now

    def _cleanup_old_requests(self):
        """
        Drop timestamps older than the retention period for every application
        and remove applications that have no timestamps left.
        """
        with self._lock:
            now = time.monotonic()
            retention = self._retention_seconds
            # Iterate over a snapshot of the keys because entries are deleted.
            for app_id in list(self._requests):
                timestamps = self._requests[app_id]
                while timestamps and now - timestamps[0] >= retention:
                    timestamps.popleft()
                if not timestamps:
                    del self._requests[app_id]