| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- """
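- Database migration script: extends the `projects` table for the external API feature.
- Adds the following columns:
- - status: project status (draft, configuring, ready, in_progress, completed)
- - source: project origin (internal, external)
- - task_type: task type (text_classification, image_classification, object_detection, ner)
- - updated_at: last-update timestamp
- - external_id: the project's ID in the external system
- Usage: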
- cd backend
- python scripts/migrate_external_api.py
- """
- import sys
- import os
- # Put the parent of this script's directory on sys.path so the `database` module below can be imported.
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from database import get_db_connection, db_config
- import logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
def check_column_exists(cursor, table: str, column: str) -> bool:
    """Report whether ``column`` is already defined on ``table``.

    The lookup strategy follows ``db_config.db_type``:

    * ``'mysql'`` -- count matching rows in ``information_schema.columns``
      for the schema named by ``db_config.database``.
    * anything else -- run ``PRAGMA table_info`` and scan the returned rows
      for a matching ``name``.

    Args:
        cursor: An open cursor whose rows can be indexed by column name
            (``row['cnt']`` / ``row['name']``).
        table: Table to inspect. In the non-MySQL branch it is interpolated
            directly into the PRAGMA statement, so pass trusted identifiers
            only.
        column: Column name to look for.

    Returns:
        True when the column is present, False otherwise.
    """
    if db_config.db_type != 'mysql':
        cursor.execute(f"PRAGMA table_info({table})")
        for column_info in cursor.fetchall():
            if column_info['name'] == column:
                return True
        return False

    lookup_sql = (
        "\n"
        "SELECT COUNT(*) as cnt FROM information_schema.columns\n"
        "WHERE table_schema = %s AND table_name = %s AND column_name = %s\n"
    )
    cursor.execute(lookup_sql, (db_config.database, table, column))
    count_row = cursor.fetchone()
    if not count_row:
        # No row came back at all: treat the column as absent.
        return False
    return count_row['cnt'] > 0
# Columns this migration adds to `projects` on MySQL, in order:
# (column name, column definition, index name or None when no index is wanted).
_MYSQL_PROJECT_COLUMNS = (
    ("status", "VARCHAR(20) DEFAULT 'draft'", "idx_projects_status"),
    ("source", "VARCHAR(20) DEFAULT 'internal'", "idx_projects_source"),
    ("task_type", "VARCHAR(50)", None),
    (
        "updated_at",
        "TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP",
        None,
    ),
    ("external_id", "VARCHAR(100)", "idx_projects_external_id"),
)


def _mysql_index_exists(cursor, table: str, index: str) -> bool:
    """Return True if an index named ``index`` exists on ``table`` in the configured schema."""
    cursor.execute(
        """
        SELECT COUNT(*) AS cnt FROM information_schema.statistics
        WHERE table_schema = %s AND table_name = %s AND index_name = %s
        """,
        (db_config.database, table, index),
    )
    row = cursor.fetchone()
    # Rows are accessed by column name, same as in check_column_exists.
    return row['cnt'] > 0 if row else False


def migrate_mysql():
    """Apply the external-API schema changes to ``projects`` on MySQL.

    For every entry in ``_MYSQL_PROJECT_COLUMNS`` the column is added when
    ``check_column_exists`` reports it missing, and skipped otherwise.

    Index creation is checked separately from column creation. MySQL DDL
    statements are committed one by one, so a run that added a column but
    stopped before ``CREATE INDEX`` would otherwise never get its index:
    every later run would see the column and skip the whole step. Checking
    the index on its own makes reruns repair that state.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        for column, definition, index in _MYSQL_PROJECT_COLUMNS:
            if check_column_exists(cursor, 'projects', column):
                logger.info(f"○ {column} 字段已存在,跳过")
            else:
                logger.info(f"添加 projects.{column} 字段...")
                cursor.execute(
                    f"ALTER TABLE projects ADD COLUMN {column} {definition}"
                )
                logger.info(f"✓ {column} 字段添加成功")

            # MySQL's CREATE INDEX has no IF NOT EXISTS form, so look the
            # index up explicitly before creating it.
            if index and not _mysql_index_exists(cursor, 'projects', index):
                cursor.execute(f"CREATE INDEX {index} ON projects({column})")
                logger.info(f"✓ 索引 {index} 创建成功")

        logger.info("MySQL 迁移完成!")
# Columns this migration adds to `projects` on SQLite, in order:
# (column name, column definition, index name or None when no index is wanted).
# NOTE(review): updated_at carries no DEFAULT here, unlike the MySQL variant --
# presumably because SQLite's ADD COLUMN rejects a CURRENT_TIMESTAMP default; confirm.
_SQLITE_PROJECT_COLUMNS = (
    ("status", "TEXT DEFAULT 'draft'", "idx_projects_status"),
    ("source", "TEXT DEFAULT 'internal'", "idx_projects_source"),
    ("task_type", "TEXT", None),
    ("updated_at", "TIMESTAMP", None),
    ("external_id", "TEXT", "idx_projects_external_id"),
)


def migrate_sqlite():
    """Apply the external-API schema changes to ``projects`` on SQLite.

    For every entry in ``_SQLITE_PROJECT_COLUMNS`` the column is added when
    ``check_column_exists`` reports it missing, and skipped otherwise.

    ``CREATE INDEX IF NOT EXISTS`` is issued for indexed columns on every
    run, not only when the column was just added. The statement does nothing
    when the index is already there, and running it unconditionally lets a
    rerun restore an index that is missing although its column exists (for
    example after an earlier run stopped between the two statements).
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        for column, definition, index in _SQLITE_PROJECT_COLUMNS:
            if check_column_exists(cursor, 'projects', column):
                logger.info(f"○ {column} 字段已存在,跳过")
            else:
                logger.info(f"添加 projects.{column} 字段...")
                cursor.execute(
                    f"ALTER TABLE projects ADD COLUMN {column} {definition}"
                )
                logger.info(f"✓ {column} 字段添加成功")

            if index:
                cursor.execute(
                    f"CREATE INDEX IF NOT EXISTS {index} ON projects({column})"
                )

        logger.info("SQLite 迁移完成!")
def main():
    """Run the migration matching ``db_config.db_type`` and log a summary.

    ``'mysql'`` selects ``migrate_mysql``; every other value falls back to
    ``migrate_sqlite``. After the migration returns, a separator line and
    the list of added columns are written to the log.
    """
    logger.info(f"开始数据库迁移,数据库类型: {db_config.db_type}")

    run_migration = migrate_mysql if db_config.db_type == 'mysql' else migrate_sqlite
    run_migration()

    summary_lines = (
        "=" * 50,
        "迁移完成!新增字段:",
        " - status: 项目状态 (draft/configuring/ready/in_progress/completed)",
        " - source: 项目来源 (internal/external)",
        " - task_type: 任务类型",
        " - updated_at: 更新时间",
        " - external_id: 外部系统项目ID",
    )
    for summary_line in summary_lines:
        logger.info(summary_line)


# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()
|