platform_api_key_service.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. """
  2. 平台API Key服务层
  3. 提供平台API Key的CRUD业务逻辑处理
  4. 需求: 7.1, 7.2, 7.3, 7.6, 9.4
  5. """
  6. from datetime import datetime, timedelta
  7. from typing import List, Optional, Tuple
  8. from sqlalchemy.orm import Session
  9. from fastapi import HTTPException
  10. from app.models.platform_api_key import PlatformApiKey
  11. from app.services.crypto_utils import (
  12. generate_platform_api_key,
  13. hash_api_key
  14. )
  15. class PlatformApiKeyService:
  16. """平台API Key业务服务类"""
  17. # 每用户最大API Key数量限制
  18. MAX_API_KEYS_PER_USER = 5
  19. def __init__(self, db: Session):
  20. self.db = db
  21. def create_api_key(
  22. self,
  23. user_id: str,
  24. name: Optional[str] = None,
  25. key_type: str = "public"
  26. ) -> dict:
  27. """
  28. 创建API Key
  29. 生成以 "sk-aigc-" 为前缀、总长度48字符的密钥
  30. 返回完整密钥(仅此一次)
  31. Args:
  32. user_id: 用户ID
  33. name: 备注名称(可选)
  34. key_type: 密钥类型:public(公钥,访问云端模型)/ local(本地私钥,访问本地模型)
  35. Returns:
  36. 包含完整密钥的字典:
  37. {
  38. "id": int,
  39. "api_key": str, # 完整密钥(仅此一次显示)
  40. "api_key_prefix": str, # 显示前缀
  41. "name": str,
  42. "status": str,
  43. "key_type": str,
  44. "created_at": datetime
  45. }
  46. Raises:
  47. HTTPException: 超过数量限制时抛出400错误
  48. 需求: 7.1, 7.2, 7.5
  49. 新增: 支持设置密钥类型
  50. """
  51. # 检查用户API Key数量限制(需求: 7.4)
  52. active_count = self._get_user_active_key_count(user_id)
  53. if active_count >= self.MAX_API_KEYS_PER_USER:
  54. raise HTTPException(
  55. status_code=400,
  56. detail=f"已达到API Key数量上限(最多{self.MAX_API_KEYS_PER_USER}个有效密钥)"
  57. )
  58. # 生成API Key(需求: 7.1)
  59. full_key, display_prefix = generate_platform_api_key()
  60. # 对API Key进行SHA256哈希存储(需求: 7.7)
  61. hashed_key = hash_api_key(full_key)
  62. # 创建API Key记录
  63. api_key_record = PlatformApiKey(
  64. user_id=user_id,
  65. api_key=hashed_key,
  66. api_key_prefix=display_prefix,
  67. name=name,
  68. status="active",
  69. key_type=key_type
  70. )
  71. self.db.add(api_key_record)
  72. self.db.commit()
  73. self.db.refresh(api_key_record)
  74. # 返回包含完整密钥的响应(仅此一次显示,需求: 7.2)
  75. return {
  76. "id": api_key_record.id,
  77. "api_key": full_key, # 完整密钥仅在创建时返回
  78. "api_key_prefix": display_prefix,
  79. "name": api_key_record.name,
  80. "status": api_key_record.status,
  81. "key_type": api_key_record.key_type,
  82. "created_at": api_key_record.created_at
  83. }
  84. def get_user_api_keys(self, user_id: str) -> List[PlatformApiKey]:
  85. """
  86. 获取用户的API Key列表
  87. 返回用户所有API Key,仅显示前缀(脱敏显示)
  88. Args:
  89. user_id: 用户ID
  90. Returns:
  91. 用户的API Key列表(不包含完整密钥)
  92. 需求: 7.3
  93. """
  94. return self.db.query(PlatformApiKey).filter(
  95. PlatformApiKey.user_id == user_id
  96. ).order_by(PlatformApiKey.created_at.desc()).all()
  97. def update_api_key_status(
  98. self,
  99. key_id: int,
  100. user_id: str,
  101. status: str
  102. ) -> PlatformApiKey:
  103. """
  104. 更新API Key状态(启用/禁用)
  105. Args:
  106. key_id: API Key ID
  107. user_id: 用户ID(用于权限验证)
  108. status: 新状态("active" 或 "disabled")
  109. Returns:
  110. 更新后的API Key对象
  111. Raises:
  112. HTTPException: API Key不存在或无权限时抛出错误
  113. 需求: 7.6
  114. """
  115. # 验证状态值
  116. if status not in ("active", "disabled"):
  117. raise HTTPException(
  118. status_code=400,
  119. detail="状态值无效,必须是 'active' 或 'disabled'"
  120. )
  121. # 查询API Key并验证权限
  122. api_key_record = self.db.query(PlatformApiKey).filter(
  123. PlatformApiKey.id == key_id,
  124. PlatformApiKey.user_id == user_id
  125. ).first()
  126. if not api_key_record:
  127. raise HTTPException(
  128. status_code=404,
  129. detail="API Key不存在或无权限访问"
  130. )
  131. # 更新状态
  132. api_key_record.status = status
  133. self.db.commit()
  134. self.db.refresh(api_key_record)
  135. return api_key_record
  136. def delete_api_key(self, key_id: int, user_id: str) -> bool:
  137. """
  138. 删除API Key
  139. Args:
  140. key_id: API Key ID
  141. user_id: 用户ID(用于权限验证)
  142. Returns:
  143. 删除是否成功
  144. Raises:
  145. HTTPException: API Key不存在或无权限时抛出错误
  146. 需求: 7.6
  147. """
  148. # 查询API Key并验证权限
  149. api_key_record = self.db.query(PlatformApiKey).filter(
  150. PlatformApiKey.id == key_id,
  151. PlatformApiKey.user_id == user_id
  152. ).first()
  153. if not api_key_record:
  154. raise HTTPException(
  155. status_code=404,
  156. detail="API Key不存在或无权限访问"
  157. )
  158. self.db.delete(api_key_record)
  159. self.db.commit()
  160. return True
  161. def _get_user_active_key_count(self, user_id: str) -> int:
  162. """
  163. 获取用户有效API Key数量
  164. Args:
  165. user_id: 用户ID
  166. Returns:
  167. 有效API Key数量
  168. """
  169. return self.db.query(PlatformApiKey).filter(
  170. PlatformApiKey.user_id == user_id,
  171. PlatformApiKey.status == "active"
  172. ).count()
  173. def get_api_key_by_id(self, key_id: int, user_id: str) -> PlatformApiKey:
  174. """
  175. 根据ID获取API Key
  176. Args:
  177. key_id: API Key ID
  178. user_id: 用户ID(用于权限验证)
  179. Returns:
  180. API Key对象
  181. Raises:
  182. HTTPException: API Key不存在或无权限时抛出错误
  183. """
  184. api_key_record = self.db.query(PlatformApiKey).filter(
  185. PlatformApiKey.id == key_id,
  186. PlatformApiKey.user_id == user_id
  187. ).first()
  188. if not api_key_record:
  189. raise HTTPException(
  190. status_code=404,
  191. detail="API Key不存在或无权限访问"
  192. )
  193. return api_key_record
  194. def verify_api_key(self, api_key: str) -> Optional[Tuple[str, int]]:
  195. """
  196. 验证API Key,返回(user_id, key_id)
  197. 对传入的API Key进行SHA256哈希后在数据库中查找匹配记录,
  198. 验证状态是否为active,成功后限频更新last_used_at时间
  199. 性能优化(2026-05-15):
  200. - 高并发场景下,多人共用同一 API Key 会导致每次请求都触发
  201. UPDATE last_used_at,对该行造成大量写锁竞争。
  202. - 改为限频更新:距离上次更新超过 60 秒才更新,否则跳过。
  203. - 用单条 UPDATE 配合 WHERE 条件实现,避免 SELECT-then-UPDATE 的竞态。
  204. Args:
  205. api_key: 需要验证的API Key明文
  206. Returns:
  207. 验证成功返回 (user_id, key_id) 元组
  208. 验证失败返回 None
  209. 需求: 9.4
  210. """
  211. if not api_key:
  212. return None
  213. # 对API Key进行SHA256哈希
  214. hashed_key = hash_api_key(api_key)
  215. # 在数据库中查找匹配的记录
  216. api_key_record = self.db.query(PlatformApiKey).filter(
  217. PlatformApiKey.api_key == hashed_key
  218. ).first()
  219. # 未找到记录
  220. if not api_key_record:
  221. return None
  222. # 检查状态是否为active
  223. if api_key_record.status != "active":
  224. return None
  225. # 限频更新 last_used_at:超过 60 秒才更新,避免热点行的写锁竞争
  226. try:
  227. now = datetime.now()
  228. last = api_key_record.last_used_at
  229. if last is None or (now - last).total_seconds() > 60:
  230. # 用条件 UPDATE 避免与其他请求的写锁竞争
  231. from sqlalchemy import update
  232. stmt = (
  233. update(PlatformApiKey)
  234. .where(PlatformApiKey.id == api_key_record.id)
  235. .where(
  236. (PlatformApiKey.last_used_at.is_(None))
  237. | (PlatformApiKey.last_used_at < now - timedelta(seconds=60))
  238. )
  239. .values(last_used_at=now)
  240. )
  241. self.db.execute(stmt)
  242. self.db.commit()
  243. except Exception:
  244. # last_used_at 更新失败不影响验证结果
  245. try:
  246. self.db.rollback()
  247. except Exception:
  248. pass
  249. return (api_key_record.user_id, api_key_record.id)