| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- """
- OCR任务表添加软删除字段迁移脚本
- 添加字段:is_deleted(软删除标记)
- 运行: python -m scripts.migrate_ocr_add_is_deleted
- """
- import os
- import sys
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- import psycopg2
- from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
- def get_db_connection():
- """获取数据库连接"""
- return psycopg2.connect(
- host=os.getenv('DB_HOST', 'localhost'),
- port=os.getenv('DB_PORT', '5432'),
- user=os.getenv('DB_USER', 'postgres'),
- password=os.getenv('DB_PASSWORD', ''),
- database=os.getenv('DB_NAME', 'model_square')
- )
- def table_exists(cursor):
- """检查表是否存在"""
- cursor.execute("""
- SELECT EXISTS (
- SELECT FROM information_schema.tables
- WHERE table_schema = 'aigcspace'
- AND table_name = 'ocr_tasks'
- );
- """)
- return cursor.fetchone()[0]
- def column_exists(cursor, column_name):
- """检查列是否存在"""
- cursor.execute("""
- SELECT EXISTS (
- SELECT FROM information_schema.columns
- WHERE table_schema = 'aigcspace'
- AND table_name = 'ocr_tasks'
- AND column_name = %s
- );
- """, (column_name,))
- return cursor.fetchone()[0]
- def index_exists(cursor, index_name):
- """检查索引是否存在"""
- cursor.execute("""
- SELECT EXISTS (
- SELECT FROM pg_indexes
- WHERE schemaname = 'aigcspace'
- AND tablename = 'ocr_tasks'
- AND indexname = %s
- );
- """, (index_name,))
- return cursor.fetchone()[0]
- def migrate():
- """执行迁移"""
- conn = get_db_connection()
- conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- cursor = conn.cursor()
-
- try:
- if not table_exists(cursor):
- print("❌ 表 aigcspace.ocr_tasks 不存在,请先运行 migrate_ocr.py")
- return
-
- print("开始添加软删除字段到 OCR 任务表...")
-
- # 添加 is_deleted 字段
- if not column_exists(cursor, 'is_deleted'):
- cursor.execute("""
- ALTER TABLE aigcspace.ocr_tasks
- ADD COLUMN is_deleted INTEGER DEFAULT 0 NOT NULL;
- """)
- cursor.execute("""
- COMMENT ON COLUMN aigcspace.ocr_tasks.is_deleted IS '软删除标记:0-未删除,1-已删除';
- """)
- print("✓ 添加字段 is_deleted")
- else:
- print("✓ 字段 is_deleted 已存在")
-
- # 添加索引
- if not index_exists(cursor, 'idx_ocr_tasks_is_deleted'):
- cursor.execute("""
- CREATE INDEX idx_ocr_tasks_is_deleted ON aigcspace.ocr_tasks(is_deleted);
- """)
- print("✓ 创建索引 idx_ocr_tasks_is_deleted")
- else:
- print("✓ 索引 idx_ocr_tasks_is_deleted 已存在")
-
- if not index_exists(cursor, 'idx_ocr_tasks_user_deleted'):
- cursor.execute("""
- CREATE INDEX idx_ocr_tasks_user_deleted ON aigcspace.ocr_tasks(user_id, is_deleted);
- """)
- print("✓ 创建索引 idx_ocr_tasks_user_deleted")
- else:
- print("✓ 索引 idx_ocr_tasks_user_deleted 已存在")
-
- # 验证字段
- cursor.execute("""
- SELECT
- column_name,
- data_type,
- column_default,
- is_nullable
- FROM information_schema.columns
- WHERE table_schema = 'aigcspace'
- AND table_name = 'ocr_tasks'
- AND column_name = 'is_deleted';
- """)
- result = cursor.fetchone()
- if result:
- print(f"\n字段验证:")
- print(f" 列名: {result[0]}")
- print(f" 类型: {result[1]}")
- print(f" 默认值: {result[2]}")
- print(f" 可空: {result[3]}")
-
- print("\n✅ OCR任务表软删除字段添加完成!")
-
- except Exception as e:
- print(f"❌ 迁移失败: {e}")
- raise
- finally:
- cursor.close()
- conn.close()
- def rollback():
- """回滚迁移"""
- conn = get_db_connection()
- conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- cursor = conn.cursor()
-
- try:
- if not table_exists(cursor):
- print("✓ 表 aigcspace.ocr_tasks 不存在,无需回滚")
- return
-
- print("开始回滚 OCR 任务表软删除字段...")
-
- # 删除索引
- if index_exists(cursor, 'idx_ocr_tasks_user_deleted'):
- cursor.execute("DROP INDEX IF EXISTS aigcspace.idx_ocr_tasks_user_deleted;")
- print("✓ 删除索引 idx_ocr_tasks_user_deleted")
-
- if index_exists(cursor, 'idx_ocr_tasks_is_deleted'):
- cursor.execute("DROP INDEX IF EXISTS aigcspace.idx_ocr_tasks_is_deleted;")
- print("✓ 删除索引 idx_ocr_tasks_is_deleted")
-
- # 删除字段
- if column_exists(cursor, 'is_deleted'):
- cursor.execute("ALTER TABLE aigcspace.ocr_tasks DROP COLUMN is_deleted;")
- print("✓ 删除字段 is_deleted")
-
- print("\n✅ OCR任务表软删除字段回滚完成!")
-
- except Exception as e:
- print(f"❌ 回滚失败: {e}")
- raise
- finally:
- cursor.close()
- conn.close()
- def status():
- """查看迁移状态"""
- conn = get_db_connection()
- conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- cursor = conn.cursor()
-
- try:
- if not table_exists(cursor):
- print("❌ 表 aigcspace.ocr_tasks 不存在")
- return
-
- print("OCR任务表软删除字段状态:")
-
- # 检查字段
- field_exists = column_exists(cursor, 'is_deleted')
- status_icon = '✓' if field_exists else '✗'
- print(f"{status_icon} is_deleted: {'已添加' if field_exists else '未添加'}")
-
- # 检查索引
- idx1_exists = index_exists(cursor, 'idx_ocr_tasks_is_deleted')
- idx2_exists = index_exists(cursor, 'idx_ocr_tasks_user_deleted')
- print(f"{'✓' if idx1_exists else '✗'} idx_ocr_tasks_is_deleted: {'已创建' if idx1_exists else '未创建'}")
- print(f"{'✓' if idx2_exists else '✗'} idx_ocr_tasks_user_deleted: {'已创建' if idx2_exists else '未创建'}")
-
- # 如果字段存在,显示统计信息
- if field_exists:
- cursor.execute("""
- SELECT
- COUNT(*) as total,
- COUNT(*) FILTER (WHERE is_deleted = 0) as active,
- COUNT(*) FILTER (WHERE is_deleted = 1) as deleted
- FROM aigcspace.ocr_tasks;
- """)
- result = cursor.fetchone()
- print(f"\n数据统计:")
- print(f" 总记录数: {result[0]}")
- print(f" 未删除: {result[1]}")
- print(f" 已删除: {result[2]}")
-
- except Exception as e:
- print(f"❌ 查看状态失败: {e}")
- raise
- finally:
- cursor.close()
- conn.close()
- if __name__ == "__main__":
- import argparse
-
- parser = argparse.ArgumentParser(description='OCR任务表软删除字段迁移脚本')
- parser.add_argument('--action', choices=['migrate', 'rollback', 'status'],
- default='migrate', help='操作类型')
- args = parser.parse_args()
-
- if args.action == 'migrate':
- migrate()
- elif args.action == 'rollback':
- rollback()
- elif args.action == 'status':
- status()
|