| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- """
- 平台API Key服务层
- 提供平台API Key的CRUD业务逻辑处理
- 需求: 7.1, 7.2, 7.3, 7.6, 9.4
- """
- from datetime import datetime, timedelta
- from typing import List, Optional, Tuple
- from sqlalchemy.orm import Session
- from fastapi import HTTPException
- from app.models.platform_api_key import PlatformApiKey
- from app.services.crypto_utils import (
- generate_platform_api_key,
- hash_api_key
- )
- class PlatformApiKeyService:
- """平台API Key业务服务类"""
- # 每用户最大API Key数量限制
- MAX_API_KEYS_PER_USER = 5
- def __init__(self, db: Session):
- self.db = db
- def create_api_key(
- self,
- user_id: str,
- name: Optional[str] = None,
- key_type: str = "public"
- ) -> dict:
- """
- 创建API Key
-
- 生成以 "sk-aigc-" 为前缀、总长度48字符的密钥
- 返回完整密钥(仅此一次)
-
- Args:
- user_id: 用户ID
- name: 备注名称(可选)
- key_type: 密钥类型:public(公钥,访问云端模型)/ local(本地私钥,访问本地模型)
-
- Returns:
- 包含完整密钥的字典:
- {
- "id": int,
- "api_key": str, # 完整密钥(仅此一次显示)
- "api_key_prefix": str, # 显示前缀
- "name": str,
- "status": str,
- "key_type": str,
- "created_at": datetime
- }
-
- Raises:
- HTTPException: 超过数量限制时抛出400错误
-
- 需求: 7.1, 7.2, 7.5
- 新增: 支持设置密钥类型
- """
- # 检查用户API Key数量限制(需求: 7.4)
- active_count = self._get_user_active_key_count(user_id)
- if active_count >= self.MAX_API_KEYS_PER_USER:
- raise HTTPException(
- status_code=400,
- detail=f"已达到API Key数量上限(最多{self.MAX_API_KEYS_PER_USER}个有效密钥)"
- )
-
- # 生成API Key(需求: 7.1)
- full_key, display_prefix = generate_platform_api_key()
-
- # 对API Key进行SHA256哈希存储(需求: 7.7)
- hashed_key = hash_api_key(full_key)
-
- # 创建API Key记录
- api_key_record = PlatformApiKey(
- user_id=user_id,
- api_key=hashed_key,
- api_key_prefix=display_prefix,
- name=name,
- status="active",
- key_type=key_type
- )
-
- self.db.add(api_key_record)
- self.db.commit()
- self.db.refresh(api_key_record)
-
- # 返回包含完整密钥的响应(仅此一次显示,需求: 7.2)
- return {
- "id": api_key_record.id,
- "api_key": full_key, # 完整密钥仅在创建时返回
- "api_key_prefix": display_prefix,
- "name": api_key_record.name,
- "status": api_key_record.status,
- "key_type": api_key_record.key_type,
- "created_at": api_key_record.created_at
- }
- def get_user_api_keys(self, user_id: str) -> List[PlatformApiKey]:
- """
- 获取用户的API Key列表
-
- 返回用户所有API Key,仅显示前缀(脱敏显示)
-
- Args:
- user_id: 用户ID
-
- Returns:
- 用户的API Key列表(不包含完整密钥)
-
- 需求: 7.3
- """
- return self.db.query(PlatformApiKey).filter(
- PlatformApiKey.user_id == user_id
- ).order_by(PlatformApiKey.created_at.desc()).all()
- def update_api_key_status(
- self,
- key_id: int,
- user_id: str,
- status: str
- ) -> PlatformApiKey:
- """
- 更新API Key状态(启用/禁用)
-
- Args:
- key_id: API Key ID
- user_id: 用户ID(用于权限验证)
- status: 新状态("active" 或 "disabled")
-
- Returns:
- 更新后的API Key对象
-
- Raises:
- HTTPException: API Key不存在或无权限时抛出错误
-
- 需求: 7.6
- """
- # 验证状态值
- if status not in ("active", "disabled"):
- raise HTTPException(
- status_code=400,
- detail="状态值无效,必须是 'active' 或 'disabled'"
- )
-
- # 查询API Key并验证权限
- api_key_record = self.db.query(PlatformApiKey).filter(
- PlatformApiKey.id == key_id,
- PlatformApiKey.user_id == user_id
- ).first()
-
- if not api_key_record:
- raise HTTPException(
- status_code=404,
- detail="API Key不存在或无权限访问"
- )
-
- # 更新状态
- api_key_record.status = status
-
- self.db.commit()
- self.db.refresh(api_key_record)
-
- return api_key_record
- def delete_api_key(self, key_id: int, user_id: str) -> bool:
- """
- 删除API Key
-
- Args:
- key_id: API Key ID
- user_id: 用户ID(用于权限验证)
-
- Returns:
- 删除是否成功
-
- Raises:
- HTTPException: API Key不存在或无权限时抛出错误
-
- 需求: 7.6
- """
- # 查询API Key并验证权限
- api_key_record = self.db.query(PlatformApiKey).filter(
- PlatformApiKey.id == key_id,
- PlatformApiKey.user_id == user_id
- ).first()
-
- if not api_key_record:
- raise HTTPException(
- status_code=404,
- detail="API Key不存在或无权限访问"
- )
-
- self.db.delete(api_key_record)
- self.db.commit()
-
- return True
- def _get_user_active_key_count(self, user_id: str) -> int:
- """
- 获取用户有效API Key数量
-
- Args:
- user_id: 用户ID
-
- Returns:
- 有效API Key数量
- """
- return self.db.query(PlatformApiKey).filter(
- PlatformApiKey.user_id == user_id,
- PlatformApiKey.status == "active"
- ).count()
- def get_api_key_by_id(self, key_id: int, user_id: str) -> PlatformApiKey:
- """
- 根据ID获取API Key
-
- Args:
- key_id: API Key ID
- user_id: 用户ID(用于权限验证)
-
- Returns:
- API Key对象
-
- Raises:
- HTTPException: API Key不存在或无权限时抛出错误
- """
- api_key_record = self.db.query(PlatformApiKey).filter(
- PlatformApiKey.id == key_id,
- PlatformApiKey.user_id == user_id
- ).first()
-
- if not api_key_record:
- raise HTTPException(
- status_code=404,
- detail="API Key不存在或无权限访问"
- )
-
- return api_key_record
- def verify_api_key(self, api_key: str) -> Optional[Tuple[str, int]]:
- """
- 验证API Key,返回(user_id, key_id)
-
- 对传入的API Key进行SHA256哈希后在数据库中查找匹配记录,
- 验证状态是否为active,成功后限频更新last_used_at时间
-
- 性能优化(2026-05-15):
- - 高并发场景下,多人共用同一 API Key 会导致每次请求都触发
- UPDATE last_used_at,对该行造成大量写锁竞争。
- - 改为限频更新:距离上次更新超过 60 秒才更新,否则跳过。
- - 用单条 UPDATE 配合 WHERE 条件实现,避免 SELECT-then-UPDATE 的竞态。
-
- Args:
- api_key: 需要验证的API Key明文
-
- Returns:
- 验证成功返回 (user_id, key_id) 元组
- 验证失败返回 None
-
- 需求: 9.4
- """
- if not api_key:
- return None
-
- # 对API Key进行SHA256哈希
- hashed_key = hash_api_key(api_key)
-
- # 在数据库中查找匹配的记录
- api_key_record = self.db.query(PlatformApiKey).filter(
- PlatformApiKey.api_key == hashed_key
- ).first()
-
- # 未找到记录
- if not api_key_record:
- return None
-
- # 检查状态是否为active
- if api_key_record.status != "active":
- return None
- # 限频更新 last_used_at:超过 60 秒才更新,避免热点行的写锁竞争
- try:
- now = datetime.now()
- last = api_key_record.last_used_at
- if last is None or (now - last).total_seconds() > 60:
- # 用条件 UPDATE 避免与其他请求的写锁竞争
- from sqlalchemy import update
- stmt = (
- update(PlatformApiKey)
- .where(PlatformApiKey.id == api_key_record.id)
- .where(
- (PlatformApiKey.last_used_at.is_(None))
- | (PlatformApiKey.last_used_at < now - timedelta(seconds=60))
- )
- .values(last_used_at=now)
- )
- self.db.execute(stmt)
- self.db.commit()
- except Exception:
- # last_used_at 更新失败不影响验证结果
- try:
- self.db.rollback()
- except Exception:
- pass
-
- return (api_key_record.user_id, api_key_record.id)
|