generate_admin_token.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #!/usr/bin/env python3
  2. """
  3. 生成长期有效的管理员 Token 脚本
  4. 功能:
  5. 1. 查找管理员用户
  6. 2. 生成一个随机的长期 Token
  7. 3. 将 Token 持久化到数据库的 admin_tokens 表中
  8. 使用方式:
  9. cd backend
  10. python scripts/generate_admin_token.py
  11. 注意:需要在 backend 目录下运行,以确保正确加载配置
  12. """
  13. import sys
  14. import os
  15. # 添加 backend 目录到路径
  16. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  17. from datetime import datetime, timedelta, timezone
  18. import secrets
  19. from config import settings
  20. from database import get_db_connection
  21. def find_admin_user():
  22. """查找管理员用户"""
  23. with get_db_connection() as conn:
  24. cursor = conn.cursor()
  25. cursor.execute("""
  26. SELECT id, username, email, role
  27. FROM users
  28. WHERE role = 'admin'
  29. LIMIT 1
  30. """)
  31. row = cursor.fetchone()
  32. if row:
  33. return {
  34. "id": row["id"],
  35. "username": row["username"],
  36. "email": row["email"],
  37. "role": row["role"]
  38. }
  39. return None
  40. def ensure_admin_tokens_table():
  41. """确保 admin_tokens 表存在"""
  42. with get_db_connection() as conn:
  43. cursor = conn.cursor()
  44. cursor.execute("""
  45. CREATE TABLE IF NOT EXISTS admin_tokens (
  46. id VARCHAR(36) PRIMARY KEY,
  47. user_id VARCHAR(36) NOT NULL,
  48. token VARCHAR(255) NOT NULL UNIQUE,
  49. expires_at TIMESTAMP NOT NULL,
  50. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  51. INDEX idx_admin_tokens_token (token),
  52. INDEX idx_admin_tokens_user_id (user_id)
  53. )
  54. """)
  55. conn.commit()
  56. def create_long_term_token(user_data: dict, years: int = 10) -> str:
  57. """
  58. 创建长期有效的 Token(持久化到数据库)
  59. Args:
  60. user_data: 用户信息字典
  61. years: 有效年数,默认 10 年
  62. Returns:
  63. str: Token 字符串
  64. """
  65. # 生成随机 token
  66. token = f"admin_token_{secrets.token_urlsafe(32)}"
  67. token_id = f"token_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{secrets.token_hex(4)}"
  68. expire = datetime.now(timezone.utc) + timedelta(days=years * 365)
  69. # 持久化到数据库
  70. with get_db_connection() as conn:
  71. cursor = conn.cursor()
  72. cursor.execute("""
  73. INSERT INTO admin_tokens (id, user_id, token, expires_at)
  74. VALUES (%s, %s, %s, %s)
  75. """, (token_id, user_data["id"], token, expire.replace(microsecond=0)))
  76. conn.commit()
  77. return token
  78. def verify_token(token: str) -> dict:
  79. """
  80. 验证 Token 有效性(从数据库查询)
  81. Args:
  82. token: Token 字符串
  83. Returns:
  84. dict: 用户信息字典
  85. Raises:
  86. Exception: Token 无效或已过期
  87. """
  88. with get_db_connection() as conn:
  89. cursor = conn.cursor()
  90. cursor.execute("""
  91. SELECT at.user_id, u.username, u.email, u.role, at.expires_at
  92. FROM admin_tokens at
  93. JOIN users u ON at.user_id = u.id
  94. WHERE at.token = %s AND at.expires_at > %s
  95. """, (token, datetime.now(timezone.utc)))
  96. row = cursor.fetchone()
  97. if not row:
  98. raise Exception("Token 无效或已过期")
  99. return {
  100. "id": row["user_id"],
  101. "username": row["username"],
  102. "email": row["email"],
  103. "role": row["role"],
  104. "expires_at": row["expires_at"]
  105. }
  106. def main():
  107. print("=" * 60)
  108. print("管理员长期 Token 生成工具")
  109. print("=" * 60)
  110. print()
  111. # 确保表存在
  112. print("正在检查数据库表...")
  113. ensure_admin_tokens_table()
  114. print("[OK] 数据库表检查完成")
  115. print()
  116. # 查找管理员用户
  117. print("正在查找管理员用户...")
  118. admin_user = find_admin_user()
  119. if not admin_user:
  120. print("\n[ERROR] 未找到管理员用户!")
  121. print("\n请先创建管理员用户,可以使用以下方式:")
  122. print(" 1. 运行 python create_test_user.py 创建测试用户")
  123. print(" 2. 或通过 API 注册用户后在数据库中将 role 改为 admin")
  124. sys.exit(1)
  125. print(f"[OK] 找到管理员用户:{admin_user['username']} ({admin_user['email']})")
  126. print()
  127. # 生成 Token
  128. print("正在生成 10 年有效期的 Token...")
  129. token = create_long_term_token(admin_user, years=10)
  130. print("[OK] Token 生成成功!")
  131. print()
  132. # 验证 Token
  133. print("正在验证 Token 有效性...")
  134. try:
  135. user_info = verify_token(token)
  136. print(f"[OK] Token 验证通过!")
  137. print(f" - 用户 ID: {user_info['id']}")
  138. print(f" - 用户名:{user_info['username']}")
  139. print(f" - 角色:{user_info['role']}")
  140. print(f" - 过期时间:{user_info['expires_at'].strftime('%Y-%m-%d %H:%M:%S')}")
  141. except Exception as e:
  142. print(f"[ERROR] Token 验证失败:{str(e)}")
  143. sys.exit(1)
  144. print()
  145. print("=" * 60)
  146. print("生成的管理员 Token (请妥善保管):")
  147. print("=" * 60)
  148. print()
  149. print(token)
  150. print()
  151. print("=" * 60)
  152. print()
  153. print("使用方式:")
  154. print(" 在 HTTP 请求头中添加:")
  155. print(f" Authorization: Bearer {token[:50]}...")
  156. print()
  157. print("注意:")
  158. print(" - 此 Token 持久化存储在数据库中")
  159. print(" - 服务器重启后 Token 仍然有效")
  160. print(" - Token 过期时间:10 年")
  161. print()
  162. if __name__ == "__main__":
  163. main()