| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- #!/usr/bin/env python
- """
- SQLite 到 MySQL 数据迁移脚本
- 用法:
- python scripts/migrate_sqlite_to_mysql.py [sqlite_db_path]
- 示例:
- python scripts/migrate_sqlite_to_mysql.py annotation_platform.db
- """
- import sys
- import os
- import sqlite3
- # 添加 backend 目录到路径
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
- import pymysql
- from config import settings
- def get_sqlite_connection(db_path: str):
- """获取 SQLite 连接"""
- conn = sqlite3.connect(db_path)
- conn.row_factory = sqlite3.Row
- return conn
- def get_mysql_connection():
- """获取 MySQL 连接"""
- return pymysql.connect(
- host=settings.MYSQL_HOST,
- port=settings.MYSQL_PORT,
- user=settings.MYSQL_USER,
- password=settings.MYSQL_PASSWORD,
- database=settings.MYSQL_DATABASE,
- charset='utf8mb4',
- cursorclass=pymysql.cursors.DictCursor
- )
- def migrate_table(sqlite_conn, mysql_conn, table_name: str, columns: list):
- """迁移单个表的数据"""
- sqlite_cursor = sqlite_conn.cursor()
- mysql_cursor = mysql_conn.cursor()
-
- # 查询 SQLite 数据
- sqlite_cursor.execute(f"SELECT * FROM {table_name}")
- rows = sqlite_cursor.fetchall()
-
- if not rows:
- print(f" {table_name}: 无数据")
- return 0
-
- # 构建 INSERT 语句
- placeholders = ', '.join(['%s'] * len(columns))
- columns_str = ', '.join(columns)
- insert_sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
-
- # 批量插入
- count = 0
- for row in rows:
- try:
- values = tuple(row[col] for col in columns)
- mysql_cursor.execute(insert_sql, values)
- count += 1
- except pymysql.err.IntegrityError as e:
- if e.args[0] == 1062: # Duplicate entry
- print(f" 跳过重复记录: {row['id']}")
- else:
- raise
-
- mysql_conn.commit()
- print(f" {table_name}: 迁移 {count} 条记录")
- return count
- def main():
- # 获取 SQLite 数据库路径
- if len(sys.argv) > 1:
- sqlite_path = sys.argv[1]
- else:
- sqlite_path = os.path.join(os.path.dirname(__file__), '..', 'annotation_platform.db')
-
- if not os.path.exists(sqlite_path):
- print(f"错误: SQLite 数据库不存在: {sqlite_path}")
- sys.exit(1)
-
- print(f"源数据库: {sqlite_path}")
- print(f"目标数据库: {settings.MYSQL_HOST}:{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}")
- print()
-
- # 连接数据库
- sqlite_conn = get_sqlite_connection(sqlite_path)
- mysql_conn = get_mysql_connection()
-
- try:
- # 表结构和列定义
- tables = {
- 'users': ['id', 'username', 'email', 'password_hash', 'role',
- 'oauth_provider', 'oauth_id', 'created_at', 'updated_at'],
- 'projects': ['id', 'name', 'description', 'config', 'created_at'],
- 'tasks': ['id', 'project_id', 'name', 'data', 'status',
- 'assigned_to', 'created_at'],
- 'annotations': ['id', 'task_id', 'user_id', 'result',
- 'created_at', 'updated_at'],
- }
-
- print("开始迁移...")
- total = 0
-
- # 按顺序迁移(考虑外键约束)
- for table_name in ['users', 'projects', 'tasks', 'annotations']:
- columns = tables[table_name]
- count = migrate_table(sqlite_conn, mysql_conn, table_name, columns)
- total += count
-
- print()
- print(f"迁移完成!共迁移 {total} 条记录")
-
- except Exception as e:
- print(f"迁移失败: {e}")
- mysql_conn.rollback()
- sys.exit(1)
- finally:
- sqlite_conn.close()
- mysql_conn.close()
- if __name__ == '__main__':
- main()
|