||
- #!/usr/bin/env python3
- """
- 简化的数据库初始化脚本
- """
- import sys
- import os
- # 添加项目根目录到Python路径
- sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv()
- import pymysql
- from urllib.parse import urlparse
- import uuid
- from datetime import datetime
- def get_db_config():
- """获取数据库配置"""
- database_url = os.getenv('DATABASE_URL', '')
- parsed = urlparse(database_url)
-
- return {
- 'host': parsed.hostname or 'localhost',
- 'port': parsed.port or 3306,
- 'user': parsed.username or 'root',
- 'password': parsed.password or '',
- 'database': parsed.path[1:] if parsed.path else 'sso_db',
- 'charset': 'utf8mb4'
- }
- def hash_password_simple(password):
- """简单的密码哈希"""
- import hashlib
- import secrets
-
- # 生成盐值
- salt = secrets.token_hex(16)
-
- # 使用SHA256哈希
- password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
-
- return f"sha256${salt}${password_hash}"
- def generate_random_string(length=32):
- """生成随机字符串"""
- import secrets
- import string
- alphabet = string.ascii_letters + string.digits
- return ''.join(secrets.choice(alphabet) for _ in range(length))
- def create_tables(connection):
- """创建数据库表"""
- cursor = connection.cursor()
-
- # 用户表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS users (
- id VARCHAR(36) PRIMARY KEY COMMENT '用户ID',
- username VARCHAR(50) UNIQUE NOT NULL COMMENT '用户名',
- email VARCHAR(100) UNIQUE NOT NULL COMMENT '邮箱',
- phone VARCHAR(20) UNIQUE COMMENT '手机号',
- password_hash VARCHAR(255) NOT NULL COMMENT '密码哈希',
- avatar_url VARCHAR(500) COMMENT '头像URL',
- is_active BOOLEAN DEFAULT TRUE COMMENT '是否激活',
- is_superuser BOOLEAN DEFAULT FALSE COMMENT '是否超级管理员',
- last_login_at TIMESTAMP NULL COMMENT '最后登录时间',
- last_login_ip VARCHAR(45) COMMENT '最后登录IP',
- failed_login_attempts INT DEFAULT 0 COMMENT '失败登录次数',
- locked_until TIMESTAMP NULL COMMENT '锁定直到时间',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
-
- INDEX idx_username (username),
- INDEX idx_email (email),
- INDEX idx_phone (phone),
- INDEX idx_created_at (created_at)
- ) COMMENT='用户表'
- """)
-
- # 应用表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS apps (
- id VARCHAR(36) PRIMARY KEY COMMENT '应用ID',
- name VARCHAR(100) NOT NULL COMMENT '应用名称',
- app_key VARCHAR(100) UNIQUE NOT NULL COMMENT '应用Key',
- app_secret VARCHAR(255) NOT NULL COMMENT '应用Secret',
- description TEXT COMMENT '应用描述',
- icon_url VARCHAR(500) COMMENT '应用图标',
- redirect_uris JSON NOT NULL COMMENT '回调URL列表',
- scope JSON COMMENT '权限范围',
- is_active BOOLEAN DEFAULT TRUE COMMENT '是否激活',
- is_trusted BOOLEAN DEFAULT FALSE COMMENT '是否受信任应用',
- access_token_expires INT DEFAULT 7200 COMMENT '访问令牌过期时间(秒)',
- refresh_token_expires INT DEFAULT 2592000 COMMENT '刷新令牌过期时间(秒)',
- created_by VARCHAR(36) COMMENT '创建者ID',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
-
- INDEX idx_app_key (app_key),
- INDEX idx_created_at (created_at)
- ) COMMENT='应用表'
- """)
-
- # 访问令牌表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS oauth_access_tokens (
- id VARCHAR(36) PRIMARY KEY COMMENT '令牌ID',
- user_id VARCHAR(36) NOT NULL COMMENT '用户ID',
- app_id VARCHAR(36) COMMENT '应用ID',
- token VARCHAR(512) UNIQUE NOT NULL COMMENT '访问令牌',
- refresh_token VARCHAR(512) UNIQUE COMMENT '刷新令牌',
- token_type VARCHAR(50) DEFAULT 'Bearer' COMMENT '令牌类型',
- scope VARCHAR(500) COMMENT '权限范围',
- expires_at TIMESTAMP NOT NULL COMMENT '过期时间',
- revoked BOOLEAN DEFAULT FALSE COMMENT '是否撤销',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- last_used_at TIMESTAMP NULL COMMENT '最后使用时间',
- is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
-
- INDEX idx_token (token),
- INDEX idx_refresh_token (refresh_token),
- INDEX idx_user_app (user_id, app_id),
- INDEX idx_expires_at (expires_at)
- ) COMMENT='访问令牌表'
- """)
-
- # 授权码表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS oauth_authorization_codes (
- id VARCHAR(36) PRIMARY KEY COMMENT '授权码ID',
- user_id VARCHAR(36) NOT NULL COMMENT '用户ID',
- app_id VARCHAR(36) NOT NULL COMMENT '应用ID',
- code VARCHAR(100) UNIQUE NOT NULL COMMENT '授权码',
- redirect_uri VARCHAR(500) NOT NULL COMMENT '回调URL',
- scope VARCHAR(500) COMMENT '权限范围',
- expires_at TIMESTAMP NOT NULL COMMENT '过期时间',
- used BOOLEAN DEFAULT FALSE COMMENT '是否已使用',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
-
- INDEX idx_code (code),
- INDEX idx_user_app (user_id, app_id),
- INDEX idx_expires_at (expires_at)
- ) COMMENT='授权码表'
- """)
-
- # 登录日志表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS login_logs (
- id VARCHAR(36) PRIMARY KEY COMMENT '日志ID',
- user_id VARCHAR(36) COMMENT '用户ID',
- username VARCHAR(50) NOT NULL COMMENT '用户名',
- login_type VARCHAR(20) COMMENT '登录方式',
- ip_address VARCHAR(45) COMMENT 'IP地址',
- user_agent TEXT COMMENT '用户代理',
- success BOOLEAN DEFAULT FALSE COMMENT '是否成功',
- failure_reason VARCHAR(200) COMMENT '失败原因',
- login_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '登录时间',
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
- updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
- is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
-
- INDEX idx_user_id (user_id),
- INDEX idx_username (username),
- INDEX idx_login_at (login_at),
- INDEX idx_ip_address (ip_address)
- ) COMMENT='登录日志表'
- """)
-
- connection.commit()
- print("✅ 数据库表创建成功")
- def create_default_data(connection):
- """创建默认数据"""
- cursor = connection.cursor()
-
- # 检查是否已有管理员用户
- cursor.execute("SELECT COUNT(*) FROM users WHERE username = 'admin'")
- if cursor.fetchone()[0] > 0:
- print("⚠️ 管理员用户已存在,跳过创建")
-
- # 获取现有的测试应用信息
- cursor.execute("SELECT app_key, app_secret FROM apps WHERE name = '测试应用' LIMIT 1")
- result = cursor.fetchone()
- if result:
- return result[0], result[1]
- else:
- # 如果没有测试应用,创建一个
- app_key = generate_random_string(32)
- app_secret = generate_random_string(64)
- return app_key, app_secret
-
- # 创建管理员用户
- admin_id = str(uuid.uuid4())
- admin_password = "Admin123456"
- password_hash = hash_password_simple(admin_password)
-
- cursor.execute("""
- INSERT INTO users (id, username, email, password_hash, is_active, is_superuser, created_at, updated_at, is_deleted)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
- """, (admin_id, "admin", "admin@example.com", password_hash, True, True, datetime.now(), datetime.now(), False))
-
- # 创建测试应用
- app_id = str(uuid.uuid4())
- app_key = generate_random_string(32)
- app_secret = generate_random_string(64)
-
- cursor.execute("""
- INSERT INTO apps (id, name, app_key, app_secret, description, redirect_uris, scope, is_active, is_trusted,
- access_token_expires, refresh_token_expires, created_by, created_at, updated_at, is_deleted)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- """, (
- app_id, "测试应用", app_key, app_secret, "用于测试的默认应用",
- '["http://localhost:3000/callback", "http://localhost:8080/callback", "http://localhost:8001/auth/callback"]',
- '["profile", "email"]', True, True, 7200, 2592000, admin_id,
- datetime.now(), datetime.now(), False
- ))
-
- connection.commit()
-
- print("✅ 默认数据创建成功")
- print(f"管理员账号: admin")
- print(f"管理员密码: {admin_password}")
- print(f"测试应用Key: {app_key}")
- print(f"测试应用Secret: {app_secret}")
-
- return app_key, app_secret
- def main():
- """主函数"""
- print("=" * 50)
- print("SSO数据库初始化")
- print("=" * 50)
-
- try:
- # 获取数据库配置
- config = get_db_config()
- print(f"连接数据库: {config['host']}:{config['port']}/{config['database']}")
-
- # 连接数据库
- connection = pymysql.connect(**config)
- print("✅ 数据库连接成功")
-
- # 创建表
- create_tables(connection)
-
- # 创建默认数据
- result = create_default_data(connection)
- if result:
- app_key, app_secret = result
- else:
- app_key, app_secret = "demo_key", "demo_secret"
-
- # 关闭连接
- connection.close()
-
- print("\n" + "=" * 50)
- print("🎉 数据库初始化完成!")
- print("\n下一步:")
- print("1. 启动后端服务: cd src && python -m app.main")
- print("2. 启动前端服务: cd ../sso-frontend && npm run dev")
- print(f"3. 配置子系统: CLIENT_ID={app_key}")
- print(f" CLIENT_SECRET={app_secret}")
-
- except Exception as e:
- print(f"❌ 数据库初始化失败: {e}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|