| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- # coding=utf-8
- """
- 平台API Key服务层
- 提供平台API Key的CRUD业务逻辑处理
- """
- from datetime import datetime
- from typing import Optional
- from django.utils import timezone
- from models_provider.models.platform_api_key import PlatformApiKey, PlatformApiKeyStatus
- from models_provider.services.crypto_utils import (
- generate_platform_api_key,
- hash_api_key,
- )
- # 每用户最大API Key数量限制
- MAX_API_KEYS_PER_USER = 5
- def create_api_key(user, name=None):
- """
- 创建API Key
- 生成以 "sk-aigc-" 为前缀、总长度48字符的密钥
- 返回完整密钥(仅此一次)
- """
- active_count = PlatformApiKey.objects.filter(
- user=user, status=PlatformApiKeyStatus.ACTIVE
- ).count()
- if active_count >= MAX_API_KEYS_PER_USER:
- raise ValueError(f"已达到API Key数量上限(最多{MAX_API_KEYS_PER_USER}个有效密钥)")
- full_key, display_prefix = generate_platform_api_key()
- hashed_key = hash_api_key(full_key)
- api_key_record = PlatformApiKey.objects.create(
- user=user,
- api_key_hash=hashed_key,
- api_key_prefix=display_prefix,
- name=name,
- status=PlatformApiKeyStatus.ACTIVE,
- )
- return {
- "id": str(api_key_record.id),
- "api_key": full_key,
- "api_key_prefix": display_prefix,
- "name": api_key_record.name,
- "status": api_key_record.status,
- "create_time": api_key_record.create_time,
- }
- def get_user_api_keys(user):
- """获取用户的API Key列表(脱敏)"""
- return PlatformApiKey.objects.filter(user=user).order_by("-create_time")
- def update_api_key_status(key_id, user, status):
- """更新API Key状态(启用/禁用)"""
- if status not in ("active", "disabled"):
- raise ValueError("状态值无效,必须是 'active' 或 'disabled'")
- api_key_record = PlatformApiKey.objects.filter(id=key_id, user=user).first()
- if not api_key_record:
- raise ValueError("API Key不存在或无权限访问")
- api_key_record.status = status
- api_key_record.save(update_fields=["status", "update_time"])
- return api_key_record
- def delete_api_key(key_id, user):
- """删除API Key"""
- api_key_record = PlatformApiKey.objects.filter(id=key_id, user=user).first()
- if not api_key_record:
- raise ValueError("API Key不存在或无权限访问")
- api_key_record.delete()
- return True
- def verify_api_key(api_key_str):
- """
- 验证API Key,返回 (user_id, key_id) 或 None
- """
- if not api_key_str:
- return None
- hashed_key = hash_api_key(api_key_str)
- api_key_record = PlatformApiKey.objects.filter(api_key_hash=hashed_key).first()
- if not api_key_record:
- return None
- if api_key_record.status != PlatformApiKeyStatus.ACTIVE:
- return None
- api_key_record.last_used_at = timezone.now()
- api_key_record.save(update_fields=["last_used_at"])
- return (str(api_key_record.user_id), str(api_key_record.id))
|