platform_api_key_service.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # coding=utf-8
  2. """
  3. 平台API Key服务层
  4. 提供平台API Key的CRUD业务逻辑处理
  5. """
  6. from datetime import datetime
  7. from typing import Optional
  8. from django.utils import timezone
  9. from models_provider.models.platform_api_key import PlatformApiKey, PlatformApiKeyStatus
  10. from models_provider.services.crypto_utils import (
  11. generate_platform_api_key,
  12. hash_api_key,
  13. )
  14. # Maximum number of ACTIVE API keys a single user may hold (enforced in create_api_key)
  15. MAX_API_KEYS_PER_USER = 5
  16. def create_api_key(user, name=None):
  17. """
  18. 创建API Key
  19. 生成以 "sk-aigc-" 为前缀、总长度48字符的密钥
  20. 返回完整密钥(仅此一次)
  21. """
  22. active_count = PlatformApiKey.objects.filter(
  23. user=user, status=PlatformApiKeyStatus.ACTIVE
  24. ).count()
  25. if active_count >= MAX_API_KEYS_PER_USER:
  26. raise ValueError(f"已达到API Key数量上限(最多{MAX_API_KEYS_PER_USER}个有效密钥)")
  27. full_key, display_prefix = generate_platform_api_key()
  28. hashed_key = hash_api_key(full_key)
  29. api_key_record = PlatformApiKey.objects.create(
  30. user=user,
  31. api_key_hash=hashed_key,
  32. api_key_prefix=display_prefix,
  33. name=name,
  34. status=PlatformApiKeyStatus.ACTIVE,
  35. )
  36. return {
  37. "id": str(api_key_record.id),
  38. "api_key": full_key,
  39. "api_key_prefix": display_prefix,
  40. "name": api_key_record.name,
  41. "status": api_key_record.status,
  42. "create_time": api_key_record.create_time,
  43. }
  44. def get_user_api_keys(user):
  45. """获取用户的API Key列表(脱敏)"""
  46. return PlatformApiKey.objects.filter(user=user).order_by("-create_time")
  47. def update_api_key_status(key_id, user, status):
  48. """更新API Key状态(启用/禁用)"""
  49. if status not in ("active", "disabled"):
  50. raise ValueError("状态值无效,必须是 'active' 或 'disabled'")
  51. api_key_record = PlatformApiKey.objects.filter(id=key_id, user=user).first()
  52. if not api_key_record:
  53. raise ValueError("API Key不存在或无权限访问")
  54. api_key_record.status = status
  55. api_key_record.save(update_fields=["status", "update_time"])
  56. return api_key_record
  57. def delete_api_key(key_id, user):
  58. """删除API Key"""
  59. api_key_record = PlatformApiKey.objects.filter(id=key_id, user=user).first()
  60. if not api_key_record:
  61. raise ValueError("API Key不存在或无权限访问")
  62. api_key_record.delete()
  63. return True
  64. def verify_api_key(api_key_str):
  65. """
  66. 验证API Key,返回 (user_id, key_id) 或 None
  67. """
  68. if not api_key_str:
  69. return None
  70. hashed_key = hash_api_key(api_key_str)
  71. api_key_record = PlatformApiKey.objects.filter(api_key_hash=hashed_key).first()
  72. if not api_key_record:
  73. return None
  74. if api_key_record.status != PlatformApiKeyStatus.ACTIVE:
  75. return None
  76. api_key_record.last_used_at = timezone.now()
  77. api_key_record.save(update_fields=["last_used_at"])
  78. return (str(api_key_record.user_id), str(api_key_record.id))