| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
# coding=utf-8
"""
Application API rate-limiting middleware.

Implements API call rate limiting with a sliding-window algorithm.
"""
import threading
import time
from collections import defaultdict, deque
from typing import Optional

from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin

from common.utils.logger import maxkb_logger
class RateLimitMiddleware(MiddlewareMixin):
    """Per-application API rate-limiting middleware.

    Keeps a sliding window of request timestamps per application in process
    memory and answers with HTTP 429 once the configured quota for the window
    is used up.  The state lives on this middleware instance only, so the
    limit is enforced per process; it suits single-instance deployments.
    """

    # Seconds between sweeps that drop state kept for idle applications.
    CLEANUP_INTERVAL_SECONDS = 300
    # Minimum age, in seconds, a timestamp must reach before a sweep drops it.
    MIN_RETENTION_SECONDS = 3600

    def __init__(self, get_response=None):
        """Set up empty rate-limit state.

        Args:
            get_response: The next handler in the middleware chain, passed
                through to ``MiddlewareMixin``.
        """
        super().__init__(get_response)
        # {app_id: deque of request timestamps, oldest first}.  Timestamps
        # come from time.monotonic(), so they are only meaningful relative to
        # each other inside this process.
        self._requests = defaultdict(deque)
        # {app_id: window length last used for that app}; lets the sweep keep
        # entries that are still inside a window longer than the minimum
        # retention.
        self._windows = {}
        self._last_cleanup = time.monotonic()
        # Serialises access to the state above: the same middleware instance
        # may handle requests on several threads, and check-then-record must
        # be atomic or concurrent requests could overshoot the quota.
        self._lock = threading.Lock()

    def process_request(self, request) -> Optional[JsonResponse]:
        """Apply the rate limit to an incoming request.

        Args:
            request: The incoming Django request.

        Returns:
            ``None`` when the request may proceed (not an application API
            path, no application id, no enabled rate-limit configuration, or
            still under the quota).  Otherwise a 429 ``JsonResponse``.
        """
        path = request.path
        # Only application API paths are rate limited.
        if not self._is_app_api(path):
            return None

        app_id = self._extract_app_id(path, request)
        if not app_id:
            return None

        rate_config = self._get_rate_config(app_id)
        if not rate_config or not rate_config.get('is_enabled'):
            return None

        client_ip = self._get_client_ip(request)
        # Check and record under one lock so two concurrent requests cannot
        # both pass the check for the last free slot.
        with self._lock:
            is_limited = self._check_rate_limit(
                app_id,
                rate_config['max_requests'],
                rate_config['window_seconds'],
            )
            if not is_limited:
                self._record_request(app_id, client_ip, path)

        if not is_limited:
            return None

        maxkb_logger.warning(
            f"Rate limit exceeded for app {app_id} from {client_ip}"
        )
        retry_after = rate_config.get('window_seconds', 60)
        response = JsonResponse({
            'code': 429,
            'message': '请求过于频繁，请稍后再试',
            'data': {
                'retry_after': retry_after
            }
        }, status=429)
        # Mirror the body's hint in the standard header so generic HTTP
        # clients can back off without parsing the JSON payload.
        response['Retry-After'] = str(retry_after)
        return response

    def _is_app_api(self, path: str) -> bool:
        """Return True when ``path`` contains both application API markers."""
        return '/api/workspace/' in path and '/application/' in path

    def _extract_app_id(self, path: str, request) -> Optional[str]:
        """Return the path segment that follows the first ``application`` segment.

        Args:
            path: The request path.
            request: Unused; kept for interface compatibility.

        Returns:
            The segment after ``application`` (an empty string when the path
            ends in ``application/``), or ``None`` when no ``application``
            segment is followed by another segment.
        """
        parts = path.split('/')
        for i, part in enumerate(parts):
            if part == 'application' and i + 1 < len(parts):
                return parts[i + 1]
        return None

    def _get_client_ip(self, request) -> str:
        """Return the client address used in the rate-limit log message.

        Uses the first entry of ``X-Forwarded-For`` when that header is
        present, otherwise ``REMOTE_ADDR`` (or an empty string).  The header
        is supplied by the client, so the value is informational only.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR', '')

    def _get_rate_config(self, app_id: str) -> Optional[dict]:
        """Load the rate-limit configuration for an application.

        This queries the database on every call; the original author notes
        that production use should put a cache in front of it.

        Args:
            app_id: Application id taken from the request path.

        Returns:
            A dict with ``is_enabled``, ``max_requests`` and
            ``window_seconds``, or ``None`` when the application or its
            configuration does not exist or the lookup fails.  A failed
            lookup is logged and treated as "no limit" so that a
            configuration problem does not block requests.
        """
        try:
            # NOTE(review): imported inside the function, presumably to avoid
            # loading models at module import time - confirm before moving.
            from application.models.rate_limit import RateLimit
            from application.models.application import Application
            try:
                application = Application.objects.get(id=app_id)
            except Application.DoesNotExist:
                return None
            try:
                rate_limit = RateLimit.objects.get(application=application)
                return {
                    'is_enabled': rate_limit.is_enabled,
                    'max_requests': rate_limit.max_requests,
                    'window_seconds': rate_limit.window_seconds,
                }
            except RateLimit.DoesNotExist:
                return None
        except Exception as e:
            maxkb_logger.error(f"Failed to get rate config: {e}")
            return None

    def _check_rate_limit(self, app_id: str, max_requests: int,
                          window_seconds: int) -> bool:
        """Report whether the application has used up its quota.

        Drops timestamps that have left the sliding window, then compares
        the number of remaining requests with ``max_requests``.  When other
        threads may touch the state, call this with ``self._lock`` held.

        Args:
            app_id: Application whose window is inspected.
            max_requests: Requests allowed per window.
            window_seconds: Window length in seconds.

        Returns:
            True when the request must be rejected.
        """
        now = time.monotonic()
        window_start = now - window_seconds
        # Remember the window so the periodic sweep keeps entries long enough.
        self._windows[app_id] = window_seconds
        timestamps = self._requests[app_id]
        # Timestamps are appended in non-decreasing order, so everything that
        # has expired sits at the left end.
        while timestamps and timestamps[0] <= window_start:
            timestamps.popleft()
        return len(timestamps) >= max_requests

    def _record_request(self, app_id: str, client_ip: str, path: str):
        """Record one accepted request and run the periodic sweep when due.

        When other threads may touch the state, call this with
        ``self._lock`` held.

        Args:
            app_id: Application the request is counted against.
            client_ip: Unused; kept for interface compatibility.
            path: Unused; kept for interface compatibility.
        """
        now = time.monotonic()
        self._requests[app_id].append(now)
        if now - self._last_cleanup > self.CLEANUP_INTERVAL_SECONDS:
            self._cleanup_old_requests()
            self._last_cleanup = now

    def _cleanup_old_requests(self):
        """Drop stale timestamps and forget applications with none left.

        Entries are kept for at least ``MIN_RETENTION_SECONDS``, and longer
        when the application's own window is longer, so a sweep never removes
        a request that still counts towards a limit.  When other threads may
        touch the state, call this with ``self._lock`` held.
        """
        now = time.monotonic()
        for app_id in list(self._requests):
            retention = max(
                self.MIN_RETENTION_SECONDS,
                self._windows.get(app_id, 0),
            )
            timestamps = self._requests[app_id]
            while timestamps and now - timestamps[0] >= retention:
                timestamps.popleft()
            if not timestamps:
                del self._requests[app_id]
                self._windows.pop(app_id, None)
|