api_key_service.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """API Key 管理服务。
  2. 功能:
  3. - 创建 API Key(格式 sk-xxxxx)
  4. - 列出用户的 API Key
  5. - 吊销 API Key
  6. - 验证 API Key
  7. """
  8. import secrets
  9. import uuid
  10. from datetime import datetime
  11. from typing import Any
  12. from sqlalchemy import select, update
  13. from app.core.db import ApiKeyModel, DeployTaskModel, async_session
  14. from app.core.logging import logger
  15. def _generate_api_key() -> str:
  16. """生成 sk- 前缀的 API Key,类似 OpenAI 格式。"""
  17. return f"sk-{secrets.token_urlsafe(32)}"
  18. async def create_api_key(user_id: str, name: str = "default") -> dict[str, Any]:
  19. """创建新的 API Key,返回完整 key(仅展示一次)。"""
  20. key = _generate_api_key()
  21. api_key = ApiKeyModel(
  22. id=str(uuid.uuid4()),
  23. user_id=user_id,
  24. key=key,
  25. name=name,
  26. status="active",
  27. )
  28. async with async_session() as session:
  29. session.add(api_key)
  30. await session.commit()
  31. logger.info(f"API key created: user={user_id} name={name}")
  32. return {
  33. "id": api_key.id,
  34. "key": key, # 完整 key,只在创建时返回一次
  35. "name": api_key.name,
  36. "created_at": api_key.created_at.isoformat() if api_key.created_at else None,
  37. }
  38. async def list_api_keys(user_id: str) -> list[dict[str, Any]]:
  39. """列出用户的所有 API Key(隐藏 key 中间部分)。"""
  40. async with async_session() as session:
  41. result = await session.execute(
  42. select(ApiKeyModel)
  43. .where(ApiKeyModel.user_id == user_id)
  44. .order_by(ApiKeyModel.created_at.desc())
  45. )
  46. keys = result.scalars().all()
  47. items = []
  48. for k in keys:
  49. # 隐藏中间部分:显示前 7 位 + **** + 后 4 位
  50. masked = k.key[:7] + "****" + k.key[-4:] if len(k.key) > 11 else "****"
  51. items.append({
  52. "id": k.id,
  53. "key": masked,
  54. "name": k.name,
  55. "status": k.status,
  56. "last_used_at": k.last_used_at.isoformat() if k.last_used_at else None,
  57. "created_at": k.created_at.isoformat() if k.created_at else None,
  58. })
  59. return items
  60. async def revoke_api_key(key_id: str, user_id: str) -> dict[str, Any]:
  61. """吊销指定 API Key。"""
  62. async with async_session() as session:
  63. result = await session.execute(
  64. select(ApiKeyModel).where(
  65. ApiKeyModel.id == key_id,
  66. ApiKeyModel.user_id == user_id,
  67. )
  68. )
  69. record = result.scalar_one_or_none()
  70. if not record:
  71. return {"error": "API Key 不存在"}
  72. record.status = "revoked"
  73. await session.commit()
  74. logger.info(f"API key revoked: id={key_id} user={user_id}")
  75. return {"id": key_id, "status": "revoked"}
  76. async def validate_api_key(key: str) -> dict[str, Any] | None:
  77. """验证 API Key,返回 {user_id, key_id} 或 None。"""
  78. async with async_session() as session:
  79. result = await session.execute(
  80. select(ApiKeyModel).where(
  81. ApiKeyModel.key == key,
  82. ApiKeyModel.status == "active",
  83. )
  84. )
  85. record = result.scalar_one_or_none()
  86. if not record:
  87. return None
  88. # 更新最后使用时间
  89. record.last_used_at = datetime.utcnow()
  90. await session.commit()
  91. return {"user_id": record.user_id, "key_id": record.id}
  92. async def check_deploy_ownership(task_id: str, user_id: str) -> bool:
  93. """检查用户是否拥有指定的部署任务。"""
  94. async with async_session() as session:
  95. result = await session.execute(
  96. select(DeployTaskModel).where(
  97. DeployTaskModel.id == task_id,
  98. DeployTaskModel.user_id == user_id,
  99. )
  100. )
  101. return result.scalar_one_or_none() is not None