| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 场景模块数据库迁移执行脚本
- 用途:执行 migrate_scene_module.sql 迁移脚本
- """
- import pymysql
- import yaml
- import os
- import sys
- def load_config():
- """加载数据库配置"""
- script_dir = os.path.dirname(os.path.abspath(__file__))
-
- # 优先使用同目录下的 config.yaml
- config_path = os.path.join(script_dir, 'config.yaml')
- if not os.path.exists(config_path):
- config_path = os.path.join(script_dir, 'config.example.yaml')
-
- if not os.path.exists(config_path):
- raise FileNotFoundError(f"配置文件不存在: {config_path}")
-
- with open(config_path, 'r', encoding='utf-8') as f:
- config = yaml.safe_load(f)
-
- return config['database']
- def execute_migration():
- """执行迁移脚本"""
- try:
- # 加载数据库配置
- db_config = load_config()
-
- print("=" * 60)
- print("场景模块数据库迁移工具")
- print("=" * 60)
- print(f"数据库地址: {db_config['host']}:{db_config['port']}")
- print(f"数据库名称: {db_config['database']}")
- print(f"用户名: {db_config['user']}")
- print("=" * 60)
-
- # 读取迁移脚本
- sql_file = os.path.join(os.path.dirname(__file__), 'migrate_scene_module.sql')
-
- if not os.path.exists(sql_file):
- print(f"错误: 迁移脚本文件不存在: {sql_file}")
- return False
-
- with open(sql_file, 'r', encoding='utf-8') as f:
- sql_content = f.read()
-
- print(f"\n读取迁移脚本: {sql_file}")
- print(f"脚本大小: {len(sql_content)} 字节\n")
-
- # 连接数据库
- print("正在连接数据库...")
- connection = pymysql.connect(
- host=db_config['host'],
- port=db_config['port'],
- user=db_config['user'],
- password=db_config['password'],
- database=db_config['database'],
- charset='utf8mb4'
- )
-
- print("数据库连接成功!\n")
-
- try:
- cursor = connection.cursor()
-
- # 分割SQL语句(按分号分割,但跳过注释和存储过程)
- statements = []
- current_statement = []
- in_delimiter_block = False
-
- for line in sql_content.split('\n'):
- stripped = line.strip()
-
- # 跳过空行和注释
- if not stripped or stripped.startswith('--'):
- continue
-
- # 检测DELIMITER块
- if 'DELIMITER' in stripped.upper():
- in_delimiter_block = not in_delimiter_block
- continue
-
- current_statement.append(line)
-
- # 如果不在DELIMITER块中,遇到分号就分割
- if not in_delimiter_block and stripped.endswith(';'):
- stmt = '\n'.join(current_statement)
- if stmt.strip():
- statements.append(stmt)
- current_statement = []
-
- # 添加最后一条语句
- if current_statement:
- stmt = '\n'.join(current_statement)
- if stmt.strip():
- statements.append(stmt)
-
- print(f"共解析出 {len(statements)} 条SQL语句\n")
- print("开始执行迁移...\n")
-
- # 执行每条语句
- success_count = 0
- error_count = 0
-
- for i, statement in enumerate(statements, 1):
- try:
- # 跳过SELECT验证语句的输出
- if statement.strip().upper().startswith('SELECT'):
- cursor.execute(statement)
- results = cursor.fetchall()
- if results and len(results) > 0:
- print(f"[{i}/{len(statements)}] 验证查询结果:")
- for row in results:
- print(f" {row}")
- else:
- cursor.execute(statement)
- print(f"[{i}/{len(statements)}] 执行成功")
-
- success_count += 1
-
- except Exception as e:
- error_count += 1
- print(f"[{i}/{len(statements)}] 执行失败: {str(e)}")
- # 某些语句失败可能是因为表或字段已存在,继续执行
- continue
-
- # 提交事务
- connection.commit()
-
- print("\n" + "=" * 60)
- print("迁移执行完成!")
- print("=" * 60)
- print(f"成功: {success_count} 条")
- print(f"失败: {error_count} 条")
- print("=" * 60)
-
- # 验证结果
- print("\n验证迁移结果:")
- print("-" * 60)
-
- # 检查 scene_template 表
- cursor.execute("""
- SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'scene_template'
- """)
- scene_template_exists = cursor.fetchone()[0] > 0
- print(f"✓ scene_template 表: {'已创建' if scene_template_exists else '未创建'}")
-
- # 检查 recognition_record 新增字段
- cursor.execute("""
- SELECT COLUMN_NAME
- FROM INFORMATION_SCHEMA.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'recognition_record'
- AND COLUMN_NAME IN ('scene_type', 'hazard_count', 'current_step', 'hazard_details')
- """)
- new_columns = [row[0] for row in cursor.fetchall()]
- print(f"✓ recognition_record 新增字段: {', '.join(new_columns) if new_columns else '无'}")
-
- print("-" * 60)
-
- return True
-
- finally:
- cursor.close()
- connection.close()
- print("\n数据库连接已关闭")
-
- except FileNotFoundError as e:
- print(f"错误: 配置文件不存在 - {e}")
- return False
- except pymysql.Error as e:
- print(f"数据库错误: {e}")
- return False
- except Exception as e:
- print(f"执行错误: {e}")
- import traceback
- traceback.print_exc()
- return False
- if __name__ == '__main__':
- success = execute_migration()
- sys.exit(0 if success else 1)
|