فهرست منبع

feat: 添加 PostgreSQL 容器到 docker-compose,弃用远程 PG,新增数据备份和迁移脚本

- docker-compose.yml: 新增 db 服务 (postgres:16-alpine),web 添加 depends_on
- docker-compose.dev.yml: 简化,db 服务提升到主 compose
- .env: 注释旧远程 PG 配置,改为连接本地 db 容器
- app/config.py: 移除 SQLite 回退逻辑
- entrypoint.sh: 添加 migrate_link_to_text.py 执行
- backup/maas_collect_init.sql: 远程 PG 完整备份 (结构+数据)
- migrate_sqlite_to_pg.py: SQLite → PG 数据迁移脚本
- dump_pg_to_sql.py: PG → SQL 文件导出工具
kinglee 6 روز پیش
والد
کامیت
6bddb23064
8 فایل تغییر یافته به همراه 1304 افزوده شده و 39 حذف شده
  1. 12 5
      .env
  2. 7 7
      app/config.py
  3. 905 0
      backup/maas_collect_init.sql
  4. 0 27
      docker-compose.dev.yml
  5. 23 0
      docker-compose.yml
  6. 167 0
      dump_pg_to_sql.py
  7. 1 0
      entrypoint.sh
  8. 189 0
      migrate_sqlite_to_pg.py

+ 12 - 5
.env

@@ -1,9 +1,16 @@
-# PostgreSQL 数据库配置
-DB_HOST=47.109.147.74
+# PostgreSQL 数据库配置(本地容器)
+DB_HOST=db
 DB_PORT=5432
-DB_USER=maas_collect
-DB_PASSWORD=jLDMdRjYZyWLJSfr
-DB_NAME=maas_collect
+DB_USER=liaowang
+DB_PASSWORD=liaowang_secret
+DB_NAME=liaowang_db
+
+# 旧远程 PG 配置(已弃用)
+# DB_HOST=47.109.147.74
+# DB_PORT=5432
+# DB_USER=maas_collect
+# DB_PASSWORD=jLDMdRjYZyWLJSfr
+# DB_NAME=maas_collect
 
 # 自动拼接的 SQLAlchemy URI(一般无需修改)
 DATABASE_URL=postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}

+ 7 - 7
app/config.py

@@ -5,8 +5,6 @@ from dotenv import load_dotenv
 env_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env')
 load_dotenv(env_path, interpolate=True)
 
-basedir = os.path.abspath(os.path.dirname(__file__))
-
 def _build_database_uri():
     """从 .env 中的独立配置项拼接 DATABASE_URL"""
     user = os.environ.get('DB_USER', '')
@@ -14,15 +12,17 @@ def _build_database_uri():
     host = os.environ.get('DB_HOST', '')
     port = os.environ.get('DB_PORT', '5432')
     name = os.environ.get('DB_NAME', '')
-    if all([user, password, host, name]):
-        return f'postgresql://{user}:{password}@{host}:{port}/{name}'
-    return os.environ.get('DATABASE_URL', '')
+    if not all([user, password, host, name]):
+        raise RuntimeError(
+            "缺少 PostgreSQL 数据库配置,请在 .env 中设置 "
+            "DB_USER, DB_PASSWORD, DB_HOST, DB_NAME"
+        )
+    return f'postgresql://{user}:{password}@{host}:{port}/{name}'
 
 class Config:
     APP_NAME = os.environ.get('APP_NAME', '路桥采集平台')
     SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
-    SQLALCHEMY_DATABASE_URI = _build_database_uri() or \
-        'sqlite:///' + os.path.join(basedir, 'app.db')
+    SQLALCHEMY_DATABASE_URI = _build_database_uri()
     SQLALCHEMY_TRACK_MODIFICATIONS = False
 
     # JWT 配置(用于本地 Token 签发)

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 905 - 0
backup/maas_collect_init.sql


+ 0 - 27
docker-compose.dev.yml

