| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- """
- 数据库迁移管理脚本
- 执行方式:
- # 执行所有待执行的迁移
- python scripts/run_migrations.py
-
- # 执行特定迁移
- python scripts/run_migrations.py --migration migrate_add_soft_delete
-
- # 查看迁移状态
- python scripts/run_migrations.py --status
-
- # 回滚最后一次迁移
- python scripts/run_migrations.py --rollback
- """
- import sys
- import os
- from datetime import datetime
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from sqlalchemy import text
- from app.database import SessionLocal
- # 迁移列表(按执行顺序)
- MIGRATIONS = [
- {
- 'name': 'migrate_add_soft_delete',
- 'description': '为图像翻译表添加软删除字段',
- 'module': 'scripts.migrate_add_soft_delete',
- 'version': '20260122_001'
- },
- # 在这里添加新的迁移...
- ]
- def ensure_migration_table():
- """确保迁移记录表存在"""
- db = SessionLocal()
- try:
- db.execute(text("""
- CREATE TABLE IF NOT EXISTS aigcspace.schema_migrations (
- id SERIAL PRIMARY KEY,
- version VARCHAR(50) UNIQUE NOT NULL,
- name VARCHAR(255) NOT NULL,
- description TEXT,
- executed_at TIMESTAMP NOT NULL DEFAULT NOW(),
- execution_time_ms INTEGER,
- status VARCHAR(20) NOT NULL DEFAULT 'success'
- )
- """))
- db.commit()
- print("✓ 迁移记录表已就绪")
- except Exception as e:
- print(f"✗ 创建迁移记录表失败: {e}")
- db.rollback()
- raise
- finally:
- db.close()
- def is_migration_executed(version):
- """检查迁移是否已执行"""
- db = SessionLocal()
- try:
- result = db.execute(
- text("SELECT COUNT(*) FROM aigcspace.schema_migrations WHERE version = :version"),
- {"version": version}
- )
- count = result.scalar()
- return count > 0
- finally:
- db.close()
- def record_migration(version, name, description, execution_time_ms, status='success'):
- """记录迁移执行"""
- db = SessionLocal()
- try:
- db.execute(
- text("""
- INSERT INTO aigcspace.schema_migrations
- (version, name, description, execution_time_ms, status)
- VALUES (:version, :name, :description, :execution_time_ms, :status)
- """),
- {
- "version": version,
- "name": name,
- "description": description,
- "execution_time_ms": execution_time_ms,
- "status": status
- }
- )
- db.commit()
- except Exception as e:
- print(f"✗ 记录迁移失败: {e}")
- db.rollback()
- finally:
- db.close()
- def run_migration(migration_info):
- """执行单个迁移"""
- version = migration_info['version']
- name = migration_info['name']
- description = migration_info['description']
- module_name = migration_info['module']
-
- print("\n" + "=" * 70)
- print(f"执行迁移: {name}")
- print(f"版本: {version}")
- print(f"描述: {description}")
- print("=" * 70)
-
- # 检查是否已执行
- if is_migration_executed(version):
- print(f"⚠ 迁移 {version} 已执行,跳过")
- return True
-
- try:
- # 动态导入迁移模块
- module = __import__(module_name, fromlist=['run_migration'])
-
- # 执行迁移
- start_time = datetime.now()
- success = module.run_migration()
- end_time = datetime.now()
-
- execution_time_ms = int((end_time - start_time).total_seconds() * 1000)
-
- if success:
- # 记录成功的迁移
- record_migration(version, name, description, execution_time_ms, 'success')
- print(f"\n✓ 迁移 {name} 执行成功 (耗时: {execution_time_ms}ms)")
- return True
- else:
- # 记录失败的迁移
- record_migration(version, name, description, execution_time_ms, 'failed')
- print(f"\n✗ 迁移 {name} 执行失败")
- return False
-
- except Exception as e:
- print(f"\n✗ 迁移 {name} 执行异常: {e}")
- import traceback
- traceback.print_exc()
-
- # 记录失败
- record_migration(version, name, description, 0, 'error')
- return False
- def run_all_migrations():
- """执行所有待执行的迁移"""
- print("\n" + "=" * 70)
- print("开始执行数据库迁移")
- print("=" * 70)
-
- # 确保迁移表存在
- ensure_migration_table()
-
- # 统计
- total = len(MIGRATIONS)
- executed = 0
- skipped = 0
- failed = 0
-
- # 执行每个迁移
- for migration in MIGRATIONS:
- if is_migration_executed(migration['version']):
- skipped += 1
- print(f"\n⚠ 跳过已执行的迁移: {migration['name']} ({migration['version']})")
- continue
-
- success = run_migration(migration)
- if success:
- executed += 1
- else:
- failed += 1
- print(f"\n✗ 迁移失败,停止执行后续迁移")
- break
-
- # 输出总结
- print("\n" + "=" * 70)
- print("迁移执行总结")
- print("=" * 70)
- print(f"总迁移数: {total}")
- print(f"已执行: {executed}")
- print(f"已跳过: {skipped}")
- print(f"失败: {failed}")
- print("=" * 70)
-
- return failed == 0
- def show_migration_status():
- """显示迁移状态"""
- print("\n" + "=" * 70)
- print("数据库迁移状态")
- print("=" * 70)
-
- # 确保迁移表存在
- ensure_migration_table()
-
- db = SessionLocal()
- try:
- # 查询已执行的迁移
- result = db.execute(text("""
- SELECT version, name, description, executed_at, execution_time_ms, status
- FROM aigcspace.schema_migrations
- ORDER BY executed_at DESC
- """))
-
- executed_migrations = {row[0]: row for row in result.fetchall()}
-
- print(f"\n待执行的迁移:")
- print("-" * 70)
-
- pending_count = 0
- for migration in MIGRATIONS:
- version = migration['version']
- if version not in executed_migrations:
- pending_count += 1
- print(f" [{version}] {migration['name']}")
- print(f" 描述: {migration['description']}")
- print()
-
- if pending_count == 0:
- print(" 无待执行的迁移")
-
- print(f"\n已执行的迁移:")
- print("-" * 70)
-
- if executed_migrations:
- for migration in MIGRATIONS:
- version = migration['version']
- if version in executed_migrations:
- row = executed_migrations[version]
- status_icon = "✓" if row[5] == 'success' else "✗"
- print(f" {status_icon} [{version}] {row[1]}")
- print(f" 执行时间: {row[3]}")
- print(f" 耗时: {row[4]}ms")
- print(f" 状态: {row[5]}")
- print()
- else:
- print(" 无已执行的迁移")
-
- print("=" * 70)
-
- finally:
- db.close()
- def rollback_last_migration():
- """回滚最后一次迁移"""
- print("\n" + "=" * 70)
- print("警告:准备回滚最后一次迁移")
- print("=" * 70)
-
- db = SessionLocal()
- try:
- # 查询最后一次迁移
- result = db.execute(text("""
- SELECT version, name, description
- FROM aigcspace.schema_migrations
- ORDER BY executed_at DESC
- LIMIT 1
- """))
-
- last_migration = result.fetchone()
- if not last_migration:
- print("\n没有可回滚的迁移")
- return
-
- version, name, description = last_migration
- print(f"\n最后一次迁移:")
- print(f" 版本: {version}")
- print(f" 名称: {name}")
- print(f" 描述: {description}")
-
- confirm = input("\n确定要回滚此迁移吗?(yes/no): ")
- if confirm.lower() != 'yes':
- print("已取消回滚")
- return
-
- # 查找对应的迁移配置
- migration_info = None
- for m in MIGRATIONS:
- if m['version'] == version:
- migration_info = m
- break
-
- if not migration_info:
- print(f"\n✗ 未找到迁移配置: {version}")
- return
-
- # 动态导入迁移模块
- module_name = migration_info['module']
- module = __import__(module_name, fromlist=['rollback_migration'])
-
- if not hasattr(module, 'rollback_migration'):
- print(f"\n✗ 迁移 {name} 不支持回滚")
- return
-
- # 执行回滚
- module.rollback_migration()
-
- # 删除迁移记录
- db.execute(
- text("DELETE FROM aigcspace.schema_migrations WHERE version = :version"),
- {"version": version}
- )
- db.commit()
-
- print(f"\n✓ 迁移 {name} 已回滚")
-
- except Exception as e:
- print(f"\n✗ 回滚失败: {e}")
- import traceback
- traceback.print_exc()
- db.rollback()
-
- finally:
- db.close()
- if __name__ == "__main__":
- import argparse
-
- parser = argparse.ArgumentParser(description='数据库迁移管理工具')
- parser.add_argument(
- '--migration',
- type=str,
- help='执行指定的迁移'
- )
- parser.add_argument(
- '--status',
- action='store_true',
- help='显示迁移状态'
- )
- parser.add_argument(
- '--rollback',
- action='store_true',
- help='回滚最后一次迁移'
- )
-
- args = parser.parse_args()
-
- try:
- if args.status:
- show_migration_status()
- elif args.rollback:
- rollback_last_migration()
- elif args.migration:
- # 执行指定迁移
- migration_info = None
- for m in MIGRATIONS:
- if m['name'] == args.migration:
- migration_info = m
- break
-
- if migration_info:
- ensure_migration_table()
- success = run_migration(migration_info)
- sys.exit(0 if success else 1)
- else:
- print(f"✗ 未找到迁移: {args.migration}")
- sys.exit(1)
- else:
- # 执行所有待执行的迁移
- success = run_all_migrations()
- sys.exit(0 if success else 1)
-
- except KeyboardInterrupt:
- print("\n\n用户中断执行")
- sys.exit(1)
- except Exception as e:
- print(f"\n✗ 执行失败: {e}")
- import traceback
- traceback.print_exc()
- sys.exit(1)
|