#!/usr/bin/env python3
"""Simplified database initialization script for the SSO service.

Reads the MySQL connection settings from the ``DATABASE_URL`` environment
variable (a ``.env`` file is loaded first), creates the tables if they do
not exist yet, and seeds a default administrator account plus a test
application.
"""
import hashlib
import os
import secrets
import string
import sys
import uuid
from datetime import datetime
from urllib.parse import unquote, urlparse

# Add the project's "src" directory to the Python path.
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))

import pymysql
from dotenv import load_dotenv

# Load environment variables (e.g. DATABASE_URL) from a .env file, if any.
load_dotenv()

# Name of the seeded test application; also used to look it up on re-runs.
TEST_APP_NAME = "测试应用"

# DDL for every table, executed in order by create_tables(). All statements
# use IF NOT EXISTS so the script can be re-run safely.
_TABLE_DDL = (
    # Users
    """
    CREATE TABLE IF NOT EXISTS users (
        id VARCHAR(36) PRIMARY KEY COMMENT '用户ID',
        username VARCHAR(50) UNIQUE NOT NULL COMMENT '用户名',
        email VARCHAR(100) UNIQUE NOT NULL COMMENT '邮箱',
        phone VARCHAR(20) UNIQUE COMMENT '手机号',
        password_hash VARCHAR(255) NOT NULL COMMENT '密码哈希',
        avatar_url VARCHAR(500) COMMENT '头像URL',
        is_active BOOLEAN DEFAULT TRUE COMMENT '是否激活',
        is_superuser BOOLEAN DEFAULT FALSE COMMENT '是否超级管理员',
        last_login_at TIMESTAMP NULL COMMENT '最后登录时间',
        last_login_ip VARCHAR(45) COMMENT '最后登录IP',
        failed_login_attempts INT DEFAULT 0 COMMENT '失败登录次数',
        locked_until TIMESTAMP NULL COMMENT '锁定直到时间',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
        INDEX idx_username (username),
        INDEX idx_email (email),
        INDEX idx_phone (phone),
        INDEX idx_created_at (created_at)
    ) COMMENT='用户表'
    """,
    # Applications (OAuth clients)
    """
    CREATE TABLE IF NOT EXISTS apps (
        id VARCHAR(36) PRIMARY KEY COMMENT '应用ID',
        name VARCHAR(100) NOT NULL COMMENT '应用名称',
        app_key VARCHAR(100) UNIQUE NOT NULL COMMENT '应用Key',
        app_secret VARCHAR(255) NOT NULL COMMENT '应用Secret',
        description TEXT COMMENT '应用描述',
        icon_url VARCHAR(500) COMMENT '应用图标',
        redirect_uris JSON NOT NULL COMMENT '回调URL列表',
        scope JSON COMMENT '权限范围',
        is_active BOOLEAN DEFAULT TRUE COMMENT '是否激活',
        is_trusted BOOLEAN DEFAULT FALSE COMMENT '是否受信任应用',
        access_token_expires INT DEFAULT 7200 COMMENT '访问令牌过期时间(秒)',
        refresh_token_expires INT DEFAULT 2592000 COMMENT '刷新令牌过期时间(秒)',
        created_by VARCHAR(36) COMMENT '创建者ID',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
        INDEX idx_app_key (app_key),
        INDEX idx_created_at (created_at)
    ) COMMENT='应用表'
    """,
    # Access tokens
    """
    CREATE TABLE IF NOT EXISTS oauth_access_tokens (
        id VARCHAR(36) PRIMARY KEY COMMENT '令牌ID',
        user_id VARCHAR(36) NOT NULL COMMENT '用户ID',
        app_id VARCHAR(36) COMMENT '应用ID',
        token VARCHAR(512) UNIQUE NOT NULL COMMENT '访问令牌',
        refresh_token VARCHAR(512) UNIQUE COMMENT '刷新令牌',
        token_type VARCHAR(50) DEFAULT 'Bearer' COMMENT '令牌类型',
        scope VARCHAR(500) COMMENT '权限范围',
        expires_at TIMESTAMP NOT NULL COMMENT '过期时间',
        revoked BOOLEAN DEFAULT FALSE COMMENT '是否撤销',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        last_used_at TIMESTAMP NULL COMMENT '最后使用时间',
        is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
        INDEX idx_token (token),
        INDEX idx_refresh_token (refresh_token),
        INDEX idx_user_app (user_id, app_id),
        INDEX idx_expires_at (expires_at)
    ) COMMENT='访问令牌表'
    """,
    # Authorization codes
    """
    CREATE TABLE IF NOT EXISTS oauth_authorization_codes (
        id VARCHAR(36) PRIMARY KEY COMMENT '授权码ID',
        user_id VARCHAR(36) NOT NULL COMMENT '用户ID',
        app_id VARCHAR(36) NOT NULL COMMENT '应用ID',
        code VARCHAR(100) UNIQUE NOT NULL COMMENT '授权码',
        redirect_uri VARCHAR(500) NOT NULL COMMENT '回调URL',
        scope VARCHAR(500) COMMENT '权限范围',
        expires_at TIMESTAMP NOT NULL COMMENT '过期时间',
        used BOOLEAN DEFAULT FALSE COMMENT '是否已使用',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
        INDEX idx_code (code),
        INDEX idx_user_app (user_id, app_id),
        INDEX idx_expires_at (expires_at)
    ) COMMENT='授权码表'
    """,
    # Login logs
    """
    CREATE TABLE IF NOT EXISTS login_logs (
        id VARCHAR(36) PRIMARY KEY COMMENT '日志ID',
        user_id VARCHAR(36) COMMENT '用户ID',
        username VARCHAR(50) NOT NULL COMMENT '用户名',
        login_type VARCHAR(20) COMMENT '登录方式',
        ip_address VARCHAR(45) COMMENT 'IP地址',
        user_agent TEXT COMMENT '用户代理',
        success BOOLEAN DEFAULT FALSE COMMENT '是否成功',
        failure_reason VARCHAR(200) COMMENT '失败原因',
        login_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '登录时间',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        is_deleted BOOLEAN DEFAULT FALSE COMMENT '是否删除',
        INDEX idx_user_id (user_id),
        INDEX idx_username (username),
        INDEX idx_login_at (login_at),
        INDEX idx_ip_address (ip_address)
    ) COMMENT='登录日志表'
    """,
)