@@ -4,37 +4,10 @@ services:
   web:
     environment:
       - FLASK_DEBUG=true
-      - DB_HOST=db
-      - DB_USER=${DB_USER:-liaowang}
-      - DB_PASSWORD=${DB_PASSWORD:-liaowang_secret}
-      - DB_NAME=${DB_NAME:-liaowang_db}
     volumes:
       # 开发模式:挂载代码实现热重载
       - .:/app
       - /app/venv
-    depends_on:
-      db:
-        condition: service_healthy
     # 开发模式不需要健康检查
     healthcheck:
       disable: true
-
-  db:
-    image: postgres:16-alpine
-    container_name: liaowang-db
-    environment:
-      POSTGRES_DB: ${DB_NAME:-liaowang_db}
-      POSTGRES_USER: ${DB_USER:-liaowang}
-      POSTGRES_PASSWORD: ${DB_PASSWORD:-liaowang_secret}
-    ports:
-      - "5432:5432"
-    volumes:
-      - postgres_data:/var/lib/postgresql/data
-    healthcheck:
-      test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-liaowang}"]
-      interval: 10s
-      timeout: 5s
-      retries: 5
-
-volumes:
-  postgres_data:

+ 23 - 0
docker-compose.yml

@@ -1,6 +1,23 @@
 version: '3.8'
 
 services:
+  db:
+    image: postgres:16-alpine
+    container_name: liaowang-db
+    environment:
+      POSTGRES_DB: ${DB_NAME:-liaowang_db}
+      POSTGRES_USER: ${DB_USER:-liaowang}
+      POSTGRES_PASSWORD: ${DB_PASSWORD:-liaowang_secret}
+    ports:
+      - "5432:5432"
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-liaowang}"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
   web:
     build: .
     container_name: liaowang-web
@@ -27,6 +44,9 @@ services:
     env_file:
       - .env
     restart: unless-stopped
+    depends_on:
+      db:
+        condition: service_healthy
     # 健康检查
     healthcheck:
       test: ["CMD", "curl", "-f", "http://localhost:5000"]
@@ -34,3 +54,6 @@ services:
       timeout: 10s
       retries: 3
       start_period: 15s
+
+volumes:
+  postgres_data:

+ 167 - 0
dump_pg_to_sql.py

