migrate_sqlite_to_mysql.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #!/usr/bin/env python
"""
SQLite to MySQL data migration script.

Usage:
    python scripts/migrate_sqlite_to_mysql.py [sqlite_db_path]

Example:
    python scripts/migrate_sqlite_to_mysql.py annotation_platform.db
"""
  9. import sys
  10. import os
  11. import sqlite3
  12. # 添加 backend 目录到路径
  13. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
  14. import pymysql
  15. from config import settings
  16. def get_sqlite_connection(db_path: str):
  17. """获取 SQLite 连接"""
  18. conn = sqlite3.connect(db_path)
  19. conn.row_factory = sqlite3.Row
  20. return conn
  21. def get_mysql_connection():
  22. """获取 MySQL 连接"""
  23. return pymysql.connect(
  24. host=settings.MYSQL_HOST,
  25. port=settings.MYSQL_PORT,
  26. user=settings.MYSQL_USER,
  27. password=settings.MYSQL_PASSWORD,
  28. database=settings.MYSQL_DATABASE,
  29. charset='utf8mb4',
  30. cursorclass=pymysql.cursors.DictCursor
  31. )
  32. def migrate_table(sqlite_conn, mysql_conn, table_name: str, columns: list):
  33. """迁移单个表的数据"""
  34. sqlite_cursor = sqlite_conn.cursor()
  35. mysql_cursor = mysql_conn.cursor()
  36. # 查询 SQLite 数据
  37. sqlite_cursor.execute(f"SELECT * FROM {table_name}")
  38. rows = sqlite_cursor.fetchall()
  39. if not rows:
  40. print(f" {table_name}: 无数据")
  41. return 0
  42. # 构建 INSERT 语句
  43. placeholders = ', '.join(['%s'] * len(columns))
  44. columns_str = ', '.join(columns)
  45. insert_sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
  46. # 批量插入
  47. count = 0
  48. for row in rows:
  49. try:
  50. values = tuple(row[col] for col in columns)
  51. mysql_cursor.execute(insert_sql, values)
  52. count += 1
  53. except pymysql.err.IntegrityError as e:
  54. if e.args[0] == 1062: # Duplicate entry
  55. print(f" 跳过重复记录: {row['id']}")
  56. else:
  57. raise
  58. mysql_conn.commit()
  59. print(f" {table_name}: 迁移 {count} 条记录")
  60. return count
  61. def main():
  62. # 获取 SQLite 数据库路径
  63. if len(sys.argv) > 1:
  64. sqlite_path = sys.argv[1]
  65. else:
  66. sqlite_path = os.path.join(os.path.dirname(__file__), '..', 'annotation_platform.db')
  67. if not os.path.exists(sqlite_path):
  68. print(f"错误: SQLite 数据库不存在: {sqlite_path}")
  69. sys.exit(1)
  70. print(f"源数据库: {sqlite_path}")
  71. print(f"目标数据库: {settings.MYSQL_HOST}:{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}")
  72. print()
  73. # 连接数据库
  74. sqlite_conn = get_sqlite_connection(sqlite_path)
  75. mysql_conn = get_mysql_connection()
  76. try:
  77. # 表结构和列定义
  78. tables = {
  79. 'users': ['id', 'username', 'email', 'password_hash', 'role',
  80. 'oauth_provider', 'oauth_id', 'created_at', 'updated_at'],
  81. 'projects': ['id', 'name', 'description', 'config', 'created_at'],
  82. 'tasks': ['id', 'project_id', 'name', 'data', 'status',
  83. 'assigned_to', 'created_at'],
  84. 'annotations': ['id', 'task_id', 'user_id', 'result',
  85. 'created_at', 'updated_at'],
  86. }
  87. print("开始迁移...")
  88. total = 0
  89. # 按顺序迁移(考虑外键约束)
  90. for table_name in ['users', 'projects', 'tasks', 'annotations']:
  91. columns = tables[table_name]
  92. count = migrate_table(sqlite_conn, mysql_conn, table_name, columns)
  93. total += count
  94. print()
  95. print(f"迁移完成!共迁移 {total} 条记录")
  96. except Exception as e:
  97. print(f"迁移失败: {e}")
  98. mysql_conn.rollback()
  99. sys.exit(1)
  100. finally:
  101. sqlite_conn.close()
  102. mysql_conn.close()
  103. if __name__ == '__main__':
  104. main()