def get_db_config():
    """Build the keyword arguments for ``pymysql.connect`` from DATABASE_URL.

    Any component missing from the URL falls back to a local default
    (``localhost:3306``, user ``root``, empty password, database ``sso_db``).

    Returns:
        dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database`` and ``charset``.

    Raises:
        ValueError: if the URL contains a port that is not a valid integer
            (raised by ``urlparse(...).port``).
    """
    parsed = urlparse(os.getenv('DATABASE_URL', ''))
    return {
        'host': parsed.hostname or 'localhost',
        'port': parsed.port or 3306,
        # urlparse leaves credentials percent-encoded; decode them so that
        # passwords containing reserved characters ('@', ':', '/') work.
        'user': unquote(parsed.username) if parsed.username else 'root',
        'password': unquote(parsed.password) if parsed.password else '',
        # Drop the leading '/'; a bare '/' path must fall back to the
        # default instead of producing an empty database name.
        'database': parsed.path[1:] or 'sso_db',
        'charset': 'utf8mb4',
    }


def hash_password_simple(password):
    """Return a salted SHA-256 hash of *password*.

    The result has the form ``sha256$<salt>$<hexdigest>`` where the salt is
    32 random hex characters and the digest is computed over
    ``password + salt``.
    """
    # NOTE(review): a single SHA-256 round is weak for password storage; a
    # KDF such as hashlib.scrypt / pbkdf2_hmac would be preferable. The
    # format is kept unchanged because the code that verifies these hashes
    # is not visible in this file.
    salt = secrets.token_hex(16)
    password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
    return f"sha256${salt}${password_hash}"


