| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
"""API key management service.

Features:
- Create an API key (format: sk-xxxxx)
- List a user's API keys
- Revoke an API key
- Validate an API key
"""
- import secrets
- import uuid
- from datetime import datetime
- from typing import Any
- from sqlalchemy import select, update
- from app.core.db import ApiKeyModel, DeployTaskModel, async_session
- from app.core.logging import logger
def _generate_api_key() -> str:
    """Return a new random API key in the OpenAI-like ``sk-<token>`` form.

    The token part is produced by ``secrets.token_urlsafe(32)``.
    """
    random_part = secrets.token_urlsafe(32)
    return "sk-" + random_part
async def create_api_key(user_id: str, name: str = "default") -> dict[str, Any]:
    """Create and persist a new active API key for a user.

    Args:
        user_id: Identifier of the user who will own the key.
        name: Label stored alongside the key.

    Returns:
        A dict with ``id``, ``key``, ``name`` and ``created_at`` (ISO-8601
        string, or ``None`` when the column holds no value). ``key`` is the
        complete secret; it is meant to be shown only once, at creation time.
    """
    key = _generate_api_key()
    api_key = ApiKeyModel(
        id=str(uuid.uuid4()),
        user_id=user_id,
        key=key,
        name=name,
        status="active",
    )

    async with async_session() as session:
        session.add(api_key)
        await session.commit()
        # A commit may expire loaded attributes, and an async session cannot
        # lazy-load them on attribute access. Reload explicitly and snapshot
        # everything we return while the session is still open.
        await session.refresh(api_key)
        key_id = api_key.id
        stored_name = api_key.name
        created_at = api_key.created_at

    logger.info(f"API key created: user={user_id} name={name}")
    return {
        "id": key_id,
        "key": key,  # full key, returned only at creation time
        "name": stored_name,
        "created_at": created_at.isoformat() if created_at else None,
    }
async def list_api_keys(user_id: str) -> list[dict[str, Any]]:
    """Return all API keys of *user_id*, newest first, with the secret masked.

    Each entry contains ``id``, ``key`` (masked), ``name``, ``status``,
    ``last_used_at`` and ``created_at``; timestamps are ISO-8601 strings or
    ``None`` when unset.
    """

    def _describe(row) -> dict[str, Any]:
        # Mask the middle: first 7 characters + "****" + last 4 characters.
        # Keys of 11 characters or fewer are hidden entirely.
        secret = row.key
        if len(secret) > 11:
            shown = secret[:7] + "****" + secret[-4:]
        else:
            shown = "****"
        last_used = row.last_used_at
        created = row.created_at
        return {
            "id": row.id,
            "key": shown,
            "name": row.name,
            "status": row.status,
            "last_used_at": last_used.isoformat() if last_used else None,
            "created_at": created.isoformat() if created else None,
        }

    query = (
        select(ApiKeyModel)
        .where(ApiKeyModel.user_id == user_id)
        .order_by(ApiKeyModel.created_at.desc())
    )
    async with async_session() as session:
        rows = (await session.execute(query)).scalars().all()
        return [_describe(row) for row in rows]
async def revoke_api_key(key_id: str, user_id: str) -> dict[str, Any]:
    """Mark one of the user's API keys as revoked.

    The key is looked up by both ``key_id`` and ``user_id``. Returns
    ``{"id": key_id, "status": "revoked"}`` on success, or a dict with an
    ``error`` message when no matching key exists.
    """
    lookup = select(ApiKeyModel).where(
        ApiKeyModel.id == key_id,
        ApiKeyModel.user_id == user_id,
    )
    async with async_session() as session:
        found = (await session.execute(lookup)).scalar_one_or_none()
        if not found:
            return {"error": "API Key 不存在"}

        found.status = "revoked"
        await session.commit()

    logger.info(f"API key revoked: id={key_id} user={user_id}")
    outcome: dict[str, Any] = {"id": key_id, "status": "revoked"}
    return outcome
- async def validate_api_key(key: str) -> dict[str, Any] | None:
- """验证 API Key,返回 {user_id, key_id} 或 None。"""
- async with async_session() as session:
- result = await session.execute(
- select(ApiKeyModel).where(
- ApiKeyModel.key == key,
- ApiKeyModel.status == "active",
- )
- )
- record = result.scalar_one_or_none()
- if not record:
- return None
- # 更新最后使用时间
- record.last_used_at = datetime.utcnow()
- await session.commit()
- return {"user_id": record.user_id, "key_id": record.id}
async def check_deploy_ownership(task_id: str, user_id: str) -> bool:
    """Tell whether a deploy task with ``task_id`` belongs to ``user_id``.

    Returns ``True`` when a ``DeployTaskModel`` row matches both the task id
    and the user id, ``False`` otherwise.
    """
    ownership_query = select(DeployTaskModel).where(
        DeployTaskModel.id == task_id,
        DeployTaskModel.user_id == user_id,
    )
    async with async_session() as session:
        rows = await session.execute(ownership_query)
        task = rows.scalar_one_or_none()
        owned = task is not None
        return owned
|