#!/usr/bin/env python """ SQLite 到 MySQL 数据迁移脚本 用法: python scripts/migrate_sqlite_to_mysql.py [sqlite_db_path] 示例: python scripts/migrate_sqlite_to_mysql.py annotation_platform.db """ import sys import os import sqlite3 # 添加 backend 目录到路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) import pymysql from config import settings def get_sqlite_connection(db_path: str): """获取 SQLite 连接""" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn def get_mysql_connection(): """获取 MySQL 连接""" return pymysql.connect( host=settings.MYSQL_HOST, port=settings.MYSQL_PORT, user=settings.MYSQL_USER, password=settings.MYSQL_PASSWORD, database=settings.MYSQL_DATABASE, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) def migrate_table(sqlite_conn, mysql_conn, table_name: str, columns: list): """迁移单个表的数据""" sqlite_cursor = sqlite_conn.cursor() mysql_cursor = mysql_conn.cursor() # 查询 SQLite 数据 sqlite_cursor.execute(f"SELECT * FROM {table_name}") rows = sqlite_cursor.fetchall() if not rows: print(f" {table_name}: 无数据") return 0 # 构建 INSERT 语句 placeholders = ', '.join(['%s'] * len(columns)) columns_str = ', '.join(columns) insert_sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})" # 批量插入 count = 0 for row in rows: try: values = tuple(row[col] for col in columns) mysql_cursor.execute(insert_sql, values) count += 1 except pymysql.err.IntegrityError as e: if e.args[0] == 1062: # Duplicate entry print(f" 跳过重复记录: {row['id']}") else: raise mysql_conn.commit() print(f" {table_name}: 迁移 {count} 条记录") return count def main(): # 获取 SQLite 数据库路径 if len(sys.argv) > 1: sqlite_path = sys.argv[1] else: sqlite_path = os.path.join(os.path.dirname(__file__), '..', 'annotation_platform.db') if not os.path.exists(sqlite_path): print(f"错误: SQLite 数据库不存在: {sqlite_path}") sys.exit(1) print(f"源数据库: {sqlite_path}") print(f"目标数据库: {settings.MYSQL_HOST}:{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}") print() # 连接数据库 sqlite_conn = get_sqlite_connection(sqlite_path) mysql_conn = get_mysql_connection() try: # 表结构和列定义 tables = { 'users': ['id', 'username', 'email', 'password_hash', 'role', 'oauth_provider', 'oauth_id', 'created_at', 'updated_at'], 'projects': ['id', 'name', 'description', 'config', 'created_at'], 'tasks': ['id', 'project_id', 'name', 'data', 'status', 'assigned_to', 'created_at'], 'annotations': ['id', 'task_id', 'user_id', 'result', 'created_at', 'updated_at'], } print("开始迁移...") total = 0 # 按顺序迁移(考虑外键约束) for table_name in ['users', 'projects', 'tasks', 'annotations']: columns = tables[table_name] count = migrate_table(sqlite_conn, mysql_conn, table_name, columns) total += count print() print(f"迁移完成!共迁移 {total} 条记录") except Exception as e: print(f"迁移失败: {e}") mysql_conn.rollback() sys.exit(1) finally: sqlite_conn.close() mysql_conn.close() if __name__ == '__main__': main()