run_043_parsed_pricing_migration.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. #!/usr/bin/env python3
  2. """
  3. 仅执行 043_create_parsed_pricing_tables.sql
  4. 用法:
  5. python run_043_parsed_pricing_migration.py
  6. python run_043_parsed_pricing_migration.py --schema aigcspace
  7. """
  8. import sys
  9. from pathlib import Path
  10. from sqlalchemy import create_engine, text
  11. from sqlalchemy.exc import SQLAlchemyError
  12. from dotenv import load_dotenv
  13. # 确保可导入项目模块
  14. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  15. if str(PROJECT_ROOT) not in sys.path:
  16. sys.path.insert(0, str(PROJECT_ROOT))
  17. from app.database import DATABASE_URL
  18. def load_env() -> None:
  19. env_path = PROJECT_ROOT / ".env"
  20. load_dotenv(dotenv_path=env_path)
  21. def main() -> None:
  22. import argparse
  23. load_env()
  24. parser = argparse.ArgumentParser(description="执行 043_create_parsed_pricing_tables.sql")
  25. parser.add_argument("--schema", default="aigcspace", help="schema 名称")
  26. args = parser.parse_args()
  27. migration_path = PROJECT_ROOT / "migrations" / "043_create_parsed_pricing_tables.sql"
  28. if not migration_path.exists():
  29. raise SystemExit(f"未找到迁移文件: {migration_path}")
  30. sql = migration_path.read_text(encoding="utf-8")
  31. if not sql.strip():
  32. print("迁移文件为空,跳过。")
  33. return
  34. engine = create_engine(DATABASE_URL)
  35. try:
  36. with engine.begin() as conn:
  37. conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {args.schema}"))
  38. conn.execute(text(sql))
  39. print("迁移执行完成。")
  40. except SQLAlchemyError as exc:
  41. print(f"迁移执行失败: {exc}")
  42. raise SystemExit(1)
  43. if __name__ == "__main__":
  44. main()