#!/usr/bin/env python3
"""
Database table schema export script.

Features:
- Collects structural information for every table in the database
- Writes each table's information to its own file
- Supports JSON and Markdown output
- Creates the output directory automatically

Usage:
    python export_table_schemas.py
    python export_table_schemas.py --format markdown
    python export_table_schemas.py --output-dir ../custom_output
"""

import os
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional

# Make the project root importable so that `app.database` resolves when the
# script is run directly from its own directory.
sys.path.append(str(Path(__file__).parent.parent))

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import Engine

from app.database import DATABASE_URL


def _escape_md_cell(value: str) -> str:
    """Escape text so it can sit inside a single Markdown table cell.

    A raw ``|`` would start a new cell and a raw newline would end the row,
    so pipes are backslash-escaped and line breaks become ``<br>``.
    """
    return (
        value.replace("|", "\\|")
        .replace("\r\n", "<br>")
        .replace("\n", "<br>")
        .replace("\r", "<br>")
    )


class TableSchemaExporter:
    """Exports table structure information from a database."""

    def __init__(self, database_url: Optional[str] = None):
        """
        Create the engine and the schema inspector.

        Args:
            database_url: Database connection URL; falls back to the
                project's ``DATABASE_URL`` when omitted or empty.
        """
        self.database_url = database_url or DATABASE_URL
        self.engine = create_engine(self.database_url)
        self.inspector = inspect(self.engine)

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so that the name cannot terminate
        the quoted identifier early.
        """
        return '"' + name.replace('"', '""') + '"'

    def get_all_tables(self) -> List[Dict[str, str]]:
        """
        List every table outside ``information_schema`` and ``pg_catalog``.

        Returns:
            A list of dicts with the keys ``schema``, ``table`` and
            ``full_name`` (``schema.table``), ordered by schema then table.
            An empty list is returned when the query fails.
        """
        tables = []
        try:
            with self.engine.connect() as conn:
                result = conn.execute(text("""
                    SELECT
                        schemaname as schema_name,
                        tablename as table_name,
                        schemaname || '.' || tablename as full_name
                    FROM pg_tables
                    WHERE schemaname NOT IN ('information_schema', 'pg_catalog')
                    ORDER BY schemaname, tablename
                """))
                for row in result:
                    tables.append({
                        'schema': row.schema_name,
                        'table': row.table_name,
                        'full_name': row.full_name
                    })
            print(f"✓ 发现 {len(tables)} 个表")
            return tables
        except Exception as e:
            # Best effort: report the failure and let the caller handle "no tables".
            print(f"✗ 获取表列表失败: {e}")
            return []

    def get_table_comment(self, schema: str, table: str) -> Optional[str]:
        """
        Fetch the comment attached to a table.

        Args:
            schema: Schema name.
            table: Table name.

        Returns:
            The first column of the first row returned by the query, or
            ``None`` when there is no row, the value is NULL, or the query
            fails.
        """
        try:
            with self.engine.connect() as conn:
                # The two-argument form names the catalog explicitly; the
                # one-argument obj_description(oid) is deprecated in PostgreSQL.
                result = conn.execute(text("""
                    SELECT obj_description(c.oid, 'pg_class')
                    FROM pg_class c
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE c.relname = :table_name AND n.nspname = :schema_name
                """), {"table_name": table, "schema_name": schema})
                return result.scalar()
        except Exception as e:
            print(f" 警告:无法获取表注释: {e}")
            return None

    def get_table_row_count(self, schema: str, table: str) -> Optional[int]:
        """
        Count the rows of a table with ``SELECT COUNT(*)``.

        Args:
            schema: Schema name.
            table: Table name.

        Returns:
            The row count, or ``None`` when the query fails.
        """
        # Identifiers cannot be bound as parameters, so they are quoted and
        # escaped by hand; the names may come from the --tables CLI option.
        qualified_name = (
            f"{self._quote_identifier(schema)}.{self._quote_identifier(table)}"
        )
        try:
            with self.engine.connect() as conn:
                result = conn.execute(
                    text(f'SELECT COUNT(*) FROM {qualified_name}')
                )
                return result.scalar()
        except Exception as e:
            print(f" 警告:无法获取行数: {e}")
            return None

    def get_table_info(self, schema: str, table: str) -> Dict[str, Any]:
        """
        Collect the full description of a single table.

        Args:
            schema: Schema name.
            table: Table name.

        Returns:
            A dict with the keys ``schema``, ``table_name``, ``full_name``,
            ``comment``, ``row_count``, ``columns``, ``indexes``,
            ``foreign_keys``, ``primary_keys`` and ``extracted_at``. If
            reflection fails part-way, the error is printed and whatever was
            collected so far is returned.
        """
        print(f" 正在提取表 {schema}.{table} ...")

        table_info = {
            'schema': schema,
            'table_name': table,
            'full_name': f"{schema}.{table}",
            'comment': self.get_table_comment(schema, table),
            'row_count': self.get_table_row_count(schema, table),
            'columns': [],
            'indexes': [],
            'foreign_keys': [],
            'primary_keys': [],
            'extracted_at': datetime.now().isoformat()
        }

        try:
            columns = self.inspector.get_columns(table, schema=schema)
            pk_constraint = self.inspector.get_pk_constraint(table, schema=schema)
            fk_constraints = self.inspector.get_foreign_keys(table, schema=schema)

            # Computed once instead of once per column.
            pk_columns = (pk_constraint or {}).get('constrained_columns') or []

            # Map each constrained column to the column it references. The
            # two lists are paired by position so that composite foreign keys
            # do not all point at the first referred column.
            fk_map = {}
            for fk in fk_constraints:
                constrained = fk['constrained_columns']
                referred = fk['referred_columns'] or []
                for position, col in enumerate(constrained):
                    fk_map[col] = {
                        'referred_table': fk['referred_table'],
                        'referred_column': (
                            referred[position] if position < len(referred) else None
                        ),
                        'constraint_name': fk['name']
                    }

            # Columns
            for col in columns:
                column_info = {
                    'name': col['name'],
                    'type': str(col['type']),
                    'nullable': col['nullable'],
                    'default': str(col['default']) if col['default'] is not None else None,
                    'primary_key': col['name'] in pk_columns,
                    'foreign_key': fk_map.get(col['name']),
                    'comment': col.get('comment')
                }
                table_info['columns'].append(column_info)

            # Primary key
            if pk_columns:
                table_info['primary_keys'] = pk_columns

            # Indexes
            indexes = self.inspector.get_indexes(table, schema=schema)
            for idx in indexes:
                index_info = {
                    'name': idx['name'],
                    'columns': idx['column_names'],
                    'unique': idx['unique'],
                    # Falls back to 'btree' when the reflected index has no 'type' key.
                    'type': idx.get('type', 'btree')
                }
                table_info['indexes'].append(index_info)

            # Foreign key constraints
            for fk in fk_constraints:
                fk_info = {
                    'name': fk['name'],
                    'constrained_columns': fk['constrained_columns'],
                    'referred_schema': fk.get('referred_schema'),
                    'referred_table': fk['referred_table'],
                    'referred_columns': fk['referred_columns'],
                    'on_delete': fk.get('options', {}).get('ondelete'),
                    'on_update': fk.get('options', {}).get('onupdate')
                }
                table_info['foreign_keys'].append(fk_info)

            print(f" ✓ {len(table_info['columns'])} 个字段, "
                  f"{len(table_info['indexes'])} 个索引, "
                  f"{len(table_info['foreign_keys'])} 个外键")

        except Exception as e:
            # Deliberately best effort: a partially filled description is
            # still returned to the caller.
            print(f" ✗ 提取失败: {e}")

        return table_info

    def format_as_json(self, table_info: Dict[str, Any]) -> str:
        """Serialize *table_info* as indented JSON without ASCII-escaping."""
        return json.dumps(table_info, ensure_ascii=False, indent=2)

    def format_as_markdown(self, table_info: Dict[str, Any]) -> str:
        """
        Render *table_info* as a Markdown document.

        Args:
            table_info: A dict in the shape produced by ``get_table_info``.

        Returns:
            Markdown text with a header, a column table and, when present,
            index and foreign-key tables.
        """
        lines = [
            f"# {table_info['full_name']}",
            ""
        ]

        # General information
        if table_info['comment']:
            lines.extend([
                f"**表说明**: {table_info['comment']}",
                ""
            ])

        lines.extend([
            f"**模式**: `{table_info['schema']}`",
            f"**表名**: `{table_info['table_name']}`",
        ])

        if table_info['row_count'] is not None:
            lines.append(f"**数据行数**: {table_info['row_count']:,}")

        lines.extend([
            f"**提取时间**: {table_info['extracted_at']}",
            ""
        ])

        # Primary key
        if table_info['primary_keys']:
            pk_list = ", ".join([f"`{pk}`" for pk in table_info['primary_keys']])
            lines.extend([
                f"**主键**: {pk_list}",
                ""
            ])

        # Columns
        lines.extend([
            "## 字段信息",
            "",
            "| 字段名 | 类型 | 可空 | 默认值 | 主键 | 外键 | 说明 |",
            "|--------|------|------|--------|------|------|------|"
        ])

        for col in table_info['columns']:
            pk_mark = "✓" if col['primary_key'] else ""
            nullable = "✓" if col['nullable'] else ""
            # Defaults and comments are free text; escape them so a '|' or a
            # newline cannot break the table row.
            default_val = _escape_md_cell(col['default'] or "")
            comment = _escape_md_cell(col['comment'] or "")

            # Foreign key target, if any
            fk_mark = ""
            if col['foreign_key']:
                fk = col['foreign_key']
                fk_mark = f"{fk['referred_table']}.{fk['referred_column']}"

            lines.append(
                f"| `{col['name']}` | {col['type']} | {nullable} | {default_val} | {pk_mark} | {fk_mark} | {comment} |"
            )

        lines.append("")

        # Indexes
        if table_info['indexes']:
            lines.extend([
                "## 索引信息",
                "",
                "| 索引名 | 字段 | 唯一 | 类型 |",
                "|--------|------|------|------|"
            ])

            for idx in table_info['indexes']:
                unique_mark = "✓" if idx['unique'] else ""
                columns_str = ", ".join([f"`{col}`" for col in idx['columns']])
                lines.append(
                    f"| `{idx['name']}` | {columns_str} | {unique_mark} | {idx['type']} |"
                )

            lines.append("")

        # Foreign key constraints
        if table_info['foreign_keys']:
            lines.extend([
                "## 外键约束",
                "",
                "| 约束名 | 本表字段 | 引用表 | 引用字段 | 删除规则 | 更新规则 |",
                "|--------|----------|--------|----------|----------|----------|"
            ])

            for fk in table_info['foreign_keys']:
                constrained_cols = ", ".join([f"`{col}`" for col in fk['constrained_columns']])
                referred_cols = ", ".join([f"`{col}`" for col in fk['referred_columns']])
                referred_table = fk['referred_table']
                if fk.get('referred_schema'):
                    referred_table = f"{fk['referred_schema']}.{referred_table}"

                on_delete = fk['on_delete'] or "-"
                on_update = fk['on_update'] or "-"

                lines.append(
                    f"| `{fk['name']}` | {constrained_cols} | `{referred_table}` | {referred_cols} | {on_delete} | {on_update} |"
                )

            lines.append("")

        return "\n".join(lines)

    def export_table(self, schema: str, table: str, output_dir: Path,
                     format: str = 'json') -> bool:
        """
        Export one table's structure to a file in *output_dir*.

        The file is named ``<schema>.<table>.json`` or ``<schema>.<table>.md``.

        Args:
            schema: Schema name.
            table: Table name.
            output_dir: Directory to write into; it must already exist.
            format: ``'json'`` for JSON; any other value produces Markdown.

        Returns:
            ``True`` when the file was written, ``False`` when writing failed
            (the error is printed rather than raised).
        """
        table_info = self.get_table_info(schema, table)

        if format == 'json':
            content = self.format_as_json(table_info)
            extension = 'json'
        else:  # markdown
            content = self.format_as_markdown(table_info)
            extension = 'md'

        # File name follows the schema.table convention.
        filename = f"{schema}.{table}.{extension}"
        filepath = output_dir / filename

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
        except Exception as e:
            print(f" ✗ 保存失败: {e}")
            return False

        print(f" ✓ 已保存到: {filepath.name}")
        return True

    def export_all_tables(self, output_dir: Path, format: str = 'json'):
        """
        Export every table returned by ``get_all_tables`` and write an index.

        Args:
            output_dir: Output directory; created if missing.
            format: ``'json'`` for JSON; any other value produces Markdown.
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        print(f"✓ 输出目录: {output_dir}")
        print(f"✓ 输出格式: {format.upper()}")
        print()

        tables = self.get_all_tables()

        if not tables:
            print("✗ 未找到任何表")
            return

        print(f"开始导出 {len(tables)} 个表...\n")

        success_count = 0
        for table_info in tables:
            try:
                saved = self.export_table(
                    table_info['schema'],
                    table_info['table'],
                    output_dir,
                    format
                )
                # Only count tables whose file was actually written.
                if saved:
                    success_count += 1
            except Exception as e:
                print(f" ✗ 导出 {table_info['full_name']} 失败: {e}")
            print()

        print("="*60)
        print(f"✓ 导出完成!成功: {success_count}/{len(tables)}")
        print(f"✓ 文件保存在: {output_dir.absolute()}")

        self.generate_index_file(tables, output_dir, format)

    def generate_index_file(self, tables: List[Dict[str, str]], output_dir: Path, format: str):
        """
        Write an index of the exported tables.

        Produces ``index.json`` for the JSON format and ``README.md``
        (tables grouped by schema) otherwise.

        Args:
            tables: Table descriptors with ``schema``, ``table`` and
                ``full_name`` keys.
            output_dir: Directory to write the index into.
            format: ``'json'`` for JSON; any other value produces Markdown.
        """
        print("\n生成索引文件...")

        if format == 'json':
            index_data = {
                'generated_at': datetime.now().isoformat(),
                'total_tables': len(tables),
                'tables': [
                    {
                        'schema': t['schema'],
                        'table': t['table'],
                        'full_name': t['full_name'],
                        'file': f"{t['full_name']}.json"
                    }
                    for t in tables
                ]
            }

            index_file = output_dir / 'index.json'
            with open(index_file, 'w', encoding='utf-8') as f:
                # json.dump streams to the file object; json.dumps takes no
                # file argument and raises TypeError if given one.
                json.dump(index_data, f, ensure_ascii=False, indent=2)

        else:  # markdown
            lines = [
                "# 数据库表结构索引",
                "",
                f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                f"**表总数**: {len(tables)}",
                "",
                "## 表列表",
                ""
            ]

            # Group tables by schema
            schemas: Dict[str, List[Dict[str, str]]] = {}
            for t in tables:
                schemas.setdefault(t['schema'], []).append(t)

            for schema, schema_tables in sorted(schemas.items()):
                lines.extend([
                    f"### {schema}",
                    ""
                ])

                for t in sorted(schema_tables, key=lambda x: x['table']):
                    filename = f"{t['full_name']}.md"
                    lines.append(f"- [{t['table']}](./{filename})")

                lines.append("")

            index_file = output_dir / 'README.md'
            with open(index_file, 'w', encoding='utf-8') as f:
                f.write("\n".join(lines))

        print(f"✓ 索引文件已生成: {index_file.name}")