def generate_random_string(length=32):
    """Return a cryptographically random string of ASCII letters and digits.

    Args:
        length: number of characters to generate.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))


def create_tables(connection):
    """Create all tables (if missing) and commit.

    Args:
        connection: an open PyMySQL connection.
    """
    # The context manager closes the cursor even if a statement fails.
    with connection.cursor() as cursor:
        for ddl in _TABLE_DDL:
            cursor.execute(ddl)

    connection.commit()
    print("✅ 数据库表创建成功")


def _insert_test_app(cursor, created_by):
    """Insert the default test application and return ``(app_key, app_secret)``.

    The caller is responsible for committing the transaction.
    """
    app_key = generate_random_string(32)
    app_secret = generate_random_string(64)
    now = datetime.now()

    cursor.execute("""
        INSERT INTO apps (id, name, app_key, app_secret, description, redirect_uris, scope,
                          is_active, is_trusted, access_token_expires, refresh_token_expires,
                          created_by, created_at, updated_at, is_deleted)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        str(uuid.uuid4()),
        TEST_APP_NAME,
        app_key,
        app_secret,
        "用于测试的默认应用",
        '["http://localhost:3000/callback", "http://localhost:8080/callback", "http://localhost:8001/auth/callback"]',
        '["profile", "email"]',
        True,
        True,
        7200,
        2592000,
        created_by,
        now,
        now,
        False,
    ))
    return app_key, app_secret


def create_default_data(connection):
    """Seed the admin user and the test application.

    If the admin user already exists it is left untouched and the existing
    test application's credentials are returned; if that application is
    missing it is created (and committed) so the returned credentials always
    refer to a row that really exists in the database.

    Args:
        connection: an open PyMySQL connection using the default (tuple)
            cursor class.

    Returns:
        ``(app_key, app_secret)`` of the test application.
    """
    with connection.cursor() as cursor:
        # username is UNIQUE, so at most one row can match.
        cursor.execute("SELECT id FROM users WHERE username = %s LIMIT 1", ("admin",))
        admin_row = cursor.fetchone()

        if admin_row:
            print("⚠️ 管理员用户已存在,跳过创建")

            # Reuse the existing test application if there is one.
            cursor.execute(
                "SELECT app_key, app_secret FROM apps WHERE name = %s LIMIT 1",
                (TEST_APP_NAME,),
            )
            app_row = cursor.fetchone()
            if app_row:
                return app_row[0], app_row[1]

            # No test application yet: actually persist one. Returning
            # freshly generated but unsaved credentials would hand the
            # caller a client id/secret the server can never accept.
            credentials = _insert_test_app(cursor, admin_row[0])
            connection.commit()
            return credentials

        # Create the administrator account.
        admin_id = str(uuid.uuid4())
        admin_password = "Admin123456"
        password_hash = hash_password_simple(admin_password)
        now = datetime.now()

        cursor.execute("""
            INSERT INTO users (id, username, email, password_hash, is_active, is_superuser,
                               created_at, updated_at, is_deleted)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (admin_id, "admin", "admin@example.com", password_hash, True, True,
              now, now, False))

        # Create the test application owned by the new administrator.
        app_key, app_secret = _insert_test_app(cursor, admin_id)

    connection.commit()

    print("✅ 默认数据创建成功")
    print("管理员账号: admin")
    print(f"管理员密码: {admin_password}")
    print(f"测试应用Key: {app_key}")
    print(f"测试应用Secret: {app_secret}")

    return app_key, app_secret


def main():
    """Entry point: connect, create the schema, seed data, print next steps.

    Any failure is reported on stdout and the process exits with status 1.
    """
    print("=" * 50)
    print("SSO数据库初始化")
    print("=" * 50)

    try:
        config = get_db_config()
        print(f"连接数据库: {config['host']}:{config['port']}/{config['database']}")

        connection = pymysql.connect(**config)
        print("✅ 数据库连接成功")

        try:
            create_tables(connection)

            result = create_default_data(connection)
            if result:
                app_key, app_secret = result
            else:
                app_key, app_secret = "demo_key", "demo_secret"
        finally:
            # Always release the connection, including when table creation
            # or seeding raised.
            connection.close()

        print("\n" + "=" * 50)
        print("🎉 数据库初始化完成!")
        print("\n下一步:")
        print("1. 启动后端服务: cd src && python -m app.main")
        print("2. 启动前端服务: cd ../sso-frontend && npm run dev")
        print(f"3. 配置子系统: CLIENT_ID={app_key}")
        print(f" CLIENT_SECRET={app_secret}")

    except Exception as e:
        # Top-level boundary of a CLI script: report and exit non-zero.
        print(f"❌ 数据库初始化失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()