""" 平台API Key服务层 提供平台API Key的CRUD业务逻辑处理 需求: 7.1, 7.2, 7.3, 7.6, 9.4 """ from datetime import datetime, timedelta from typing import List, Optional, Tuple from sqlalchemy.orm import Session from fastapi import HTTPException from app.models.platform_api_key import PlatformApiKey from app.services.crypto_utils import ( generate_platform_api_key, hash_api_key ) class PlatformApiKeyService: """平台API Key业务服务类""" # 每用户最大API Key数量限制 MAX_API_KEYS_PER_USER = 5 def __init__(self, db: Session): self.db = db def create_api_key( self, user_id: str, name: Optional[str] = None, key_type: str = "public" ) -> dict: """ 创建API Key 生成以 "sk-aigc-" 为前缀、总长度48字符的密钥 返回完整密钥(仅此一次) Args: user_id: 用户ID name: 备注名称(可选) key_type: 密钥类型:public(公钥,访问云端模型)/ local(本地私钥,访问本地模型) Returns: 包含完整密钥的字典: { "id": int, "api_key": str, # 完整密钥(仅此一次显示) "api_key_prefix": str, # 显示前缀 "name": str, "status": str, "key_type": str, "created_at": datetime } Raises: HTTPException: 超过数量限制时抛出400错误 需求: 7.1, 7.2, 7.5 新增: 支持设置密钥类型 """ # 检查用户API Key数量限制(需求: 7.4) active_count = self._get_user_active_key_count(user_id) if active_count >= self.MAX_API_KEYS_PER_USER: raise HTTPException( status_code=400, detail=f"已达到API Key数量上限(最多{self.MAX_API_KEYS_PER_USER}个有效密钥)" ) # 生成API Key(需求: 7.1) full_key, display_prefix = generate_platform_api_key() # 对API Key进行SHA256哈希存储(需求: 7.7) hashed_key = hash_api_key(full_key) # 创建API Key记录 api_key_record = PlatformApiKey( user_id=user_id, api_key=hashed_key, api_key_prefix=display_prefix, name=name, status="active", key_type=key_type ) self.db.add(api_key_record) self.db.commit() self.db.refresh(api_key_record) # 返回包含完整密钥的响应(仅此一次显示,需求: 7.2) return { "id": api_key_record.id, "api_key": full_key, # 完整密钥仅在创建时返回 "api_key_prefix": display_prefix, "name": api_key_record.name, "status": api_key_record.status, "key_type": api_key_record.key_type, "created_at": api_key_record.created_at } def get_user_api_keys(self, user_id: str) -> List[PlatformApiKey]: """ 获取用户的API Key列表 返回用户所有API Key,仅显示前缀(脱敏显示) Args: user_id: 用户ID Returns: 用户的API Key列表(不包含完整密钥) 需求: 7.3 """ return self.db.query(PlatformApiKey).filter( PlatformApiKey.user_id == user_id ).order_by(PlatformApiKey.created_at.desc()).all() def update_api_key_status( self, key_id: int, user_id: str, status: str ) -> PlatformApiKey: """ 更新API Key状态(启用/禁用) Args: key_id: API Key ID user_id: 用户ID(用于权限验证) status: 新状态("active" 或 "disabled") Returns: 更新后的API Key对象 Raises: HTTPException: API Key不存在或无权限时抛出错误 需求: 7.6 """ # 验证状态值 if status not in ("active", "disabled"): raise HTTPException( status_code=400, detail="状态值无效,必须是 'active' 或 'disabled'" ) # 查询API Key并验证权限 api_key_record = self.db.query(PlatformApiKey).filter( PlatformApiKey.id == key_id, PlatformApiKey.user_id == user_id ).first() if not api_key_record: raise HTTPException( status_code=404, detail="API Key不存在或无权限访问" ) # 更新状态 api_key_record.status = status self.db.commit() self.db.refresh(api_key_record) return api_key_record def delete_api_key(self, key_id: int, user_id: str) -> bool: """ 删除API Key Args: key_id: API Key ID user_id: 用户ID(用于权限验证) Returns: 删除是否成功 Raises: HTTPException: API Key不存在或无权限时抛出错误 需求: 7.6 """ # 查询API Key并验证权限 api_key_record = self.db.query(PlatformApiKey).filter( PlatformApiKey.id == key_id, PlatformApiKey.user_id == user_id ).first() if not api_key_record: raise HTTPException( status_code=404, detail="API Key不存在或无权限访问" ) self.db.delete(api_key_record) self.db.commit() return True def _get_user_active_key_count(self, user_id: str) -> int: """ 获取用户有效API Key数量 Args: user_id: 用户ID Returns: 有效API Key数量 """ return self.db.query(PlatformApiKey).filter( PlatformApiKey.user_id == user_id, PlatformApiKey.status == "active" ).count() def get_api_key_by_id(self, key_id: int, user_id: str) -> PlatformApiKey: """ 根据ID获取API Key Args: key_id: API Key ID user_id: 用户ID(用于权限验证) Returns: API Key对象 Raises: HTTPException: API Key不存在或无权限时抛出错误 """ api_key_record = self.db.query(PlatformApiKey).filter( PlatformApiKey.id == key_id, PlatformApiKey.user_id == user_id ).first() if not api_key_record: raise HTTPException( status_code=404, detail="API Key不存在或无权限访问" ) return api_key_record def verify_api_key(self, api_key: str) -> Optional[Tuple[str, int]]: """ 验证API Key,返回(user_id, key_id) 对传入的API Key进行SHA256哈希后在数据库中查找匹配记录, 验证状态是否为active,成功后限频更新last_used_at时间 性能优化(2026-05-15): - 高并发场景下,多人共用同一 API Key 会导致每次请求都触发 UPDATE last_used_at,对该行造成大量写锁竞争。 - 改为限频更新:距离上次更新超过 60 秒才更新,否则跳过。 - 用单条 UPDATE 配合 WHERE 条件实现,避免 SELECT-then-UPDATE 的竞态。 Args: api_key: 需要验证的API Key明文 Returns: 验证成功返回 (user_id, key_id) 元组 验证失败返回 None 需求: 9.4 """ if not api_key: return None # 对API Key进行SHA256哈希 hashed_key = hash_api_key(api_key) # 在数据库中查找匹配的记录 api_key_record = self.db.query(PlatformApiKey).filter( PlatformApiKey.api_key == hashed_key ).first() # 未找到记录 if not api_key_record: return None # 检查状态是否为active if api_key_record.status != "active": return None # 限频更新 last_used_at:超过 60 秒才更新,避免热点行的写锁竞争 try: now = datetime.now() last = api_key_record.last_used_at if last is None or (now - last).total_seconds() > 60: # 用条件 UPDATE 避免与其他请求的写锁竞争 from sqlalchemy import update stmt = ( update(PlatformApiKey) .where(PlatformApiKey.id == api_key_record.id) .where( (PlatformApiKey.last_used_at.is_(None)) | (PlatformApiKey.last_used_at < now - timedelta(seconds=60)) ) .values(last_used_at=now) ) self.db.execute(stmt) self.db.commit() except Exception: # last_used_at 更新失败不影响验证结果 try: self.db.rollback() except Exception: pass return (api_key_record.user_id, api_key_record.id)