@@ -0,0 +1,167 @@
+"""
+Export the structure and data of a PostgreSQL database as a SQL file.
+
+Connection settings (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME) are
+read from the environment / .env file.
+
+Usage: python dump_pg_to_sql.py
+
+When run as a script the dump is always written to
+backup/maas_collect_init.sql next to this file; redirecting stdout is not
+needed and does not change where the dump goes.
+"""
+import os
+import sys
+import io
+import psycopg2
+from dotenv import load_dotenv
+
+# Make sure stdout emits UTF-8. Codec names are case-insensitive and may be
+# spelled "UTF-8", "utf8" or "utf_8", so normalise before comparing.
+_stdout_codec = (sys.stdout.encoding or "").lower().replace("-", "").replace("_", "")
+if _stdout_codec != "utf8":
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
+
+load_dotenv()
+
+DB_USER = os.environ.get("DB_USER", "")
+DB_PASSWORD = os.environ.get("DB_PASSWORD", "")
+DB_HOST = os.environ.get("DB_HOST", "")
+DB_PORT = os.environ.get("DB_PORT", "5432")
+DB_NAME = os.environ.get("DB_NAME", "")
+
+# Abort early when any mandatory connection setting is missing.
+if not all([DB_USER, DB_PASSWORD, DB_HOST, DB_NAME]):
+    print("ERROR: 缺少 PG 配置", file=sys.stderr)
+    sys.exit(1)
+
+PG_URI = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+
+# Tables that are left out of the dump.
+SKIP_TABLES = {"alembic_version"}
+
+def sql_escape_string(val):
+    """Render *val* as a single-quoted SQL string literal.
+
+    ``None`` becomes the bare keyword ``NULL``; any other value is converted
+    with ``str()`` and every single quote inside it is doubled.
+    """
+    if val is not None:
+        escaped = str(val).replace("'", "''")
+        return f"'{escaped}'"
+    return "NULL"
+
+# Integer column types mapped to the matching auto-increment pseudo-type.
+_SERIAL_TYPES = {"smallint": "SMALLSERIAL", "integer": "SERIAL", "bigint": "BIGSERIAL"}
+
+# Column types whose values are written into INSERT statements unquoted.
+_NUMERIC_TYPES = frozenset(
+    ("integer", "bigint", "smallint", "numeric", "double precision", "real")
+)
+
+
+def _quote_ident(name):
+    """Return *name* as a double-quoted PostgreSQL identifier."""
+    return '"' + str(name).replace('"', '""') + '"'
+
+
+def _column_definition(col_name, data_type, char_max_len, is_nullable, column_default):
+    """Build one column line of CREATE TABLE; return ``(line, is_serial)``."""
+    not_null = is_nullable == "NO"
+    sequence_default = bool(column_default) and column_default.startswith("nextval(")
+    # SERIAL implies NOT NULL, so it is only used for NOT NULL integer columns.
+    is_serial = sequence_default and not_null and data_type in _SERIAL_TYPES
+
+    if is_serial:
+        type_str = _SERIAL_TYPES[data_type]
+    elif data_type == "character varying" and char_max_len:
+        type_str = f"VARCHAR({char_max_len})"
+    elif data_type == "character":
+        type_str = f"CHAR({char_max_len or 1})"
+    elif data_type == "timestamp without time zone":
+        type_str = "TIMESTAMP"
+    elif data_type == "double precision":
+        type_str = "DOUBLE PRECISION"
+    else:
+        type_str = data_type
+
+    line = f"    {_quote_ident(col_name)} {type_str}"
+    # A nextval() default points at a sequence the restored database will not
+    # have (unless SERIAL recreates it), so only other defaults are copied.
+    if column_default is not None and not sequence_default:
+        line += f" DEFAULT {column_default}"
+    line += " NOT NULL" if not_null else " NULL"
+    return line, is_serial
+
+
+def _sql_literal(val, col_type):
+    """Render one fetched value as a SQL literal for a column of *col_type*."""
+    import json
+
+    if val is None:
+        return "NULL"
+    if col_type == "boolean":
+        return "TRUE" if val else "FALSE"
+    if col_type in _NUMERIC_TYPES:
+        return str(val)
+    if col_type in ("json", "jsonb") and not isinstance(val, str):
+        # psycopg2 decodes json/jsonb into Python objects; str() would emit a
+        # Python repr, which is not valid JSON.
+        return sql_escape_string(json.dumps(val, ensure_ascii=False))
+    if col_type == "bytea":
+        # Binary data is written in PostgreSQL's hex input format.
+        return "'\\x" + bytes(val).hex() + "'"
+    return sql_escape_string(val)
+
+
+def _dump_table(cur, table):
+    """Print DROP/CREATE TABLE, the INSERTs and sequence resets for *table*."""
+    print(f"-- Table: {table}")
+
+    safe_table = _quote_ident(table)
+    # Schema-qualified name used for the ::regclass lookups below.
+    regclass_name = f"public.{safe_table}"
+
+    # Column metadata, restricted to the public schema so a same-named table
+    # in another schema cannot contribute duplicate columns.
+    cur.execute(
+        """
+        SELECT column_name, data_type, character_maximum_length,
+               is_nullable, column_default
+        FROM information_schema.columns
+        WHERE table_schema = 'public' AND table_name = %s
+        ORDER BY ordinal_position
+        """,
+        (table,),
+    )
+    columns = cur.fetchall()
+
+    # Primary-key columns.
+    cur.execute(
+        """
+        SELECT a.attname
+        FROM pg_index i
+        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
+        WHERE i.indrelid = %s::regclass AND i.indisprimary
+        """,
+        (regclass_name,),
+    )
+    pk_cols = [r[0] for r in cur.fetchall()]
+
+    # Unique constraints with their columns in constraint order.
+    cur.execute(
+        """
+        SELECT conname,
+               array_agg(a.attname ORDER BY array_position(conkey, a.attnum))
+        FROM pg_constraint c
+        JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = ANY(c.conkey)
+        WHERE c.contype = 'u' AND c.conrelid = %s::regclass
+        GROUP BY conname, c.conrelid
+        """,
+        (regclass_name,),
+    )
+    unique_constraints = cur.fetchall()
+
+    # CREATE TABLE
+    col_defs = []
+    serial_cols = []
+    for col_name, data_type, char_max_len, is_nullable, column_default in columns:
+        line, is_serial = _column_definition(
+            col_name, data_type, char_max_len, is_nullable, column_default
+        )
+        col_defs.append(line)
+        if is_serial:
+            serial_cols.append(col_name)
+
+    if pk_cols:
+        col_defs.append(f"    PRIMARY KEY ({', '.join(map(_quote_ident, pk_cols))})")
+
+    for uc_name, uc_cols in unique_constraints:
+        col_defs.append(
+            f"    CONSTRAINT {_quote_ident(uc_name)} "
+            f"UNIQUE ({', '.join(map(_quote_ident, uc_cols))})"
+        )
+
+    print(f"DROP TABLE IF EXISTS {safe_table} CASCADE;")
+    print(f"CREATE TABLE {safe_table} (")
+    print(",\n".join(col_defs))
+    print(");")
+    print()
+
+    # INSERT statements
+    col_names = [c[0] for c in columns]
+    col_types = [c[1] for c in columns]
+    if col_names:
+        cols_str = ", ".join(map(_quote_ident, col_names))
+        order_col = pk_cols[0] if pk_cols else col_names[0]
+        # Select the columns explicitly so values line up with col_types.
+        cur.execute(
+            f"SELECT {cols_str} FROM {safe_table} ORDER BY {_quote_ident(order_col)}"
+        )
+        rows = cur.fetchall()
+        for row in rows:
+            vals_str = ", ".join(
+                _sql_literal(val, col_type) for val, col_type in zip(row, col_types)
+            )
+            print(f"INSERT INTO {safe_table} ({cols_str}) VALUES ({vals_str});")
+        if rows:
+            print()
+
+    # Sequence resets are written into the dump so they run on restore; the
+    # source database is never modified.
+    table_literal = sql_escape_string(safe_table)
+    for col in serial_cols:
+        qcol = _quote_ident(col)
+        print(
+            f"SELECT setval(pg_get_serial_sequence({table_literal}, "
+            f"{sql_escape_string(col)}), COALESCE(MAX({qcol}), 1), "
+            f"MAX({qcol}) IS NOT NULL) FROM {safe_table};"
+        )
+
+    print()
+
+
+def dump():
+    """Write a SQL dump of every table in the ``public`` schema to stdout.
+
+    Connects with ``PG_URI``, skips the tables named in ``SKIP_TABLES`` and
+    prints, per table, a DROP/CREATE TABLE pair, one INSERT per row and a
+    ``setval`` statement for each serial column. A completion note goes to
+    stderr.
+
+    Raises:
+        psycopg2.Error: if connecting or any catalog/data query fails.
+    """
+    conn = psycopg2.connect(PG_URI)
+    try:
+        # The dump only reads; a read-only session guarantees it cannot
+        # change the source database.
+        conn.set_session(readonly=True)
+        with conn.cursor() as cur:
+            # All user tables of the public schema.
+            cur.execute("""
+                SELECT tablename FROM pg_tables
+                WHERE schemaname = 'public'
+                ORDER BY tablename
+            """)
+            tables = [r[0] for r in cur.fetchall() if r[0] not in SKIP_TABLES]
+
+            print("-- ============================================")
+            print(f"-- Dump of {DB_NAME} from {DB_HOST}")
+            print("-- ============================================")
+            print()
+
+            for table in tables:
+                _dump_table(cur, table)
+    finally:
+        # Release the connection even when a query fails part-way through.
+        conn.close()
+    print("-- Dump complete", file=sys.stderr)
+
+if __name__ == "__main__":
+    from contextlib import redirect_stdout
+
+    # Resolve the target relative to this script, not the working directory.
+    base_dir = os.path.dirname(os.path.abspath(__file__))
+    output_file = os.path.join(base_dir, "backup", "maas_collect_init.sql")
+    os.makedirs(os.path.dirname(output_file), exist_ok=True)
+
+    # Write to a temporary file first so a failed dump never truncates an
+    # existing good backup; redirect_stdout restores sys.stdout on any exit.
+    tmp_file = output_file + ".tmp"
+    try:
+        with open(tmp_file, "w", encoding="utf-8") as f, redirect_stdout(f):
+            dump()
+        os.replace(tmp_file, output_file)
+    finally:
+        if os.path.exists(tmp_file):
+            os.remove(tmp_file)
+    print("Dump complete: backup/maas_collect_init.sql", file=sys.stderr)

