# coding=utf-8
"""
Platform API key service layer.

Provides the CRUD business logic for platform API keys.
"""
from datetime import datetime
from typing import Optional

from django.utils import timezone

from models_provider.models.platform_api_key import PlatformApiKey, PlatformApiKeyStatus
from models_provider.services.crypto_utils import (
    generate_platform_api_key,
    hash_api_key,
)

# Maximum number of active API keys a single user may hold.
MAX_API_KEYS_PER_USER = 5


def _count_active_keys(user, exclude_id=None) -> int:
    """Count the user's ACTIVE keys, optionally ignoring the key ``exclude_id``."""
    active_keys = PlatformApiKey.objects.filter(
        user=user, status=PlatformApiKeyStatus.ACTIVE
    )
    if exclude_id is not None:
        active_keys = active_keys.exclude(id=exclude_id)
    return active_keys.count()


def _get_owned_key(key_id, user):
    """Return the key with ``key_id`` owned by ``user``; raise ValueError if absent."""
    api_key_record = PlatformApiKey.objects.filter(id=key_id, user=user).first()
    if not api_key_record:
        raise ValueError("API Key不存在或无权限访问")
    return api_key_record


def create_api_key(user, name: Optional[str] = None) -> dict:
    """
    Create a new API key for ``user``.

    The plaintext key comes from ``generate_platform_api_key()`` (per the
    original note: prefixed with "sk-aigc-", 48 characters in total). Only its
    hash and display prefix are persisted, so the returned dict is the single
    place where the full key is available.

    Args:
        user: Owner of the new key.
        name: Optional human-readable label for the key.

    Returns:
        A dict with ``id``, ``api_key`` (full plaintext key), ``api_key_prefix``,
        ``name``, ``status`` and ``create_time``.

    Raises:
        ValueError: If the user already holds ``MAX_API_KEYS_PER_USER`` active keys.
    """
    if _count_active_keys(user) >= MAX_API_KEYS_PER_USER:
        raise ValueError(f"已达到API Key数量上限(最多{MAX_API_KEYS_PER_USER}个有效密钥)")

    full_key, display_prefix = generate_platform_api_key()
    hashed_key = hash_api_key(full_key)

    api_key_record = PlatformApiKey.objects.create(
        user=user,
        api_key_hash=hashed_key,
        api_key_prefix=display_prefix,
        name=name,
        status=PlatformApiKeyStatus.ACTIVE,
    )

    return {
        "id": str(api_key_record.id),
        "api_key": full_key,
        "api_key_prefix": display_prefix,
        "name": api_key_record.name,
        "status": api_key_record.status,
        "create_time": api_key_record.create_time,
    }


def get_user_api_keys(user):
    """
    Return the user's API key records, newest first.

    The result is a queryset of ``PlatformApiKey`` rows ordered by descending
    ``create_time``. NOTE(review): the original docstring says the list is
    "masked"; no masking happens here, so it is presumably done by the
    serializer -- confirm against the caller.
    """
    return PlatformApiKey.objects.filter(user=user).order_by("-create_time")


def update_api_key_status(key_id, user, status):
    """
    Enable or disable one of the user's API keys.

    Args:
        key_id: Primary key of the API key record.
        user: Owner the key must belong to.
        status: Target status, either ``"active"`` or ``"disabled"``.

    Returns:
        The updated ``PlatformApiKey`` record.

    Raises:
        ValueError: If ``status`` is not an accepted value, if the key does not
            exist or is not owned by ``user``, or if re-enabling the key would
            push the user past ``MAX_API_KEYS_PER_USER`` active keys.
    """
    if status not in ("active", "disabled"):
        raise ValueError("状态值无效,必须是 'active' 或 'disabled'")

    api_key_record = _get_owned_key(key_id, user)

    # Re-enabling a key must respect the same quota as creating one; otherwise
    # a user could disable keys, create new ones, and then re-enable the old
    # ones to exceed the limit. Only checked on an actual transition to active.
    if status == "active" and api_key_record.status != status:
        other_active = _count_active_keys(user, exclude_id=api_key_record.id)
        if other_active >= MAX_API_KEYS_PER_USER:
            raise ValueError(
                f"已达到API Key数量上限(最多{MAX_API_KEYS_PER_USER}个有效密钥)"
            )

    api_key_record.status = status
    api_key_record.save(update_fields=["status", "update_time"])
    return api_key_record


def delete_api_key(key_id, user):
    """
    Delete one of the user's API keys.

    Returns:
        ``True`` once the record has been deleted.

    Raises:
        ValueError: If the key does not exist or is not owned by ``user``.
    """
    _get_owned_key(key_id, user).delete()
    return True


def verify_api_key(api_key_str) -> Optional[tuple]:
    """
    Verify a plaintext API key.

    The key is hashed with ``hash_api_key`` and looked up by hash. On success
    the record's ``last_used_at`` is set to the current time.

    Returns:
        ``(user_id, key_id)`` as strings when the key exists and is ACTIVE;
        ``None`` when the input is empty, the key is unknown, or it is not active.
    """
    if not api_key_str:
        return None

    hashed_key = hash_api_key(api_key_str)
    api_key_record = PlatformApiKey.objects.filter(api_key_hash=hashed_key).first()
    if not api_key_record:
        return None
    if api_key_record.status != PlatformApiKeyStatus.ACTIVE:
        return None

    # Record usage; only this column is written to keep the update narrow.
    api_key_record.last_used_at = timezone.now()
    api_key_record.save(update_fields=["last_used_at"])
    return (str(api_key_record.user_id), str(api_key_record.id))