#!/usr/bin/env python # -*- coding: utf-8 -*- """ 场景模块数据库迁移执行脚本 用途:执行 migrate_scene_module.sql 迁移脚本 """ import pymysql import yaml import os import sys def load_config(): """加载数据库配置""" script_dir = os.path.dirname(os.path.abspath(__file__)) # 优先使用同目录下的 config.yaml config_path = os.path.join(script_dir, 'config.yaml') if not os.path.exists(config_path): config_path = os.path.join(script_dir, 'config.example.yaml') if not os.path.exists(config_path): raise FileNotFoundError(f"配置文件不存在: {config_path}") with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) return config['database'] def execute_migration(): """执行迁移脚本""" try: # 加载数据库配置 db_config = load_config() print("=" * 60) print("场景模块数据库迁移工具") print("=" * 60) print(f"数据库地址: {db_config['host']}:{db_config['port']}") print(f"数据库名称: {db_config['database']}") print(f"用户名: {db_config['user']}") print("=" * 60) # 读取迁移脚本 sql_file = os.path.join(os.path.dirname(__file__), 'migrate_scene_module.sql') if not os.path.exists(sql_file): print(f"错误: 迁移脚本文件不存在: {sql_file}") return False with open(sql_file, 'r', encoding='utf-8') as f: sql_content = f.read() print(f"\n读取迁移脚本: {sql_file}") print(f"脚本大小: {len(sql_content)} 字节\n") # 连接数据库 print("正在连接数据库...") connection = pymysql.connect( host=db_config['host'], port=db_config['port'], user=db_config['user'], password=db_config['password'], database=db_config['database'], charset='utf8mb4' ) print("数据库连接成功!\n") try: cursor = connection.cursor() # 分割SQL语句(按分号分割,但跳过注释和存储过程) statements = [] current_statement = [] in_delimiter_block = False for line in sql_content.split('\n'): stripped = line.strip() # 跳过空行和注释 if not stripped or stripped.startswith('--'): continue # 检测DELIMITER块 if 'DELIMITER' in stripped.upper(): in_delimiter_block = not in_delimiter_block continue current_statement.append(line) # 如果不在DELIMITER块中,遇到分号就分割 if not in_delimiter_block and stripped.endswith(';'): stmt = '\n'.join(current_statement) if stmt.strip(): statements.append(stmt) current_statement = [] # 添加最后一条语句 if current_statement: stmt = '\n'.join(current_statement) if stmt.strip(): statements.append(stmt) print(f"共解析出 {len(statements)} 条SQL语句\n") print("开始执行迁移...\n") # 执行每条语句 success_count = 0 error_count = 0 for i, statement in enumerate(statements, 1): try: # 跳过SELECT验证语句的输出 if statement.strip().upper().startswith('SELECT'): cursor.execute(statement) results = cursor.fetchall() if results and len(results) > 0: print(f"[{i}/{len(statements)}] 验证查询结果:") for row in results: print(f" {row}") else: cursor.execute(statement) print(f"[{i}/{len(statements)}] 执行成功") success_count += 1 except Exception as e: error_count += 1 print(f"[{i}/{len(statements)}] 执行失败: {str(e)}") # 某些语句失败可能是因为表或字段已存在,继续执行 continue # 提交事务 connection.commit() print("\n" + "=" * 60) print("迁移执行完成!") print("=" * 60) print(f"成功: {success_count} 条") print(f"失败: {error_count} 条") print("=" * 60) # 验证结果 print("\n验证迁移结果:") print("-" * 60) # 检查 scene_template 表 cursor.execute(""" SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'scene_template' """) scene_template_exists = cursor.fetchone()[0] > 0 print(f"✓ scene_template 表: {'已创建' if scene_template_exists else '未创建'}") # 检查 recognition_record 新增字段 cursor.execute(""" SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'recognition_record' AND COLUMN_NAME IN ('scene_type', 'hazard_count', 'current_step', 'hazard_details') """) new_columns = [row[0] for row in cursor.fetchall()] print(f"✓ recognition_record 新增字段: {', '.join(new_columns) if new_columns else '无'}") print("-" * 60) return True finally: cursor.close() connection.close() print("\n数据库连接已关闭") except FileNotFoundError as e: print(f"错误: 配置文件不存在 - {e}") return False except pymysql.Error as e: print(f"数据库错误: {e}") return False except Exception as e: print(f"执行错误: {e}") import traceback traceback.print_exc() return False if __name__ == '__main__': success = execute_migration() sys.exit(0 if success else 1)