+ 1 - 0
entrypoint.sh

@@ -3,6 +3,7 @@ set -e
 
 echo "Running database migration..."
 uv run python migrate_db.py || echo "Warning: migration failed, continuing anyway"
+uv run python migrate_link_to_text.py || echo "Warning: link migration failed, continuing anyway"
 
 echo "Starting Flask application..."
 exec uv run python run.py

+ 189 - 0
migrate_sqlite_to_pg.py

@@ -0,0 +1,189 @@
+"""
+SQLite → PostgreSQL 数据迁移脚本
+
+用法:
+    python migrate_sqlite_to_pg.py [--dry-run]
+
+--dry-run  只打印计划,不执行写入
+"""
+import os
+import sys
+import sqlite3
+import argparse
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# -- PostgreSQL connection --
+DB_USER = os.environ.get("DB_USER", "")
+DB_PASSWORD = os.environ.get("DB_PASSWORD", "")
+DB_HOST = os.environ.get("DB_HOST", "")
+DB_PORT = os.environ.get("DB_PORT", "5432")
+DB_NAME = os.environ.get("DB_NAME", "")
+
+# Fatal configuration errors go to stderr so they are not mixed into the
+# progress output on stdout.
+if not all([DB_USER, DB_PASSWORD, DB_HOST, DB_NAME]):
+    print("ERROR: 缺少 PG 配置 (DB_USER/DB_PASSWORD/DB_HOST/DB_NAME)", file=sys.stderr)
+    sys.exit(1)
+
+PG_URI = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+
+# -- SQLite path --
+SQLITE_PATH = os.path.join(os.path.dirname(__file__), "app", "app.db")
+if not os.path.exists(SQLITE_PATH):
+    print(f"ERROR: 找不到 SQLite 数据库: {SQLITE_PATH}", file=sys.stderr)
+    sys.exit(1)
+
+# -- Boolean columns (SQLite stores 0/1; converted to PG boolean) --
+BOOL_COLUMNS = {"has_pagination", "has_deep_collection", "is_active"}
+
+# -- Migration order (follows foreign-key dependencies) --
+# (sqlite table, pg table, columns to skip, columns needing extra handling)
+MIGRATION_PLAN = [
+    ("user", '"user"', set(), {}),
+    ("spider_source", "spider_source", set(), {}),
+    ("collection_task", "collection_task", set(), {}),
+    # spider_task is a legacy table that is no longer in models.py; skipped
+    ("spider_result", "spider_result", set(), {}),
+    ("deep_collection", "deep_collection", set(), {}),
+    ("ai_model", "ai_model", set(), {}),
+    ("token_usage_log", "token_usage_log", set(), {}),
+    ("ai_conversation", "ai_conversation", set(), {}),
+    ("ai_message", "ai_message", set(), {}),
+    ("knowledge_import_task", "knowledge_import_task", set(), {}),
+]
+
+
+def get_sqlite_tables():
+    """Return the set of table names present in the SQLite database.
+
+    Opens ``SQLITE_PATH``, reads ``sqlite_master`` and always closes the
+    connection, even when the query raises.
+    """
+    conn = sqlite3.connect(SQLITE_PATH)
+    try:
+        cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
+        return {r[0] for r in cursor.fetchall()}
+    finally:
+        conn.close()
+
+
+def get_sqlite_data(table):
+    """Read every row of a SQLite table.
+
+    Args:
+        table: unquoted name of a table in the database at ``SQLITE_PATH``.
+
+    Returns:
+        ``(columns, rows)``: the column names in table order and the list of
+        row tuples.
+    """
+    # Quote the name so reserved words and unusual characters are accepted.
+    quoted = '"' + table.replace('"', '""') + '"'
+    conn = sqlite3.connect(SQLITE_PATH)
+    try:
+        cursor = conn.execute(f"SELECT * FROM {quoted}")
+        columns = [desc[0] for desc in cursor.description]
+        rows = cursor.fetchall()
+    finally:
+        # Close the connection even when the query fails.
+        conn.close()
+    return columns, rows
+
+
+def quote_identifier(name):
+    """Return *name* as a double-quoted SQL identifier.
+
+    A name that is already wrapped in double quotes is returned unchanged.
+    Otherwise it is wrapped, with any embedded double quote doubled so the
+    result stays a single valid identifier.
+    """
+    # len >= 2 keeps a lone '"' from being mistaken for a quoted identifier.
+    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
+        return name
+    return '"' + name.replace('"', '""') + '"'
+
+
+def migrate(dry_run=False):
+    """Copy the tables listed in ``MIGRATION_PLAN`` from SQLite to PostgreSQL.
+
+    For each planned table that exists in SQLite and has rows, the target
+    table is truncated, every row is inserted (0/1 values in ``BOOL_COLUMNS``
+    become booleans) and the ``id`` sequence is reset. Everything runs in one
+    transaction that is committed at the end; each risky step runs inside a
+    savepoint so a failure undoes only that step, not the work done before it.
+
+    Args:
+        dry_run: when true, only print the plan and row counts; nothing is
+            written to PostgreSQL.
+
+    Raises:
+        psycopg2.Error: if the connection or the final commit fails.
+    """
+    import psycopg2
+
+    print(f"SQLite: {SQLITE_PATH}")
+    print(f"PG:     {PG_URI.replace(DB_PASSWORD, '***')}")
+    print()
+
+    pg = psycopg2.connect(PG_URI)
+    pg.autocommit = False
+    cur = pg.cursor()
+
+    def run_in_savepoint(*statements):
+        """Execute ``(sql, params)`` pairs; on error undo only them and re-raise."""
+        cur.execute("SAVEPOINT migrate_step")
+        try:
+            for statement, params in statements:
+                cur.execute(statement, params)
+        except psycopg2.Error:
+            cur.execute("ROLLBACK TO SAVEPOINT migrate_step")
+            cur.execute("RELEASE SAVEPOINT migrate_step")
+            raise
+        cur.execute("RELEASE SAVEPOINT migrate_step")
+
+    try:
+        sqlite_tables = get_sqlite_tables()
+        total_inserted = 0
+        total_skipped = 0
+
+        for sqlite_table, pg_table, skip_cols, _ in MIGRATION_PLAN:
+            if sqlite_table not in sqlite_tables:
+                print(f"  [skip] SQLite 中不存在表 '{sqlite_table}'")
+                continue
+
+            columns, rows = get_sqlite_data(sqlite_table)
+            insert_cols = [c for c in columns if c not in skip_cols]
+            col_indices = [i for i, c in enumerate(columns) if c not in skip_cols]
+
+            if not rows:
+                print(f"  [empty] {sqlite_table} -> {pg_table} (0 行)")
+                continue
+
+            print(f"  [{sqlite_table}] -> [{pg_table}]: {len(rows)} 行")
+
+            if dry_run:
+                total_inserted += len(rows)
+                continue
+
+            # Empty the target table with triggers disabled.
+            quoted = quote_identifier(pg_table)
+            try:
+                run_in_savepoint(
+                    (f"ALTER TABLE {quoted} DISABLE TRIGGER ALL", None),
+                    (f"TRUNCATE TABLE {quoted} RESTART IDENTITY CASCADE", None),
+                    (f"ALTER TABLE {quoted} ENABLE TRIGGER ALL", None),
+                )
+            except psycopg2.Error as e:
+                print(f"    WARN: TRUNCATE {pg_table} 失败: {e}")
+                continue
+
+            # The INSERT statement is the same for every row of the table.
+            placeholders = ", ".join(["%s"] * len(insert_cols))
+            col_names = ", ".join(quote_identifier(c) for c in insert_cols)
+            sql = f"INSERT INTO {quoted} ({col_names}) VALUES ({placeholders})"
+
+            inserted = 0
+            skipped = 0
+            for row in rows:
+                values = [
+                    bool(row[idx])
+                    if row[idx] in (0, 1) and columns[idx] in BOOL_COLUMNS
+                    else row[idx]
+                    for idx in col_indices
+                ]
+                try:
+                    run_in_savepoint((sql, values))
+                    inserted += 1
+                    continue
+                except psycopg2.Error as e:
+                    error = e
+
+                if "value too long" in str(error) and sqlite_table == "deep_collection":
+                    # Over-long url: widen the column to TEXT and retry the row.
+                    try:
+                        run_in_savepoint(
+                            ("ALTER TABLE deep_collection ALTER COLUMN url TYPE TEXT", None),
+                            (sql, values),
+                        )
+                        inserted += 1
+                        print(f"    NOTE: deep_collection.url 已自动改为 TEXT 类型")
+                    except psycopg2.Error as e2:
+                        skipped += 1
+                        print(f"    WARN: 插入 {pg_table} 失败: {e2}")
+                else:
+                    skipped += 1
+                    print(f"    WARN: 插入 {pg_table} 失败 (row {inserted + skipped}): {error}")
+
+            # Reset the id sequence; a table without one is reported, not fatal.
+            if inserted > 0:
+                seq_sql = f"SELECT setval(pg_get_serial_sequence('{pg_table}', 'id'), (SELECT COALESCE(MAX(id), 1) FROM {quoted}))"
+                try:
+                    run_in_savepoint((seq_sql, None))
+                except psycopg2.Error as e:
+                    print(f"    WARN: 重置 {pg_table} 序列失败: {e}")
+
+            total_inserted += inserted
+            total_skipped += skipped
+            status = "OK" if skipped == 0 else "PARTIAL"
+            print(f"    [{status}] 插入 {inserted} 行, 跳过 {skipped} 行")
+
+        if not dry_run:
+            pg.commit()
+            print(f"\n迁移完成! 共插入 {total_inserted} 行, 跳过 {total_skipped} 行")
+        else:
+            print(f"\n[DRY RUN] 将插入 ~{total_inserted} 行")
+    finally:
+        # Closing without a commit discards the open transaction, so an
+        # unexpected error leaves PostgreSQL unchanged.
+        cur.close()
+        pg.close()
+
+
+if __name__ == "__main__":
+    # Command-line entry point: parse the single --dry-run switch and run.
+    cli = argparse.ArgumentParser(description="SQLite → PostgreSQL 数据迁移")
+    cli.add_argument("--dry-run", action="store_true", help="只打印计划,不执行")
+    migrate(dry_run=cli.parse_args().dry_run)

برخی فایل ها در این مقایسه diff نمایش داده نمی شوند زیرا تعداد فایل ها بسیار زیاد است