""" 数据库迁移管理脚本 执行方式: # 执行所有待执行的迁移 python scripts/run_migrations.py # 执行特定迁移 python scripts/run_migrations.py --migration migrate_add_soft_delete # 查看迁移状态 python scripts/run_migrations.py --status # 回滚最后一次迁移 python scripts/run_migrations.py --rollback """ import sys import os from datetime import datetime # 添加项目根目录到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sqlalchemy import text from app.database import SessionLocal # 迁移列表(按执行顺序) MIGRATIONS = [ { 'name': 'migrate_add_soft_delete', 'description': '为图像翻译表添加软删除字段', 'module': 'scripts.migrate_add_soft_delete', 'version': '20260122_001' }, # 在这里添加新的迁移... ] def ensure_migration_table(): """确保迁移记录表存在""" db = SessionLocal() try: db.execute(text(""" CREATE TABLE IF NOT EXISTS aigcspace.schema_migrations ( id SERIAL PRIMARY KEY, version VARCHAR(50) UNIQUE NOT NULL, name VARCHAR(255) NOT NULL, description TEXT, executed_at TIMESTAMP NOT NULL DEFAULT NOW(), execution_time_ms INTEGER, status VARCHAR(20) NOT NULL DEFAULT 'success' ) """)) db.commit() print("✓ 迁移记录表已就绪") except Exception as e: print(f"✗ 创建迁移记录表失败: {e}") db.rollback() raise finally: db.close() def is_migration_executed(version): """检查迁移是否已执行""" db = SessionLocal() try: result = db.execute( text("SELECT COUNT(*) FROM aigcspace.schema_migrations WHERE version = :version"), {"version": version} ) count = result.scalar() return count > 0 finally: db.close() def record_migration(version, name, description, execution_time_ms, status='success'): """记录迁移执行""" db = SessionLocal() try: db.execute( text(""" INSERT INTO aigcspace.schema_migrations (version, name, description, execution_time_ms, status) VALUES (:version, :name, :description, :execution_time_ms, :status) """), { "version": version, "name": name, "description": description, "execution_time_ms": execution_time_ms, "status": status } ) db.commit() except Exception as e: print(f"✗ 记录迁移失败: {e}") db.rollback() finally: db.close() def run_migration(migration_info): """执行单个迁移""" version = migration_info['version'] name = migration_info['name'] description = migration_info['description'] module_name = migration_info['module'] print("\n" + "=" * 70) print(f"执行迁移: {name}") print(f"版本: {version}") print(f"描述: {description}") print("=" * 70) # 检查是否已执行 if is_migration_executed(version): print(f"⚠ 迁移 {version} 已执行,跳过") return True try: # 动态导入迁移模块 module = __import__(module_name, fromlist=['run_migration']) # 执行迁移 start_time = datetime.now() success = module.run_migration() end_time = datetime.now() execution_time_ms = int((end_time - start_time).total_seconds() * 1000) if success: # 记录成功的迁移 record_migration(version, name, description, execution_time_ms, 'success') print(f"\n✓ 迁移 {name} 执行成功 (耗时: {execution_time_ms}ms)") return True else: # 记录失败的迁移 record_migration(version, name, description, execution_time_ms, 'failed') print(f"\n✗ 迁移 {name} 执行失败") return False except Exception as e: print(f"\n✗ 迁移 {name} 执行异常: {e}") import traceback traceback.print_exc() # 记录失败 record_migration(version, name, description, 0, 'error') return False def run_all_migrations(): """执行所有待执行的迁移""" print("\n" + "=" * 70) print("开始执行数据库迁移") print("=" * 70) # 确保迁移表存在 ensure_migration_table() # 统计 total = len(MIGRATIONS) executed = 0 skipped = 0 failed = 0 # 执行每个迁移 for migration in MIGRATIONS: if is_migration_executed(migration['version']): skipped += 1 print(f"\n⚠ 跳过已执行的迁移: {migration['name']} ({migration['version']})") continue success = run_migration(migration) if success: executed += 1 else: failed += 1 print(f"\n✗ 迁移失败,停止执行后续迁移") break # 输出总结 print("\n" + "=" * 70) print("迁移执行总结") print("=" * 70) print(f"总迁移数: {total}") print(f"已执行: {executed}") print(f"已跳过: {skipped}") print(f"失败: {failed}") print("=" * 70) return failed == 0 def show_migration_status(): """显示迁移状态""" print("\n" + "=" * 70) print("数据库迁移状态") print("=" * 70) # 确保迁移表存在 ensure_migration_table() db = SessionLocal() try: # 查询已执行的迁移 result = db.execute(text(""" SELECT version, name, description, executed_at, execution_time_ms, status FROM aigcspace.schema_migrations ORDER BY executed_at DESC """)) executed_migrations = {row[0]: row for row in result.fetchall()} print(f"\n待执行的迁移:") print("-" * 70) pending_count = 0 for migration in MIGRATIONS: version = migration['version'] if version not in executed_migrations: pending_count += 1 print(f" [{version}] {migration['name']}") print(f" 描述: {migration['description']}") print() if pending_count == 0: print(" 无待执行的迁移") print(f"\n已执行的迁移:") print("-" * 70) if executed_migrations: for migration in MIGRATIONS: version = migration['version'] if version in executed_migrations: row = executed_migrations[version] status_icon = "✓" if row[5] == 'success' else "✗" print(f" {status_icon} [{version}] {row[1]}") print(f" 执行时间: {row[3]}") print(f" 耗时: {row[4]}ms") print(f" 状态: {row[5]}") print() else: print(" 无已执行的迁移") print("=" * 70) finally: db.close() def rollback_last_migration(): """回滚最后一次迁移""" print("\n" + "=" * 70) print("警告:准备回滚最后一次迁移") print("=" * 70) db = SessionLocal() try: # 查询最后一次迁移 result = db.execute(text(""" SELECT version, name, description FROM aigcspace.schema_migrations ORDER BY executed_at DESC LIMIT 1 """)) last_migration = result.fetchone() if not last_migration: print("\n没有可回滚的迁移") return version, name, description = last_migration print(f"\n最后一次迁移:") print(f" 版本: {version}") print(f" 名称: {name}") print(f" 描述: {description}") confirm = input("\n确定要回滚此迁移吗?(yes/no): ") if confirm.lower() != 'yes': print("已取消回滚") return # 查找对应的迁移配置 migration_info = None for m in MIGRATIONS: if m['version'] == version: migration_info = m break if not migration_info: print(f"\n✗ 未找到迁移配置: {version}") return # 动态导入迁移模块 module_name = migration_info['module'] module = __import__(module_name, fromlist=['rollback_migration']) if not hasattr(module, 'rollback_migration'): print(f"\n✗ 迁移 {name} 不支持回滚") return # 执行回滚 module.rollback_migration() # 删除迁移记录 db.execute( text("DELETE FROM aigcspace.schema_migrations WHERE version = :version"), {"version": version} ) db.commit() print(f"\n✓ 迁移 {name} 已回滚") except Exception as e: print(f"\n✗ 回滚失败: {e}") import traceback traceback.print_exc() db.rollback() finally: db.close() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='数据库迁移管理工具') parser.add_argument( '--migration', type=str, help='执行指定的迁移' ) parser.add_argument( '--status', action='store_true', help='显示迁移状态' ) parser.add_argument( '--rollback', action='store_true', help='回滚最后一次迁移' ) args = parser.parse_args() try: if args.status: show_migration_status() elif args.rollback: rollback_last_migration() elif args.migration: # 执行指定迁移 migration_info = None for m in MIGRATIONS: if m['name'] == args.migration: migration_info = m break if migration_info: ensure_migration_table() success = run_migration(migration_info) sys.exit(0 if success else 1) else: print(f"✗ 未找到迁移: {args.migration}") sys.exit(1) else: # 执行所有待执行的迁移 success = run_all_migrations() sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n\n用户中断执行") sys.exit(1) except Exception as e: print(f"\n✗ 执行失败: {e}") import traceback traceback.print_exc() sys.exit(1)