"""
Migration script: add a soft-delete column to the OCR task table.

Adds the column ``is_deleted`` (soft-delete flag) to ``aigcspace.ocr_tasks``
together with two supporting indexes.

Run:
    python -m scripts.migrate_ocr_add_is_deleted
    python -m scripts.migrate_ocr_add_is_deleted --action rollback
    python -m scripts.migrate_ocr_add_is_deleted --action status
"""
import os
import sys
from contextlib import contextmanager
from pathlib import Path

# Make the project root importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from dotenv import load_dotenv
load_dotenv()

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


def get_db_connection():
    """Open a new database connection configured from environment variables.

    Reads DB_HOST, DB_PORT, DB_USER, DB_PASSWORD and DB_NAME, falling back to
    'localhost', '5432', 'postgres', '' and 'model_square' respectively.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    return psycopg2.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        port=os.getenv('DB_PORT', '5432'),
        user=os.getenv('DB_USER', 'postgres'),
        password=os.getenv('DB_PASSWORD', ''),
        database=os.getenv('DB_NAME', 'model_square')
    )


@contextmanager
def _autocommit_cursor():
    """Yield a cursor on a fresh autocommit connection and always clean up.

    The connection is closed even when switching the isolation level or
    creating the cursor fails, and even when closing the cursor itself raises.
    """
    conn = get_db_connection()
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        conn.close()


def table_exists(cursor):
    """Return whether the table aigcspace.ocr_tasks exists.

    Args:
        cursor: An open database cursor used to run the lookup.

    Returns:
        The first column of the ``SELECT EXISTS`` result row.
    """
    cursor.execute("""
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'aigcspace'
            AND table_name = 'ocr_tasks'
        );
    """)
    return cursor.fetchone()[0]


def column_exists(cursor, column_name):
    """Return whether aigcspace.ocr_tasks has a column named *column_name*.

    Args:
        cursor: An open database cursor used to run the lookup.
        column_name: Column name, passed to the query as a bound parameter.

    Returns:
        The first column of the ``SELECT EXISTS`` result row.
    """
    cursor.execute("""
        SELECT EXISTS (
            SELECT FROM information_schema.columns
            WHERE table_schema = 'aigcspace'
            AND table_name = 'ocr_tasks'
            AND column_name = %s
        );
    """, (column_name,))
    return cursor.fetchone()[0]


def index_exists(cursor, index_name):
    """Return whether aigcspace.ocr_tasks has an index named *index_name*.

    Args:
        cursor: An open database cursor used to run the lookup.
        index_name: Index name, passed to the query as a bound parameter.

    Returns:
        The first column of the ``SELECT EXISTS`` result row.
    """
    cursor.execute("""
        SELECT EXISTS (
            SELECT FROM pg_indexes
            WHERE schemaname = 'aigcspace'
            AND tablename = 'ocr_tasks'
            AND indexname = %s
        );
    """, (index_name,))
    return cursor.fetchone()[0]


def migrate():
    """Apply the migration: add ``is_deleted`` and its two indexes.

    Each step is skipped when the column or index is already present, so the
    function can be re-run. If the table is missing, a message is printed and
    nothing is changed. After the steps, the column definition is read back
    and printed.

    Raises:
        Exception: Any error raised while migrating is printed and re-raised.
    """
    with _autocommit_cursor() as cursor:
        try:
            if not table_exists(cursor):
                print("❌ 表 aigcspace.ocr_tasks 不存在,请先运行 migrate_ocr.py")
                return

            print("开始添加软删除字段到 OCR 任务表...")

            # Add the is_deleted column.
            if not column_exists(cursor, 'is_deleted'):
                cursor.execute("""
                    ALTER TABLE aigcspace.ocr_tasks
                    ADD COLUMN is_deleted INTEGER DEFAULT 0 NOT NULL;
                """)
                cursor.execute("""
                    COMMENT ON COLUMN aigcspace.ocr_tasks.is_deleted IS '软删除标记:0-未删除,1-已删除';
                """)
                print("✓ 添加字段 is_deleted")
            else:
                print("✓ 字段 is_deleted 已存在")

            # Add the indexes.
            if not index_exists(cursor, 'idx_ocr_tasks_is_deleted'):
                cursor.execute("""
                    CREATE INDEX idx_ocr_tasks_is_deleted
                    ON aigcspace.ocr_tasks(is_deleted);
                """)
                print("✓ 创建索引 idx_ocr_tasks_is_deleted")
            else:
                print("✓ 索引 idx_ocr_tasks_is_deleted 已存在")

            if not index_exists(cursor, 'idx_ocr_tasks_user_deleted'):
                cursor.execute("""
                    CREATE INDEX idx_ocr_tasks_user_deleted
                    ON aigcspace.ocr_tasks(user_id, is_deleted);
                """)
                print("✓ 创建索引 idx_ocr_tasks_user_deleted")
            else:
                print("✓ 索引 idx_ocr_tasks_user_deleted 已存在")

            # Read the column definition back and print it.
            cursor.execute("""
                SELECT column_name, data_type, column_default, is_nullable
                FROM information_schema.columns
                WHERE table_schema = 'aigcspace'
                AND table_name = 'ocr_tasks'
                AND column_name = 'is_deleted';
            """)
            result = cursor.fetchone()
            if result:
                print("\n字段验证:")
                print(f" 列名: {result[0]}")
                print(f" 类型: {result[1]}")
                print(f" 默认值: {result[2]}")
                print(f" 可空: {result[3]}")

            print("\n✅ OCR任务表软删除字段添加完成!")

        except Exception as e:
            print(f"❌ 迁移失败: {e}")
            raise


def rollback():
    """Undo the migration: drop both indexes, then the ``is_deleted`` column.

    Objects that are not present are skipped. If the table is missing, a
    message is printed and nothing is changed.

    Raises:
        Exception: Any error raised while rolling back is printed and re-raised.
    """
    with _autocommit_cursor() as cursor:
        try:
            if not table_exists(cursor):
                print("✓ 表 aigcspace.ocr_tasks 不存在,无需回滚")
                return

            print("开始回滚 OCR 任务表软删除字段...")

            # Drop the indexes.
            if index_exists(cursor, 'idx_ocr_tasks_user_deleted'):
                cursor.execute("DROP INDEX IF EXISTS aigcspace.idx_ocr_tasks_user_deleted;")
                print("✓ 删除索引 idx_ocr_tasks_user_deleted")

            if index_exists(cursor, 'idx_ocr_tasks_is_deleted'):
                cursor.execute("DROP INDEX IF EXISTS aigcspace.idx_ocr_tasks_is_deleted;")
                print("✓ 删除索引 idx_ocr_tasks_is_deleted")

            # Drop the column.
            if column_exists(cursor, 'is_deleted'):
                cursor.execute("ALTER TABLE aigcspace.ocr_tasks DROP COLUMN is_deleted;")
                print("✓ 删除字段 is_deleted")

            print("\n✅ OCR任务表软删除字段回滚完成!")

        except Exception as e:
            print(f"❌ 回滚失败: {e}")
            raise


def status():
    """Print whether the column and indexes exist, plus row counts.

    Row counts (total, is_deleted = 0, is_deleted = 1) are printed only when
    the ``is_deleted`` column exists. If the table is missing, a message is
    printed and nothing else is reported.

    Raises:
        Exception: Any error raised while reading status is printed and
            re-raised.
    """
    with _autocommit_cursor() as cursor:
        try:
            if not table_exists(cursor):
                print("❌ 表 aigcspace.ocr_tasks 不存在")
                return

            print("OCR任务表软删除字段状态:")

            # Check the column.
            field_exists = column_exists(cursor, 'is_deleted')
            status_icon = '✓' if field_exists else '✗'
            print(f"{status_icon} is_deleted: {'已添加' if field_exists else '未添加'}")

            # Check the indexes.
            idx1_exists = index_exists(cursor, 'idx_ocr_tasks_is_deleted')
            idx2_exists = index_exists(cursor, 'idx_ocr_tasks_user_deleted')
            print(f"{'✓' if idx1_exists else '✗'} idx_ocr_tasks_is_deleted: {'已创建' if idx1_exists else '未创建'}")
            print(f"{'✓' if idx2_exists else '✗'} idx_ocr_tasks_user_deleted: {'已创建' if idx2_exists else '未创建'}")

            # The statistics query references is_deleted, so it can only run
            # once the column exists.
            if field_exists:
                cursor.execute("""
                    SELECT
                        COUNT(*) as total,
                        COUNT(*) FILTER (WHERE is_deleted = 0) as active,
                        COUNT(*) FILTER (WHERE is_deleted = 1) as deleted
                    FROM aigcspace.ocr_tasks;
                """)
                result = cursor.fetchone()
                print("\n数据统计:")
                print(f" 总记录数: {result[0]}")
                print(f" 未删除: {result[1]}")
                print(f" 已删除: {result[2]}")

        except Exception as e:
            print(f"❌ 查看状态失败: {e}")
            raise


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='OCR任务表软删除字段迁移脚本')
    parser.add_argument('--action', choices=['migrate', 'rollback', 'status'],
                        default='migrate', help='操作类型')
    args = parser.parse_args()

    # argparse restricts --action to these three keys, so the lookup is total.
    actions = {
        'migrate': migrate,
        'rollback': rollback,
        'status': status,
    }
    actions[args.action]()