export_table_schemas.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. #!/usr/bin/env python3
  2. """
  3. 数据库表结构导出脚本
  4. 功能:
  5. - 获取数据库中所有表的结构信息
  6. - 每个表的信息保存到单独的文件中
  7. - 支持JSON和Markdown格式
  8. - 自动创建输出目录
  9. 使用方法:
  10. python export_table_schemas.py
  11. python export_table_schemas.py --format markdown
  12. python export_table_schemas.py --output-dir ../custom_output
  13. """
  14. import os
  15. import sys
  16. import json
  17. import argparse
  18. from pathlib import Path
  19. from datetime import datetime
  20. from typing import Dict, List, Any, Optional
  21. # 添加项目根目录到Python路径
  22. sys.path.append(str(Path(__file__).parent.parent))
  23. from sqlalchemy import create_engine, inspect, text
  24. from sqlalchemy.engine import Engine
  25. from app.database import DATABASE_URL
  26. class TableSchemaExporter:
  27. """数据库表结构导出器"""
  28. def __init__(self, database_url: str = None):
  29. """
  30. 初始化数据库连接
  31. Args:
  32. database_url: 数据库连接URL,默认使用项目配置
  33. """
  34. self.database_url = database_url or DATABASE_URL
  35. self.engine = create_engine(self.database_url)
  36. self.inspector = inspect(self.engine)
  37. def get_all_tables(self) -> List[Dict[str, str]]:
  38. """
  39. 获取所有表名(包含schema信息)
  40. Returns:
  41. 表信息列表,每项包含 schema 和 table_name
  42. """
  43. tables = []
  44. try:
  45. with self.engine.connect() as conn:
  46. result = conn.execute(text("""
  47. SELECT
  48. schemaname as schema_name,
  49. tablename as table_name,
  50. schemaname || '.' || tablename as full_name
  51. FROM pg_tables
  52. WHERE schemaname NOT IN ('information_schema', 'pg_catalog')
  53. ORDER BY schemaname, tablename
  54. """))
  55. for row in result:
  56. tables.append({
  57. 'schema': row.schema_name,
  58. 'table': row.table_name,
  59. 'full_name': row.full_name
  60. })
  61. print(f"✓ 发现 {len(tables)} 个表")
  62. return tables
  63. except Exception as e:
  64. print(f"✗ 获取表列表失败: {e}")
  65. return []
  66. def get_table_comment(self, schema: str, table: str) -> Optional[str]:
  67. """获取表注释"""
  68. try:
  69. with self.engine.connect() as conn:
  70. result = conn.execute(text("""
  71. SELECT obj_description(c.oid)
  72. FROM pg_class c
  73. JOIN pg_namespace n ON n.oid = c.relnamespace
  74. WHERE c.relname = :table_name AND n.nspname = :schema_name
  75. """), {"table_name": table, "schema_name": schema})
  76. return result.scalar()
  77. except Exception as e:
  78. print(f" 警告:无法获取表注释: {e}")
  79. return None
  80. def get_table_row_count(self, schema: str, table: str) -> Optional[int]:
  81. """获取表的行数"""
  82. try:
  83. with self.engine.connect() as conn:
  84. result = conn.execute(
  85. text(f'SELECT COUNT(*) FROM "{schema}"."{table}"')
  86. )
  87. return result.scalar()
  88. except Exception as e:
  89. print(f" 警告:无法获取行数: {e}")
  90. return None
  91. def get_table_info(self, schema: str, table: str) -> Dict[str, Any]:
  92. """
  93. 获取单个表的完整信息
  94. Args:
  95. schema: 模式名
  96. table: 表名
  97. Returns:
  98. 表的详细信息字典
  99. """
  100. print(f" 正在提取表 {schema}.{table} ...")
  101. table_info = {
  102. 'schema': schema,
  103. 'table_name': table,
  104. 'full_name': f"{schema}.{table}",
  105. 'comment': self.get_table_comment(schema, table),
  106. 'row_count': self.get_table_row_count(schema, table),
  107. 'columns': [],
  108. 'indexes': [],
  109. 'foreign_keys': [],
  110. 'primary_keys': [],
  111. 'extracted_at': datetime.now().isoformat()
  112. }
  113. try:
  114. # 获取列信息
  115. columns = self.inspector.get_columns(table, schema=schema)
  116. pk_constraint = self.inspector.get_pk_constraint(table, schema=schema)
  117. fk_constraints = self.inspector.get_foreign_keys(table, schema=schema)
  118. # 构建外键映射
  119. fk_map = {}
  120. for fk in fk_constraints:
  121. for col in fk['constrained_columns']:
  122. fk_map[col] = {
  123. 'referred_table': fk['referred_table'],
  124. 'referred_column': fk['referred_columns'][0] if fk['referred_columns'] else None,
  125. 'constraint_name': fk['name']
  126. }
  127. # 处理列信息
  128. for col in columns:
  129. column_info = {
  130. 'name': col['name'],
  131. 'type': str(col['type']),
  132. 'nullable': col['nullable'],
  133. 'default': str(col['default']) if col['default'] is not None else None,
  134. 'primary_key': col['name'] in (pk_constraint.get('constrained_columns', []) or []),
  135. 'foreign_key': fk_map.get(col['name']),
  136. 'comment': col.get('comment')
  137. }
  138. table_info['columns'].append(column_info)
  139. # 主键信息
  140. if pk_constraint and pk_constraint.get('constrained_columns'):
  141. table_info['primary_keys'] = pk_constraint['constrained_columns']
  142. # 获取索引信息
  143. indexes = self.inspector.get_indexes(table, schema=schema)
  144. for idx in indexes:
  145. index_info = {
  146. 'name': idx['name'],
  147. 'columns': idx['column_names'],
  148. 'unique': idx['unique'],
  149. 'type': idx.get('type', 'btree')
  150. }
  151. table_info['indexes'].append(index_info)
  152. # 外键信息
  153. for fk in fk_constraints:
  154. fk_info = {
  155. 'name': fk['name'],
  156. 'constrained_columns': fk['constrained_columns'],
  157. 'referred_schema': fk.get('referred_schema'),
  158. 'referred_table': fk['referred_table'],
  159. 'referred_columns': fk['referred_columns'],
  160. 'on_delete': fk.get('options', {}).get('ondelete'),
  161. 'on_update': fk.get('options', {}).get('onupdate')
  162. }
  163. table_info['foreign_keys'].append(fk_info)
  164. print(f" ✓ {len(table_info['columns'])} 个字段, "
  165. f"{len(table_info['indexes'])} 个索引, "
  166. f"{len(table_info['foreign_keys'])} 个外键")
  167. except Exception as e:
  168. print(f" ✗ 提取失败: {e}")
  169. return table_info
  170. def format_as_json(self, table_info: Dict[str, Any]) -> str:
  171. """格式化为JSON"""
  172. return json.dumps(table_info, ensure_ascii=False, indent=2)
  173. def format_as_markdown(self, table_info: Dict[str, Any]) -> str:
  174. """格式化为Markdown"""
  175. lines = [
  176. f"# {table_info['full_name']}",
  177. ""
  178. ]
  179. # 基本信息
  180. if table_info['comment']:
  181. lines.extend([
  182. f"**表说明**: {table_info['comment']}",
  183. ""
  184. ])
  185. lines.extend([
  186. f"**模式**: `{table_info['schema']}`",
  187. f"**表名**: `{table_info['table_name']}`",
  188. ])
  189. if table_info['row_count'] is not None:
  190. lines.append(f"**数据行数**: {table_info['row_count']:,}")
  191. lines.extend([
  192. f"**提取时间**: {table_info['extracted_at']}",
  193. ""
  194. ])
  195. # 主键信息
  196. if table_info['primary_keys']:
  197. pk_list = ", ".join([f"`{pk}`" for pk in table_info['primary_keys']])
  198. lines.extend([
  199. f"**主键**: {pk_list}",
  200. ""
  201. ])
  202. # 字段信息
  203. lines.extend([
  204. "## 字段信息",
  205. "",
  206. "| 字段名 | 类型 | 可空 | 默认值 | 主键 | 外键 | 说明 |",
  207. "|--------|------|------|--------|------|------|------|"
  208. ])
  209. for col in table_info['columns']:
  210. pk_mark = "✓" if col['primary_key'] else ""
  211. nullable = "✓" if col['nullable'] else ""
  212. default_val = col['default'] or ""
  213. comment = col['comment'] or ""
  214. # 外键信息
  215. fk_mark = ""
  216. if col['foreign_key']:
  217. fk = col['foreign_key']
  218. fk_mark = f"{fk['referred_table']}.{fk['referred_column']}"
  219. lines.append(
  220. f"| `{col['name']}` | {col['type']} | {nullable} | {default_val} | {pk_mark} | {fk_mark} | {comment} |"
  221. )
  222. lines.append("")
  223. # 索引信息
  224. if table_info['indexes']:
  225. lines.extend([
  226. "## 索引信息",
  227. "",
  228. "| 索引名 | 字段 | 唯一 | 类型 |",
  229. "|--------|------|------|------|"
  230. ])
  231. for idx in table_info['indexes']:
  232. unique_mark = "✓" if idx['unique'] else ""
  233. columns_str = ", ".join([f"`{col}`" for col in idx['columns']])
  234. lines.append(
  235. f"| `{idx['name']}` | {columns_str} | {unique_mark} | {idx['type']} |"
  236. )
  237. lines.append("")
  238. # 外键约束
  239. if table_info['foreign_keys']:
  240. lines.extend([
  241. "## 外键约束",
  242. "",
  243. "| 约束名 | 本表字段 | 引用表 | 引用字段 | 删除规则 | 更新规则 |",
  244. "|--------|----------|--------|----------|----------|----------|"
  245. ])
  246. for fk in table_info['foreign_keys']:
  247. constrained_cols = ", ".join([f"`{col}`" for col in fk['constrained_columns']])
  248. referred_cols = ", ".join([f"`{col}`" for col in fk['referred_columns']])
  249. referred_table = fk['referred_table']
  250. if fk.get('referred_schema'):
  251. referred_table = f"{fk['referred_schema']}.{referred_table}"
  252. on_delete = fk['on_delete'] or "-"
  253. on_update = fk['on_update'] or "-"
  254. lines.append(
  255. f"| `{fk['name']}` | {constrained_cols} | `{referred_table}` | {referred_cols} | {on_delete} | {on_update} |"
  256. )
  257. lines.append("")
  258. return "\n".join(lines)
  259. def export_table(self, schema: str, table: str, output_dir: Path, format: str = 'json'):
  260. """
  261. 导出单个表的结构信息到文件
  262. Args:
  263. schema: 模式名
  264. table: 表名
  265. output_dir: 输出目录
  266. format: 输出格式 (json 或 markdown)
  267. """
  268. # 获取表信息
  269. table_info = self.get_table_info(schema, table)
  270. # 格式化内容
  271. if format == 'json':
  272. content = self.format_as_json(table_info)
  273. extension = 'json'
  274. else: # markdown
  275. content = self.format_as_markdown(table_info)
  276. extension = 'md'
  277. # 生成文件名(使用 schema.table 格式)
  278. filename = f"{schema}.{table}.{extension}"
  279. filepath = output_dir / filename
  280. # 写入文件
  281. try:
  282. with open(filepath, 'w', encoding='utf-8') as f:
  283. f.write(content)
  284. print(f" ✓ 已保存到: {filepath.name}")
  285. except Exception as e:
  286. print(f" ✗ 保存失败: {e}")
  287. def export_all_tables(self, output_dir: Path, format: str = 'json'):
  288. """
  289. 导出所有表的结构信息
  290. Args:
  291. output_dir: 输出目录
  292. format: 输出格式 (json 或 markdown)
  293. """
  294. # 创建输出目录
  295. output_dir.mkdir(parents=True, exist_ok=True)
  296. print(f"✓ 输出目录: {output_dir}")
  297. print(f"✓ 输出格式: {format.upper()}")
  298. print()
  299. # 获取所有表
  300. tables = self.get_all_tables()
  301. if not tables:
  302. print("✗ 未找到任何表")
  303. return
  304. # 导出每个表
  305. print(f"开始导出 {len(tables)} 个表...\n")
  306. success_count = 0
  307. for table_info in tables:
  308. try:
  309. self.export_table(
  310. table_info['schema'],
  311. table_info['table'],
  312. output_dir,
  313. format
  314. )
  315. success_count += 1
  316. except Exception as e:
  317. print(f" ✗ 导出 {table_info['full_name']} 失败: {e}")
  318. print()
  319. print("="*60)
  320. print(f"✓ 导出完成!成功: {success_count}/{len(tables)}")
  321. print(f"✓ 文件保存在: {output_dir.absolute()}")
  322. # 生成索引文件
  323. self.generate_index_file(tables, output_dir, format)
  324. def generate_index_file(self, tables: List[Dict[str, str]], output_dir: Path, format: str):
  325. """
  326. 生成索引文件
  327. Args:
  328. tables: 表信息列表
  329. output_dir: 输出目录
  330. format: 输出格式
  331. """
  332. print("\n生成索引文件...")
  333. if format == 'json':
  334. # JSON索引
  335. index_data = {
  336. 'generated_at': datetime.now().isoformat(),
  337. 'total_tables': len(tables),
  338. 'tables': [
  339. {
  340. 'schema': t['schema'],
  341. 'table': t['table'],
  342. 'full_name': t['full_name'],
  343. 'file': f"{t['full_name']}.json"
  344. }
  345. for t in tables
  346. ]
  347. }
  348. index_file = output_dir / 'index.json'
  349. with open(index_file, 'w', encoding='utf-8') as f:
  350. json.dumps(index_data, f, ensure_ascii=False, indent=2)
  351. f.write(json.dumps(index_data, ensure_ascii=False, indent=2))
  352. else: # markdown
  353. # Markdown索引
  354. lines = [
  355. "# 数据库表结构索引",
  356. "",
  357. f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
  358. f"**表总数**: {len(tables)}",
  359. "",
  360. "## 表列表",
  361. ""
  362. ]
  363. # 按schema分组
  364. schemas = {}
  365. for t in tables:
  366. schema = t['schema']
  367. if schema not in schemas:
  368. schemas[schema] = []
  369. schemas[schema].append(t)
  370. for schema, schema_tables in sorted(schemas.items()):
  371. lines.extend([
  372. f"### {schema}",
  373. ""
  374. ])
  375. for t in sorted(schema_tables, key=lambda x: x['table']):
  376. filename = f"{t['full_name']}.md"
  377. lines.append(f"- [{t['table']}](./{filename})")
  378. lines.append("")
  379. index_file = output_dir / 'README.md'
  380. with open(index_file, 'w', encoding='utf-8') as f:
  381. f.write("\n".join(lines))
  382. print(f"✓ 索引文件已生成: {index_file.name}")
  383. def main():
  384. """主函数"""
  385. parser = argparse.ArgumentParser(
  386. description="导出数据库表结构信息到单独的文件",
  387. formatter_class=argparse.RawDescriptionHelpFormatter,
  388. epilog="""
  389. 示例:
  390. # 导出为JSON格式(默认)
  391. python export_table_schemas.py
  392. # 导出为Markdown格式
  393. python export_table_schemas.py --format markdown
  394. # 指定输出目录
  395. python export_table_schemas.py --output-dir ../custom_output
  396. # 只导出特定的表
  397. python export_table_schemas.py --tables aigcspace.models aigcspace.users
  398. """
  399. )
  400. parser.add_argument(
  401. "--format",
  402. choices=["json", "markdown"],
  403. default="json",
  404. help="输出格式 (默认: json)"
  405. )
  406. parser.add_argument(
  407. "--output-dir",
  408. type=str,
  409. default="database_info",
  410. help="输出目录路径 (默认: database_info)"
  411. )
  412. parser.add_argument(
  413. "--tables",
  414. nargs="*",
  415. help="指定要导出的表名(格式: schema.table),默认导出所有表"
  416. )
  417. parser.add_argument(
  418. "--database-url",
  419. type=str,
  420. help="数据库连接URL (默认: 使用项目配置)"
  421. )
  422. args = parser.parse_args()
  423. try:
  424. print("="*60)
  425. print("数据库表结构导出工具")
  426. print("="*60)
  427. print()
  428. # 初始化导出器
  429. print("正在连接数据库...")
  430. exporter = TableSchemaExporter(args.database_url)
  431. print("✓ 数据库连接成功")
  432. print()
  433. # 确定输出目录(相对于backend目录)
  434. script_dir = Path(__file__).parent
  435. backend_dir = script_dir.parent
  436. output_dir = backend_dir / args.output_dir
  437. output_dir = output_dir.resolve() # 转换为绝对路径
  438. # 导出表
  439. if args.tables:
  440. # 导出指定的表
  441. print(f"导出指定的 {len(args.tables)} 个表...")
  442. output_dir.mkdir(parents=True, exist_ok=True)
  443. for table_full_name in args.tables:
  444. if '.' in table_full_name:
  445. schema, table = table_full_name.split('.', 1)
  446. else:
  447. schema = 'public'
  448. table = table_full_name
  449. try:
  450. exporter.export_table(schema, table, output_dir, args.format)
  451. except Exception as e:
  452. print(f"✗ 导出 {table_full_name} 失败: {e}")
  453. else:
  454. # 导出所有表
  455. exporter.export_all_tables(output_dir, args.format)
  456. except Exception as e:
  457. print(f"\n❌ 执行失败: {e}")
  458. import traceback
  459. traceback.print_exc()
  460. sys.exit(1)
  461. if __name__ == "__main__":
  462. main()