"""
Database migration script: extend the ``projects`` table for the external API feature.

Adds the following columns:
- status: project status (draft, configuring, ready, in_progress, completed)
- source: project origin (internal, external)
- task_type: task type (text_classification, image_classification, object_detection, ner)
- updated_at: last-update time
- external_id: the project's ID in the external system

Usage:
    cd backend
    python scripts/migrate_external_api.py
"""

import logging
import os
import sys
from contextlib import closing
from typing import NamedTuple

# Add the parent directory to the import path so the ``database`` module resolves
# when this script is run from the scripts/ directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from database import get_db_connection, db_config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The only table this migration touches.
_TABLE = "projects"


class _ColumnSpec(NamedTuple):
    """One column to add to ``projects``, with its type clause for each backend."""

    name: str
    mysql_definition: str
    sqlite_definition: str
    # When True, an index named idx_projects_<name> is created with the column.
    indexed: bool = False


# Columns are added in this order on both backends.
_COLUMNS = (
    _ColumnSpec("status", "VARCHAR(20) DEFAULT 'draft'", "TEXT DEFAULT 'draft'", indexed=True),
    _ColumnSpec("source", "VARCHAR(20) DEFAULT 'internal'", "TEXT DEFAULT 'internal'", indexed=True),
    _ColumnSpec("task_type", "VARCHAR(50)", "TEXT"),
    # NOTE(review): the SQLite variant carries no default, presumably because
    # SQLite's ALTER TABLE ADD COLUMN rejects non-constant defaults such as
    # CURRENT_TIMESTAMP; rows will hold NULL until the application writes a value.
    _ColumnSpec(
        "updated_at",
        "TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP",
        "TIMESTAMP",
    ),
    _ColumnSpec("external_id", "VARCHAR(100)", "TEXT", indexed=True),
)


def check_column_exists(cursor, table: str, column: str) -> bool:
    """Return True if ``column`` already exists on ``table``.

    On MySQL the lookup goes through ``information_schema.columns`` scoped to
    ``db_config.database``; on any other backend it uses SQLite's
    ``PRAGMA table_info``.

    Args:
        cursor: An open cursor. Rows are read by key (``row['cnt']``,
            ``col['name']``), so the cursor must yield mapping-like rows.
        table: Table name. On the SQLite path it is interpolated directly into
            the statement, so only trusted identifiers may be passed.
        column: Column name to look for.
    """
    if db_config.db_type == 'mysql':
        cursor.execute(
            """
            SELECT COUNT(*) as cnt
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s AND column_name = %s
            """,
            (db_config.database, table, column),
        )
        row = cursor.fetchone()
        return row['cnt'] > 0 if row else False

    cursor.execute(f"PRAGMA table_info({table})")
    return any(col['name'] == column for col in cursor.fetchall())


def _add_missing_columns(definitions, index_sql_template: str) -> None:
    """Add every column in ``definitions`` that ``projects`` does not have yet.

    Args:
        definitions: Iterable of ``(name, type_clause, indexed)`` tuples.
        index_sql_template: ``str.format`` template with a ``{column}``
            placeholder, used to build the CREATE INDEX statement for columns
            flagged as indexed.

    NOTE(review): an index is only created in the same pass that adds its
    column. If the ALTER succeeds but the CREATE INDEX fails, a rerun sees the
    column, skips it, and never retries the index.
    """
    with get_db_connection() as conn:
        # closing() releases the cursor even if a statement raises.
        with closing(conn.cursor()) as cursor:
            for name, definition, indexed in definitions:
                if check_column_exists(cursor, _TABLE, name):
                    logger.info("○ %s 字段已存在,跳过", name)
                    continue

                logger.info("添加 projects.%s 字段...", name)
                cursor.execute(f"ALTER TABLE {_TABLE} ADD COLUMN {name} {definition}")
                if indexed:
                    cursor.execute(index_sql_template.format(column=name))
                logger.info("✓ %s 字段添加成功", name)


def migrate_mysql():
    """Run the migration against a MySQL database."""
    _add_missing_columns(
        ((col.name, col.mysql_definition, col.indexed) for col in _COLUMNS),
        "CREATE INDEX idx_projects_{column} ON projects({column})",
    )
    logger.info("MySQL 迁移完成!")


def migrate_sqlite():
    """Run the migration against a SQLite database."""
    _add_missing_columns(
        ((col.name, col.sqlite_definition, col.indexed) for col in _COLUMNS),
        "CREATE INDEX IF NOT EXISTS idx_projects_{column} ON projects({column})",
    )
    logger.info("SQLite 迁移完成!")


def main():
    """Run the migration for the configured backend and log a summary.

    ``db_config.db_type == 'mysql'`` selects the MySQL path; any other value
    falls through to the SQLite path.
    """
    logger.info("开始数据库迁移,数据库类型: %s", db_config.db_type)

    if db_config.db_type == 'mysql':
        migrate_mysql()
    else:
        migrate_sqlite()

    logger.info("=" * 50)
    logger.info("迁移完成!新增字段:")
    logger.info(" - status: 项目状态 (draft/configuring/ready/in_progress/completed)")
    logger.info(" - source: 项目来源 (internal/external)")
    logger.info(" - task_type: 任务类型")
    logger.info(" - updated_at: 更新时间")
    logger.info(" - external_id: 外部系统项目ID")


if __name__ == "__main__":
    main()