def main():
    """Parse the command line and run the export; exit with status 1 on failure."""
    parser = argparse.ArgumentParser(
        description="导出数据库表结构信息到单独的文件",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 导出为JSON格式(默认)
  python export_table_schemas.py

  # 导出为Markdown格式
  python export_table_schemas.py --format markdown

  # 指定输出目录
  python export_table_schemas.py --output-dir ../custom_output

  # 只导出特定的表
  python export_table_schemas.py --tables aigcspace.models aigcspace.users
        """
    )

    parser.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="json",
        help="输出格式 (默认: json)"
    )

    parser.add_argument(
        "--output-dir",
        type=str,
        default="database_info",
        help="输出目录路径 (默认: database_info)"
    )

    parser.add_argument(
        "--tables",
        nargs="*",
        help="指定要导出的表名(格式: schema.table),默认导出所有表"
    )

    parser.add_argument(
        "--database-url",
        type=str,
        help="数据库连接URL (默认: 使用项目配置)"
    )

    args = parser.parse_args()

    try:
        print("="*60)
        print("数据库表结构导出工具")
        print("="*60)
        print()

        print("正在连接数据库...")
        exporter = TableSchemaExporter(args.database_url)
        print("✓ 数据库连接成功")
        print()

        # The output directory is resolved relative to the parent of the
        # directory containing this script, then made absolute.
        script_dir = Path(__file__).parent
        backend_dir = script_dir.parent
        output_dir = backend_dir / args.output_dir
        output_dir = output_dir.resolve()

        if args.tables:
            # Export only the tables named on the command line.
            print(f"导出指定的 {len(args.tables)} 个表...")
            output_dir.mkdir(parents=True, exist_ok=True)

            for table_full_name in args.tables:
                if '.' in table_full_name:
                    schema, table = table_full_name.split('.', 1)
                else:
                    # A bare table name is looked up in the 'public' schema.
                    schema = 'public'
                    table = table_full_name

                try:
                    exporter.export_table(schema, table, output_dir, args.format)
                except Exception as e:
                    print(f"✗ 导出 {table_full_name} 失败: {e}")
        else:
            # Export every table.
            exporter.export_all_tables(output_dir, args.format)

    except Exception as e:
        print(f"\n❌ 执行失败: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()