| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- #!/usr/bin/env python3
- """
- 生成长期有效的管理员 Token 脚本
- 功能:
- 1. 查找管理员用户
- 2. 生成一个随机的长期 Token
- 3. 将 Token 持久化到数据库的 admin_tokens 表中
- 使用方式:
- cd backend
- python scripts/generate_admin_token.py
- 注意:需要在 backend 目录下运行,以确保正确加载配置
- """
- import sys
- import os
- # 添加 backend 目录到路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from datetime import datetime, timedelta, timezone
- import secrets
- from config import settings
- from database import get_db_connection
- def find_admin_user():
- """查找管理员用户"""
- with get_db_connection() as conn:
- cursor = conn.cursor()
- cursor.execute("""
- SELECT id, username, email, role
- FROM users
- WHERE role = 'admin'
- LIMIT 1
- """)
- row = cursor.fetchone()
- if row:
- return {
- "id": row["id"],
- "username": row["username"],
- "email": row["email"],
- "role": row["role"]
- }
- return None
- def ensure_admin_tokens_table():
- """确保 admin_tokens 表存在"""
- with get_db_connection() as conn:
- cursor = conn.cursor()
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS admin_tokens (
- id VARCHAR(36) PRIMARY KEY,
- user_id VARCHAR(36) NOT NULL,
- token VARCHAR(255) NOT NULL UNIQUE,
- expires_at TIMESTAMP NOT NULL,
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- INDEX idx_admin_tokens_token (token),
- INDEX idx_admin_tokens_user_id (user_id)
- )
- """)
- conn.commit()
- def create_long_term_token(user_data: dict, years: int = 10) -> str:
- """
- 创建长期有效的 Token(持久化到数据库)
- Args:
- user_data: 用户信息字典
- years: 有效年数,默认 10 年
- Returns:
- str: Token 字符串
- """
- # 生成随机 token
- token = f"admin_token_{secrets.token_urlsafe(32)}"
- token_id = f"token_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{secrets.token_hex(4)}"
- expire = datetime.now(timezone.utc) + timedelta(days=years * 365)
- # 持久化到数据库
- with get_db_connection() as conn:
- cursor = conn.cursor()
- cursor.execute("""
- INSERT INTO admin_tokens (id, user_id, token, expires_at)
- VALUES (%s, %s, %s, %s)
- """, (token_id, user_data["id"], token, expire.replace(microsecond=0)))
- conn.commit()
- return token
- def verify_token(token: str) -> dict:
- """
- 验证 Token 有效性(从数据库查询)
- Args:
- token: Token 字符串
- Returns:
- dict: 用户信息字典
- Raises:
- Exception: Token 无效或已过期
- """
- with get_db_connection() as conn:
- cursor = conn.cursor()
- cursor.execute("""
- SELECT at.user_id, u.username, u.email, u.role, at.expires_at
- FROM admin_tokens at
- JOIN users u ON at.user_id = u.id
- WHERE at.token = %s AND at.expires_at > %s
- """, (token, datetime.now(timezone.utc)))
- row = cursor.fetchone()
- if not row:
- raise Exception("Token 无效或已过期")
- return {
- "id": row["user_id"],
- "username": row["username"],
- "email": row["email"],
- "role": row["role"],
- "expires_at": row["expires_at"]
- }
- def main():
- print("=" * 60)
- print("管理员长期 Token 生成工具")
- print("=" * 60)
- print()
- # 确保表存在
- print("正在检查数据库表...")
- ensure_admin_tokens_table()
- print("[OK] 数据库表检查完成")
- print()
- # 查找管理员用户
- print("正在查找管理员用户...")
- admin_user = find_admin_user()
- if not admin_user:
- print("\n[ERROR] 未找到管理员用户!")
- print("\n请先创建管理员用户,可以使用以下方式:")
- print(" 1. 运行 python create_test_user.py 创建测试用户")
- print(" 2. 或通过 API 注册用户后在数据库中将 role 改为 admin")
- sys.exit(1)
- print(f"[OK] 找到管理员用户:{admin_user['username']} ({admin_user['email']})")
- print()
- # 生成 Token
- print("正在生成 10 年有效期的 Token...")
- token = create_long_term_token(admin_user, years=10)
- print("[OK] Token 生成成功!")
- print()
- # 验证 Token
- print("正在验证 Token 有效性...")
- try:
- user_info = verify_token(token)
- print(f"[OK] Token 验证通过!")
- print(f" - 用户 ID: {user_info['id']}")
- print(f" - 用户名:{user_info['username']}")
- print(f" - 角色:{user_info['role']}")
- print(f" - 过期时间:{user_info['expires_at'].strftime('%Y-%m-%d %H:%M:%S')}")
- except Exception as e:
- print(f"[ERROR] Token 验证失败:{str(e)}")
- sys.exit(1)
- print()
- print("=" * 60)
- print("生成的管理员 Token (请妥善保管):")
- print("=" * 60)
- print()
- print(token)
- print()
- print("=" * 60)
- print()
- print("使用方式:")
- print(" 在 HTTP 请求头中添加:")
- print(f" Authorization: Bearer {token[:50]}...")
- print()
- print("注意:")
- print(" - 此 Token 持久化存储在数据库中")
- print(" - 服务器重启后 Token 仍然有效")
- print(" - Token 过期时间:10 年")
- print()
- if __name__ == "__main__":
- main()
|