"""API key management service.

Features:
- Create an API key (format ``sk-xxxxx``)
- List a user's API keys
- Revoke an API key
- Validate an API key
"""

import secrets
import uuid
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import select, update

from app.core.db import ApiKeyModel, DeployTaskModel, async_session
from app.core.logging import logger


def _generate_api_key() -> str:
    """Generate an ``sk-``-prefixed API key, similar to the OpenAI format."""
    return f"sk-{secrets.token_urlsafe(32)}"


def _mask_key(key: str) -> str:
    """Return *key* with its middle hidden: first 7 chars + ``****`` + last 4.

    Keys of 11 characters or fewer are replaced entirely by ``****`` so that
    the visible prefix and suffix can never overlap and reveal the whole key.
    """
    if len(key) > 11:
        return key[:7] + "****" + key[-4:]
    return "****"


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value ``datetime.utcnow()`` did, without calling that
    deprecated function.
    """
    # NOTE(review): the timestamp is kept naive because the previous code
    # stored naive UTC values; confirm against the column definition before
    # switching to timezone-aware datetimes.
    return datetime.now(timezone.utc).replace(tzinfo=None)


async def create_api_key(user_id: str, name: str = "default") -> dict[str, Any]:
    """Create a new API key for *user_id* and return it in full.

    This is the only function in this module that returns the unmasked key;
    ``list_api_keys`` only ever returns a masked form.

    Args:
        user_id: Owner of the new key.
        name: Human-readable label for the key.

    Returns:
        A dict with ``id``, ``key`` (the full key), ``name`` and
        ``created_at`` (ISO-8601 string, or ``None`` if the record has no
        creation timestamp).
    """
    key = _generate_api_key()
    api_key = ApiKeyModel(
        id=str(uuid.uuid4()),
        user_id=user_id,
        key=key,
        name=name,
        status="active",
    )
    async with async_session() as session:
        session.add(api_key)
        await session.commit()
        # Reload the row explicitly: attribute access after commit must not
        # trigger implicit I/O in asyncio mode, and this also picks up any
        # database-generated values such as a creation timestamp.
        await session.refresh(api_key)
        created_at = api_key.created_at
        result = {
            "id": api_key.id,
            "key": key,  # full key, returned only at creation time
            "name": api_key.name,
            "created_at": created_at.isoformat() if created_at else None,
        }

    logger.info(f"API key created: user={user_id} name={name}")
    return result


async def list_api_keys(user_id: str) -> list[dict[str, Any]]:
    """List all API keys of *user_id*, newest first, with the key masked.

    Args:
        user_id: Owner whose keys are listed.

    Returns:
        One dict per key with ``id``, ``key`` (masked), ``name``, ``status``,
        ``last_used_at`` and ``created_at`` (ISO-8601 strings or ``None``).
    """
    async with async_session() as session:
        result = await session.execute(
            select(ApiKeyModel)
            .where(ApiKeyModel.user_id == user_id)
            .order_by(ApiKeyModel.created_at.desc())
        )
        keys = result.scalars().all()

        return [
            {
                "id": k.id,
                "key": _mask_key(k.key),
                "name": k.name,
                "status": k.status,
                "last_used_at": k.last_used_at.isoformat() if k.last_used_at else None,
                "created_at": k.created_at.isoformat() if k.created_at else None,
            }
            for k in keys
        ]


async def revoke_api_key(key_id: str, user_id: str) -> dict[str, Any]:
    """Revoke the API key *key_id* if it belongs to *user_id*.

    Args:
        key_id: Identifier of the key to revoke.
        user_id: User requesting the revocation; must own the key.

    Returns:
        ``{"id": key_id, "status": "revoked"}`` on success, or a dict with an
        ``error`` message when no key with that id exists for the user.
    """
    async with async_session() as session:
        result = await session.execute(
            select(ApiKeyModel).where(
                ApiKeyModel.id == key_id,
                ApiKeyModel.user_id == user_id,
            )
        )
        record = result.scalar_one_or_none()
        if not record:
            return {"error": "API Key 不存在"}

        record.status = "revoked"
        await session.commit()

    logger.info(f"API key revoked: id={key_id} user={user_id}")
    return {"id": key_id, "status": "revoked"}


async def validate_api_key(key: str) -> dict[str, Any] | None:
    """Validate an API key and record when it was last used.

    Args:
        key: The full API key presented by the caller.

    Returns:
        ``{"user_id": ..., "key_id": ...}`` for an active key, or ``None``
        when the key is empty, unknown, or not active.
    """
    # An empty/missing key can never match a stored key; skip the query.
    if not key:
        return None

    async with async_session() as session:
        result = await session.execute(
            select(ApiKeyModel).where(
                ApiKeyModel.key == key,
                ApiKeyModel.status == "active",
            )
        )
        record = result.scalar_one_or_none()
        if not record:
            return None

        # Capture the identifiers before committing so that building the
        # return value never depends on post-commit attribute access.
        identity = {"user_id": record.user_id, "key_id": record.id}

        # Update the last-used timestamp.
        record.last_used_at = _utcnow_naive()
        await session.commit()
        return identity


async def check_deploy_ownership(task_id: str, user_id: str) -> bool:
    """Return whether *user_id* owns the deploy task *task_id*.

    Args:
        task_id: Identifier of the deploy task.
        user_id: User whose ownership is checked.

    Returns:
        ``True`` if a deploy task with this id and user id exists,
        ``False`` otherwise.
    """
    async with async_session() as session:
        result = await session.execute(
            select(DeployTaskModel).where(
                DeployTaskModel.id == task_id,
                DeployTaskModel.user_id == user_id,
            )
        )
        return result.scalar_one_or_none() is not None