
feat:修改入库逻辑

ZengChao · 2 weeks ago · commit ddd6c6d120

+ 1 - 1
.env

@@ -13,7 +13,7 @@ MILVUS_DB=lq_db
 MILVUS_USER=
 MILVUS_PASSWORD=
 
-DATABASE_URL=mysql+aiomysql://root:lq123@192.168.92.61:13306/lq_oauth_db
+DATABASE_URL=mysql+aiomysql://root:Lq123456!@192.168.92.61:13306/lq_db
 # OpenAI Embedding向量模型配置
 EMBEDDING_BASE_URL=http://192.168.91.253:9003/v1
 EMBEDDING_MODEL=Qwen3-Embedding-8B
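
For context, the new DATABASE_URL is an async SQLAlchemy URL (mysql+aiomysql). The project's actual app.config.setting / app.config.database wiring is not shown in this diff, so the snippet below is only a minimal sketch of how such a .env is typically consumed (the dotenv call and engine options are assumptions, not project code):

# sketch: load .env values and build the async engine (assumed wiring)
import os
from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import create_async_engine

load_dotenv()  # reads DATABASE_URL, EMBEDDING_BASE_URL, EMBEDDING_MODEL from .env
engine = create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)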

+ 3 - 3
src/app/scripts/base_check.py

@@ -1,10 +1,10 @@
 # ===================== 配置区(只改这里) =====================
-ROOT_DIR = r"G:\临时文件\3个"              # 根目录
-EXCEL_PATH = r"C:\Users\ZengChao\Desktop\id.xlsx"    # Excel 路径
+ROOT_DIR = r"G:\需要汇总的编制依据"              # 根目录
+EXCEL_PATH = r"C:\Users\ZengChao\Desktop\编制依据.xlsx"    # Excel 路径
 SHEET_NAME = None                       # None 表示使用第一个 sheet
 ID_COLUMN = "id"                        # Excel 的 id 列名
 CN_NAME_COLUMN = "中文名"               # Excel 的中文名列名
-OUTPUT_JSON = r"G:\临时文件\3个\issues.json"    # 输出 JSON 路径
+OUTPUT_JSON = r"C:\Users\ZengChao\Desktop\编制依据issues.json"    # 输出 JSON 路径
 
 IGNORE_HIDDEN = True                    # 是否忽略隐藏/临时文件
 # ============================================================

+ 3 - 3
src/app/scripts/base_count.py

@@ -20,11 +20,11 @@ import pandas as pd
 # =========================
 # 变量都放这里,方便改
 # =========================
-ROOT_DIR = r"F:\编制依据修复2"          # 根目录:下面都是编号子目录
-EXCEL_PATH = r"C:\Users\ZengChao\Desktop\id.xlsx"   # Excel 路径
+ROOT_DIR = r"G:\最终编制依据"          # 根目录:下面都是编号子目录
+EXCEL_PATH = r"C:\Users\ZengChao\Desktop\编制依据.xlsx"   # Excel 路径
 SHEET_NAME = 0                       # sheet 名称或索引(0 表示第一个)
 ID_COL = "id"                        # id 列名
-OUT_JSON = r"F:\编制依据修复2/missing_ids.json"     # 输出 json 路径
+OUT_JSON = r"C:\Users\ZengChao\Desktop/编制依据missing_ids.json"     # 输出 json 路径
 
 # 输出格式是否同时包含统计信息
 OUTPUT_WITH_META = True

+ 50 - 13
src/app/scripts/base_in_collection.py

@@ -18,6 +18,9 @@ from app.config.setting import settings
 # 根目录配置
 ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\新建文件夹 (2)"
 
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\base_collection_failed_report.json"
+
 # Collection 名称
 PARENT_COLLECTION_NAME = "test_27_parent"
 CHILD_COLLECTION_NAME = "test_27_child"
@@ -55,7 +58,7 @@ def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str) -> D
 
 
 def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows: List[Dict[str, Any]], 
-                       doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> int:
+                       doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> tuple[int, str | None]:
     """
     插入 parent 数据到 Milvus。
     
@@ -71,7 +74,7 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
         插入的行数
     """
     if not parent_rows:
-        return 0
+        return 0, None
     
     # 获取 embeddings 客户端
     embeddings = get_embeddings()
@@ -105,14 +108,14 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
     
     try:
         client.insert(collection_name=collection_name, data=entities)
-        return len(entities)
+        return len(entities), None
     except Exception as e:
         print(f"📁 {folder_name} ❌ 插入 parent 失败: {e}")
-        return 0
+        return 0, str(e)
 
 
 def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: List[Dict[str, Any]], 
-                      doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> int:
+                      doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> tuple[int, str | None]:
     """
     插入 children 数据到 Milvus。
     
@@ -128,7 +131,7 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
         插入的行数
     """
     if not child_rows:
-        return 0
+        return 0, None
     
     # 获取 embeddings 客户端
     embeddings = get_embeddings()
@@ -162,13 +165,13 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
     
     try:
         client.insert(collection_name=collection_name, data=entities)
-        return len(entities)
+        return len(entities), None
     except Exception as e:
         print(f"📁 {folder_name} ❌ 插入 children 失败: {e}")
-        return 0
+        return 0, str(e)
 
 
-def process_folder(root_folder: str | Path) -> Dict[str, int]:
+def process_folder(root_folder: str | Path) -> Dict[str, Any]:
     """
     处理文件夹结构,导入 Milvus。
     
@@ -184,7 +187,7 @@ def process_folder(root_folder: str | Path) -> Dict[str, int]:
     
     client = get_milvusclient()
     
-    stats = {"success": 0, "failed": 0, "skipped": 0, "parent_rows": 0, "child_rows": 0}
+    stats = {"success": 0, "failed": 0, "skipped": 0, "parent_rows": 0, "child_rows": 0, "failed_items": []}
     
     for subfolder in sorted(root_folder.iterdir()):
         if not subfolder.is_dir():
@@ -212,14 +215,38 @@ def process_folder(root_folder: str | Path) -> Dict[str, int]:
             
             if not doc_data or not doc_data.get("id"):
                 print(f"📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "JSON格式错误或缺少doc/id",
+                    "reason": "JSON格式错误或缺少doc/id",
+                })
                 stats["failed"] += 1
                 continue
             
             doc_id = doc_data.get("id")
             
             # 插入 parent 和 children
-            parent_count = insert_parent_rows(client, PARENT_COLLECTION_NAME, parent_rows, doc_data, doc_id, folder_name)
-            child_count = insert_child_rows(client, CHILD_COLLECTION_NAME, child_rows, doc_data, doc_id, folder_name)
+            parent_count, parent_error = insert_parent_rows(
+                client, PARENT_COLLECTION_NAME, parent_rows, doc_data, doc_id, folder_name
+            )
+            child_count, child_error = insert_child_rows(
+                client, CHILD_COLLECTION_NAME, child_rows, doc_data, doc_id, folder_name
+            )
+
+            if parent_error:
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "parent 入库失败",
+                    "reason": parent_error,
+                })
+                stats["failed"] += 1
+            if child_error:
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "child 入库失败",
+                    "reason": child_error,
+                })
+                stats["failed"] += 1
             
             if parent_count > 0 or child_count > 0:
                 print(f"📁 {folder_name} ✅ parent: {parent_count}, child: {child_count}")
@@ -231,7 +258,13 @@ def process_folder(root_folder: str | Path) -> Dict[str, int]:
                 stats["skipped"] += 1
                 
         except Exception as e:
-            print(f"📁 {folder_name} ❌ {str(e)}")
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
             stats["failed"] += 1
     
     return stats
@@ -248,10 +281,14 @@ def main():
         print("-" * 60)
         
         stats = process_folder(ROOT_FOLDER)
+
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
         
         print("\n" + "=" * 60)
         print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
         print(f"📊 Parent 行数: {stats['parent_rows']} | Child 行数: {stats['child_rows']}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
         print("=" * 60)
         
     except Exception as e:
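
The core of this change: insert_parent_rows / insert_child_rows now return a (row_count, error) pair instead of a bare count, every failure is appended to stats["failed_items"], and the list is dumped to FAILED_REPORT_PATH. A self-contained sketch of that pattern, with a placeholder insert function standing in for the real Milvus client:

# sketch of the failure-report pattern above (try_insert is a stand-in, not project code)
import json
from typing import Any, Dict, List

def try_insert(rows: List[Dict[str, Any]]) -> tuple[int, str | None]:
    """Stand-in for insert_parent_rows / insert_child_rows: (inserted_count, error)."""
    if not rows:
        return 0, None
    try:
        # client.insert(collection_name=..., data=rows) would go here
        return len(rows), None
    except Exception as e:
        return 0, str(e)

def run(folders: Dict[str, List[Dict[str, Any]]], report_path: str) -> Dict[str, Any]:
    stats: Dict[str, Any] = {"success": 0, "failed": 0, "failed_items": []}
    for folder_name, rows in folders.items():
        count, error = try_insert(rows)
        if error:
            stats["failed_items"].append({"folder": folder_name, "error": "入库失败", "reason": error})
            stats["failed"] += 1
        elif count > 0:
            stats["success"] += 1
    # same report shape as FAILED_REPORT_PATH: {"failed": [{"folder": ..., "error": ..., "reason": ...}]}
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
    return stats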

+ 31 - 6
src/app/scripts/base_in_minio.py

@@ -17,10 +17,13 @@ from app.config.minio_client import get_minio_client
 from app.config.setting import settings
 
 # 目标路径前缀
-PREFIX = "sampledata/base"
+PREFIX = "sampledata/standard"
 
 # 根目录配置:每个子文件夹包含 1 个原始文件 + 1 个 .md + 1 个 .json
-ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\新建文件夹"
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\base_minio_failed_report.json"
 
 
 def find_files(subfolder: Path) -> Tuple[Path, Path, Path]:
@@ -70,7 +73,7 @@ def main() -> None:
     if not client.bucket_exists(bucket):
         client.make_bucket(bucket)
 
-    stats = {"success": 0, "failed": 0, "skipped": 0}
+    stats = {"success": 0, "failed": 0, "skipped": 0, "failed_items": []}
 
     for subfolder in sorted(root.iterdir()):
         if not subfolder.is_dir():
@@ -98,17 +101,39 @@ def main() -> None:
             print(f"📁 {folder_name} ✅ 已上传: {file_obj}, {md_obj}, {json_obj}")
             stats["success"] += 1
         except (FileNotFoundError, ValueError) as e:
-            print(f"📁 {folder_name} ⊘ {e}")
+            error_message = str(e)
+            print(f"📁 {folder_name} ⊘ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "文件或JSON校验失败",
+                "reason": error_message,
+            })
             stats["skipped"] += 1
         except S3Error as e:
-            print(f"📁 {folder_name} ❌ MinIO 错误: {e}")
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ MinIO 错误: {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "MinIO 上传失败",
+                "reason": error_message,
+            })
             stats["failed"] += 1
         except Exception as e:
-            print(f"📁 {folder_name} ❌ {e}")
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
             stats["failed"] += 1
 
+    with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+        json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
+
     print("\n" + "=" * 60)
     print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+    print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
     print("=" * 60)
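
Every script in this commit now writes the same {"failed": [{"folder", "error", "reason"}]} report, so a retry pass can be driven from that file. A hypothetical helper (not part of this commit) that extracts the folder names to re-run:

# illustrative only: read a failed-report JSON and list the folders that need a retry
import json
from pathlib import Path

def load_failed_folders(report_path: str) -> list[str]:
    data = json.loads(Path(report_path).read_text(encoding="utf-8"))
    return [item["folder"] for item in data.get("failed", [])]

if __name__ == "__main__":
    for name in load_failed_folders(r"F:\第二阶段编制依据及施工方案数据治理-20260206\base_minio_failed_report.json"):
        print(name)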
 
 

+ 114 - 37
src/app/scripts/base_info_in_database.py

@@ -16,11 +16,41 @@ from app.config.database import get_async_sessionmaker, get_async_engine
 from app.config.setting import settings
 
 
+# ============================================
+# 配置参数区域 - 便于修改
+# ============================================
+
+# 数据库表名配置
+TABLE_DOCUMENT_MAIN = "t_samp_document_main"  # 文档主表
+TABLE_STANDARD_BASE_INFO = "t_samp_standard_base_info"  # 标准基础信息表
+
 # 默认创建人/修改人ID
 DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
 
 # 根目录配置
-ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\新建文件夹"
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\base_db_failed_report.json"
+
+# 默认值配置 - 需要修改时只改这里
+DOCUMENT_MAIN_DEFAULTS = {
+    "conversion_status": 2,  # 2-完成
+    "whether_to_enter": 0,  # 0-未入库
+    "conversion_error": None,
+    "source_type": "standard",
+    "kb_id": "a61e1530-9ff3-4640-b2f7-fe7c9edfcbc1",
+    "kb_method": "parent_child",
+    "whether_to_task": 0,
+    "created_by": DEFAULT_USER_ID,
+    "updated_by": DEFAULT_USER_ID,
+}
+
+STANDARD_BASE_INFO_DEFAULTS = {
+    "created_by": DEFAULT_USER_ID,
+    "updated_by": DEFAULT_USER_ID,
+    "note": None,
+}
 
 
 def parse_date(date_str: Optional[str]) -> Optional[str]:
@@ -36,7 +66,7 @@ def parse_date(date_str: Optional[str]) -> Optional[str]:
         return None
 
 
-async def insert_document_main(session, doc_data: Dict[str, Any], title: str, file_extension: str, file_url: str, md_url: str, json_url: str, folder_name: str = "") -> bool:
+async def insert_document_main(session, doc_data: Dict[str, Any], title: str, file_extension: str, file_url: str, md_url: str, json_url: str, folder_name: str = "") -> tuple[bool, Optional[str]]:
     """
     插入文档主表 t_samp_document_main
     
@@ -51,48 +81,53 @@ async def insert_document_main(session, doc_data: Dict[str, Any], title: str, fi
         folder_name: 文件夹名称,用于错误打印
     
     Returns:
-        是否插入成功
+        (是否插入成功, 失败原因)
     """
     try:
         doc_id = doc_data.get("id")
         
-        sql = text("""
-            INSERT INTO t_samp_document_main (
+        sql = text(f"""
+            INSERT INTO {TABLE_DOCUMENT_MAIN} (
                 id, title, conversion_status, whether_to_enter,
-                file_url, md_url, json_url, file_extension,
+                conversion_error, file_url, md_url, json_url, file_extension,
                 created_by, created_time, updated_by, updated_time,
-                source_type
+                source_type, kb_id, kb_method, whether_to_task
             ) VALUES (
                 :id, :title, :conversion_status, :whether_to_enter,
-                :file_url, :md_url, :json_url, :file_extension,
+                :conversion_error, :file_url, :md_url, :json_url, :file_extension,
                 :created_by, :created_time, :updated_by, :updated_time,
-                :source_type
+                :source_type, :kb_id, :kb_method, :whether_to_task
             )
         """)
         
         await session.execute(sql, {
             "id": doc_id,
             "title": title,
-            "conversion_status": 2,  # 2-完成
-            "whether_to_enter": 0,   # 0-未入库
+            "conversion_status": DOCUMENT_MAIN_DEFAULTS["conversion_status"],
+            "whether_to_enter": DOCUMENT_MAIN_DEFAULTS["whether_to_enter"],
+            "conversion_error": DOCUMENT_MAIN_DEFAULTS["conversion_error"],
             "file_url": file_url,
             "md_url": md_url,
             "json_url": json_url,
             "file_extension": file_extension,
-            "created_by": DEFAULT_USER_ID,
+            "created_by": DOCUMENT_MAIN_DEFAULTS["created_by"],
             "created_time": datetime.now(),
-            "updated_by": DEFAULT_USER_ID,
+            "updated_by": DOCUMENT_MAIN_DEFAULTS["updated_by"],
             "updated_time": datetime.now(),
-            "source_type": "basis",
+            "source_type": DOCUMENT_MAIN_DEFAULTS["source_type"],
+            "kb_id": DOCUMENT_MAIN_DEFAULTS["kb_id"],
+            "kb_method": DOCUMENT_MAIN_DEFAULTS["kb_method"],
+            "whether_to_task": DOCUMENT_MAIN_DEFAULTS["whether_to_task"],
         })
         
-        return True
+        return True, None
     except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入主表失败: {e}")
-        return False
+        error_message = str(e)
+        print(f"📁 {folder_name} ❌ 插入主表失败: {error_message}")
+        return False, error_message
 
 
-async def insert_standard_base_info(session, doc_data: Dict[str, Any], folder_name: str = "") -> bool:
+async def insert_standard_base_info(session, doc_data: Dict[str, Any], folder_name: str = "") -> tuple[bool, Optional[str]]:
     """
     插入标准基础信息表 t_samp_standard_base_info
     
@@ -102,7 +137,7 @@ async def insert_standard_base_info(session, doc_data: Dict[str, Any], folder_na
         folder_name: 文件夹名称,用于错误打印
     
     Returns:
-        是否插入成功
+        (是否插入成功, 失败原因)
     """
     try:
         doc_id = doc_data.get("id")
@@ -117,21 +152,23 @@ async def insert_standard_base_info(session, doc_data: Dict[str, Any], folder_na
             drafting_unit = None
             participating_units = None
         
-        sql = text("""
-            INSERT INTO t_samp_standard_base_info (
+        sql = text(f"""
+            INSERT INTO {TABLE_STANDARD_BASE_INFO} (
                 id, chinese_name, english_name, standard_number,
                 issuing_authority, release_date, implementation_date,
                 drafting_unit, approving_department, participating_units,
                 document_type, professional_field, engineering_phase, validity,
                 reference_basis, source_url,
-                created_by, created_time, updated_by, updated_time
+                created_by, created_time, updated_by, updated_time,
+                note
             ) VALUES (
                 :id, :chinese_name, :english_name, :standard_number,
                 :issuing_authority, :release_date, :implementation_date,
                 :drafting_unit, :approving_department, :participating_units,
                 :document_type, :professional_field, :engineering_phase, :validity,
                 :reference_basis, :source_url,
-                :created_by, :created_time, :updated_by, :updated_time
+                :created_by, :created_time, :updated_by, :updated_time,
+                :note
             )
         """)
         
@@ -152,19 +189,21 @@ async def insert_standard_base_info(session, doc_data: Dict[str, Any], folder_na
             "validity": doc_data.get("validity"),
             "reference_basis": doc_data.get("reference_basis"),
             "source_url": doc_data.get("source_url"),
-            "created_by": DEFAULT_USER_ID,
+            "created_by": STANDARD_BASE_INFO_DEFAULTS["created_by"],
             "created_time": datetime.now(),
-            "updated_by": DEFAULT_USER_ID,
+            "updated_by": STANDARD_BASE_INFO_DEFAULTS["updated_by"],
             "updated_time": datetime.now(),
+            "note": STANDARD_BASE_INFO_DEFAULTS["note"],
         })
         
-        return True
+        return True, None
     except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入基础信息表失败: {e}")
-        return False
+        error_message = str(e)
+        print(f"📁 {folder_name} ❌ 插入基础信息表失败: {error_message}")
+        return False, error_message
 
 
-async def process_folder(root_folder: str | Path) -> Dict[str, int]:
+async def process_folder(root_folder: str | Path) -> Dict[str, Any]:
     """
     处理文件夹结构,导入数据库
     
@@ -180,7 +219,7 @@ async def process_folder(root_folder: str | Path) -> Dict[str, int]:
     
     SessionMaker = get_async_sessionmaker()
     
-    stats = {"success": 0, "failed": 0, "skipped": 0}
+    stats = {"success": 0, "failed": 0, "skipped": 0, "failed_items": []}
     
     # 遍历子文件夹
     for subfolder in sorted(root_folder.iterdir()):
@@ -206,6 +245,11 @@ async def process_folder(root_folder: str | Path) -> Dict[str, int]:
             doc_data = data.get("doc")
             if not doc_data or not doc_data.get("id"):
                 print(f"📁 {folder_name} ❌ (JSON格式错误或缺少doc/id)")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "JSON格式错误或缺少doc/id",
+                    "reason": "JSON格式错误或缺少doc/id",
+                })
                 stats["failed"] += 1
                 continue
             
@@ -221,6 +265,11 @@ async def process_folder(root_folder: str | Path) -> Dict[str, int]:
             
             if not original_file:
                 print(f"📁 {folder_name} ❌ (未找到原始文件)")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "未找到原始文件",
+                    "reason": "未找到原始文件",
+                })
                 stats["failed"] += 1
                 continue
             
@@ -229,22 +278,34 @@ async def process_folder(root_folder: str | Path) -> Dict[str, int]:
             file_extension = original_file.suffix  # 扩展名(含点)
             
             # 构造 URL
-            file_url = f"/base/{doc_id}{file_extension}"
-            md_url = f"/base/{doc_id}.md"
-            json_url = f"/base/{doc_id}.json"
+            file_url = f"/standard/{doc_id}{file_extension}"
+            md_url = f"/standard/{doc_id}.md"
+            json_url = f"/standard/{doc_id}.json"
             
             # 插入数据库
             async with SessionMaker() as session:
                 try:
                     # 先插入主表
-                    if not await insert_document_main(session, doc_data, title, file_extension, file_url, md_url, json_url, folder_name):
+                    main_ok, main_error = await insert_document_main(session, doc_data, title, file_extension, file_url, md_url, json_url, folder_name)
+                    if not main_ok:
                         await session.rollback()
+                        stats["failed_items"].append({
+                            "folder": folder_name,
+                            "error": "插入主表失败",
+                            "reason": main_error,
+                        })
                         stats["failed"] += 1
                         continue
                     
                     # 再插入基础信息表
-                    if not await insert_standard_base_info(session, doc_data, folder_name):
+                    base_ok, base_error = await insert_standard_base_info(session, doc_data, folder_name)
+                    if not base_ok:
                         await session.rollback()
+                        stats["failed_items"].append({
+                            "folder": folder_name,
+                            "error": "插入基础信息表失败",
+                            "reason": base_error,
+                        })
                         stats["failed"] += 1
                         continue
                     
@@ -253,12 +314,24 @@ async def process_folder(root_folder: str | Path) -> Dict[str, int]:
                     stats["success"] += 1
                     
                 except Exception as e:
+                    error_message = str(e)
                     await session.rollback()
-                    print(f"📁 {folder_name} ❌ ({str(e)})")
+                    print(f"📁 {folder_name} ❌ ({error_message})")
+                    stats["failed_items"].append({
+                        "folder": folder_name,
+                        "error": "未知错误",
+                        "reason": error_message,
+                    })
                     stats["failed"] += 1
                 
         except Exception as e:
-            print(f"📁 {folder_name} ❌ ({str(e)})")
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ ({error_message})")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
             stats["failed"] += 1
     
     return stats
@@ -273,9 +346,13 @@ async def main():
         print("-" * 60)
         
         stats = await process_folder(ROOT_FOLDER)
+
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
         
         print("\n" + "=" * 60)
         print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
         print("=" * 60)
         
     except Exception as e:
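
Worth noting in the SQL changes above: the table names come from the TABLE_* constants via an f-string (identifiers cannot be bound parameters), while every row value stays a :named bound parameter, so no data is interpolated into the statement text. A minimal sketch of the same split used for a post-import sanity check; the query itself is illustrative, not part of this commit:

# sketch: configurable table name via f-string, values via bound parameters
from sqlalchemy import text

TABLE_DOCUMENT_MAIN = "t_samp_document_main"

async def count_imported(session) -> int:
    """Count rows written with the new source_type after a run."""
    sql = text(f"SELECT COUNT(*) FROM {TABLE_DOCUMENT_MAIN} WHERE source_type = :source_type")
    result = await session.execute(sql, {"source_type": "standard"})
    return result.scalar_one()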

+ 11 - 6
src/app/scripts/base_info_json_generation.py

@@ -22,10 +22,12 @@ from app.models import StandardBaseInfo  # noqa: F401  # 仅用于类型/一致
 
 
 # ==================== 配置参数 ====================
-EXCEL_FILE = r"C:\Users\ZengChao\Desktop\id.xlsx"      # ✅ Excel文件路径
-ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\新建文件夹"   # ✅ 根文件夹路径
+EXCEL_FILE = r"C:\Users\ZengChao\Desktop\编制依据_output3.xlsx"      # ✅ Excel文件路径
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"   # ✅ 根文件夹路径
 SHEET_INDEX = 0                                        # 目标sheet索引(0为第一个sheet)
 
+FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\base_json_failed_report.json"  # ✅ 失败汇总JSON保存路径
+
 PARENT_MAX_CHARS = 6000                                # ✅ 父段最大长度(超长切片)
 CHILD_INDEX_START = 0                                  # ✅ children.index 起始
 # ================================================
@@ -314,7 +316,7 @@ class StandardInfoGenerator:
 
         return result
 
-    def process_folder_structure(self, root_folder: str | Path) -> dict[str, list[str]]:
+    def process_folder_structure(self, root_folder: str | Path) -> dict[str, list[Any]]:
         root_folder = Path(root_folder)
         if not root_folder.is_dir():
             raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
@@ -371,12 +373,11 @@ class StandardInfoGenerator:
                 with open(output_path, "w", encoding="utf-8") as f:
                     json.dump(out_json, f, ensure_ascii=False, indent=2)
 
-                print(f"📄 {folder_name} ✅ (已生成: {output_path.name})")
                 results["success"].append(folder_name)
 
             except Exception as e:
                 print(f"📄 {folder_name} ❌ ({str(e)})")
-                results["failed"].append(f"{folder_name} ({str(e)})")
+                results["failed"].append({"folder": folder_name, "error": str(e)})
 
         return results
 
@@ -386,8 +387,12 @@ def main():
         generator = StandardInfoGenerator(EXCEL_FILE, sheet_index=SHEET_INDEX)
         results = generator.process_folder_structure(ROOT_FOLDER)
 
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": results["failed"]}, f, ensure_ascii=False, indent=2)
+
         print("\n" + "=" * 60)
-        print(f"✅ 成功: {len(results['success'])} | ❌ 失败: {len(results['failed'])} | ⊘ 跳过: {len(results['skipped'])}")
+        print(f"❌ 失败: {len(results['failed'])} | ⊘ 跳过: {len(results['skipped'])}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
         print("=" * 60)
     except Exception as e:
         print(f"❌ 错误: {str(e)}")

+ 187 - 0
src/app/scripts/plan_check.py

@@ -0,0 +1,187 @@
+# ===================== 配置区(只改这里) =====================
+ROOT_DIR = r"G:\需要汇总的施工方案"              # 根目录
+EXCEL_PATH = r"C:\Users\ZengChao\Desktop\施工方案.xlsx"    # Excel 路径
+SHEET_NAME = None                       # None 表示使用第一个 sheet
+ID_COLUMN = "ID"                        # Excel 的 id 列名
+CN_NAME_COLUMN = "工程项目名称"               # Excel 的中文名列名
+OUTPUT_JSON = r"C:\Users\ZengChao\Desktop\施工方案issues.json"    # 输出 JSON 路径
+
+IGNORE_HIDDEN = True                    # 是否忽略隐藏/临时文件
+# ============================================================
+
+import json
+from pathlib import Path
+from typing import Any, Dict, List, Tuple
+from openpyxl import load_workbook
+
+
+def is_hidden(name: str) -> bool:
+    if name.startswith("."):
+        return True
+    if name.startswith("~$"):
+        return True
+    if name in {"Thumbs.db", "desktop.ini"}:
+        return True
+    return False
+
+
+def normalize_id(v: Any) -> str:
+    """Excel 里 id 可能是数字/浮点,统一转字符串。"""
+    if v is None:
+        return ""
+    if isinstance(v, bool):
+        return str(v).strip()
+    if isinstance(v, int):
+        return str(v).strip()
+    if isinstance(v, float):
+        return str(int(v)) if v.is_integer() else str(v).strip()
+    return str(v).strip()
+
+
+def clean_cn_name(name: Any) -> str:
+    """中文名去掉《》书名号 + 去空格"""
+    if name is None:
+        return ""
+    s = str(name).strip()
+    return s.replace("《", "").replace("》", "").strip()
+
+
+def load_excel_id_to_cnname(
+    excel_path: Path,
+    sheet_name: str | None,
+    id_col: str,
+    cn_name_col: str
+) -> Dict[str, str]:
+    wb = load_workbook(excel_path, read_only=True, data_only=True)
+    ws = wb[sheet_name] if sheet_name else wb.active
+
+    header_row = next(ws.iter_rows(min_row=1, max_row=1, values_only=True), None)
+    if not header_row:
+        raise ValueError("Excel 第一行表头为空。")
+
+    header = [str(x).strip() if x is not None else "" for x in header_row]
+    if id_col not in header:
+        raise ValueError(f"Excel 表头找不到列名:{id_col},实际表头:{header}")
+    if cn_name_col not in header:
+        raise ValueError(f"Excel 表头找不到列名:{cn_name_col},实际表头:{header}")
+
+    id_idx = header.index(id_col)
+    cn_idx = header.index(cn_name_col)
+
+    mapping: Dict[str, str] = {}
+    for row in ws.iter_rows(min_row=2, values_only=True):
+        rid = row[id_idx] if id_idx < len(row) else None
+        cname = row[cn_idx] if cn_idx < len(row) else None
+        rid_s = normalize_id(rid)
+        cname_s = clean_cn_name(cname)
+        if rid_s and cname_s:
+            mapping[rid_s] = cname_s
+    return mapping
+
+
+def list_items(d: Path) -> List[Path]:
+    items = []
+    for p in d.iterdir():
+        if IGNORE_HIDDEN and is_hidden(p.name):
+            continue
+        items.append(p)
+    return items
+
+
+def check_one_id_folder(id_folder: Path, excel_cn: str) -> Tuple[bool, List[str], str | None]:
+    """
+    检查某个编号目录:
+    - 目录内只能有3个条目:1个文件夹 + 1个md + 1个pdf
+    - 三者名字一致,得到 base_name
+    - excel_cn(去《》后) 是否 in base_name
+    """
+    problems: List[str] = []
+
+    items = list_items(id_folder)
+    if len(items) != 3:
+        return False, [f"条目数量不是3(实际 {len(items)})"], None
+
+    subfolders = [p for p in items if p.is_dir()]
+    mds = [p for p in items if p.is_file() and p.suffix.lower() == ".md"]
+    pdfs = [p for p in items if p.is_file() and p.suffix.lower() == ".pdf"]
+
+    if not (len(subfolders) == 1 and len(mds) == 1 and len(pdfs) == 1):
+        return False, [f"类型不符合:folder={len(subfolders)}, md={len(mds)}, pdf={len(pdfs)}(应为1/1/1)"], None
+
+    folder_name = subfolders[0].name.strip()
+    md_base = mds[0].stem.strip()
+    pdf_base = pdfs[0].stem.strip()
+
+    if not (folder_name == md_base == pdf_base):
+        return False, [f"三者名字不一致:folder='{folder_name}', md='{md_base}', pdf='{pdf_base}'"], None
+
+    base_name = folder_name
+
+    # 按 docstring 检查:Excel 中文名(去《》后)应包含在名称中
+    if excel_cn and excel_cn not in base_name:
+        problems.append(f"Excel 中文名 '{excel_cn}' 不在名称 '{base_name}' 中")
+
+    return (len(problems) == 0), problems, base_name
+
+
+def main():
+    root = Path(ROOT_DIR)
+    excel_path = Path(EXCEL_PATH)
+
+    if not root.exists() or not root.is_dir():
+        raise SystemExit(f"ROOT_DIR 不存在或不是目录:{root}")
+    if not excel_path.exists() or not excel_path.is_file():
+        raise SystemExit(f"EXCEL_PATH 不存在或不是文件:{excel_path}")
+
+    id_to_cn = load_excel_id_to_cnname(excel_path, SHEET_NAME, ID_COLUMN, CN_NAME_COLUMN)
+
+    issues = []
+    checked = 0
+
+    # 只遍历根目录的直接子文件夹:子文件夹名=编号
+    for id_folder in sorted(root.iterdir()):
+        if not id_folder.is_dir():
+            continue
+        if IGNORE_HIDDEN and is_hidden(id_folder.name):
+            continue
+
+        checked += 1
+        dir_id = id_folder.name.strip()
+
+        if dir_id not in id_to_cn:
+            issues.append({
+                "id": dir_id,
+                "path": str(id_folder),
+                "base_name": None,
+                "problems": [f"Excel 中找不到 id:'{dir_id}'"]
+            })
+            continue
+
+        excel_cn = id_to_cn[dir_id]  # 已去《》
+        ok, problems, base_name = check_one_id_folder(id_folder, excel_cn)
+        if not ok:
+            issues.append({
+                "id": dir_id,
+                "path": str(id_folder),
+                "base_name": base_name,
+                "excel_cn": excel_cn,
+                "problems": problems
+            })
+
+    result = {
+        "root_dir": str(root),
+        "excel_path": str(excel_path),
+        "checked_id_folders": checked,
+        "issue_count": len(issues),
+        "issues": issues,
+        "bad_ids": [x["id"] for x in issues],  # 有问题的文件夹编号单独汇总,便于快速查看
+    }
+
+    out = Path(OUTPUT_JSON)
+    out.parent.mkdir(parents=True, exist_ok=True)
+    out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    print(f"检查完成:检查编号目录 {checked} 个;发现问题 {len(issues)} 个")
+    print(f"已输出:{out}")
+
+
+if __name__ == "__main__":
+    main()

+ 2 - 2
src/app/scripts/plan_count.py

@@ -1,9 +1,9 @@
 # ===================== 配置区(只改这里) =====================
-ROOT_DIR = r"F:\已修复的施工方案"              # 根目录:应存在子文件夹,子文件夹名=编号(id)
+ROOT_DIR = r"G:\最终施工方案"              # 根目录:应存在子文件夹,子文件夹名=编号(id)
 EXCEL_PATH = r"C:\Users\ZengChao\Desktop\施工方案.xlsx"    # Excel 路径
 SHEET_NAME = None                       # None=第一个sheet
 ID_COLUMN = "ID"                        # Excel 的 id 列名
-OUTPUT_JSON = r"C:\Users\ZengChao\Desktop\missing_folders.json"  # 输出 JSON
+OUTPUT_JSON = r"C:\Users\ZengChao\Desktop\施工方案missing_folders.json"  # 输出 JSON
 
 IGNORE_HIDDEN = True                    # 忽略隐藏/临时文件夹
 # ============================================================

+ 298 - 0
src/app/scripts/plan_info_in_collection.py

@@ -0,0 +1,298 @@
+"""
+将 JSON 文件中的 parent 和 children 数据插入 Milvus Collection。
+读取每个子文件夹的 JSON,解析 doc、parent、children 数组并构造入库数据。
+"""
+from __future__ import annotations
+
+import json
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List
+
+from pymilvus import MilvusClient
+
+from app.config.embeddings import get_embeddings
+from app.config.milvus_client import get_milvusclient
+from app.config.setting import settings
+
+# 根目录配置
+ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\施工方案文件夹"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\plan_collection_failed_report.json"
+
+# Collection 名称
+PARENT_COLLECTION_NAME = "plan_parent"
+CHILD_COLLECTION_NAME = "plan_child"
+
+# 默认创建人/修改人ID
+DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
+
+# MinIO URL 前缀
+PREFIX = "sampledata/plan"
+
+
+def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str) -> Dict[str, Any]:
+    """
+    构造 metadata 字段。
+    
+    Args:
+        doc_data: JSON 中的 doc 数据
+        hierarchy: 文档层级信息
+        file_url: 文件 URL
+    
+    Returns:
+        metadata 字典
+    """
+    return {
+        "file_name": doc_data.get("plan_name", ""),
+        "plan_category": doc_data.get("plan_category", ""),
+        "project_name": doc_data.get("project_name", ""),
+        "compiling_unit": doc_data.get("compiling_unit", ""),
+        "compiling_date": doc_data.get("compiling_date", ""),
+        "hierarchy": hierarchy,
+        "file_url": file_url,
+        "plan_type_list": {}  # 空 JSON
+    }
+
+
+def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows: List[Dict[str, Any]], 
+                       doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> tuple[int, str | None]:
+    """
+    插入 parent 数据到 Milvus。
+    
+    Args:
+        client: Milvus 客户端
+        collection_name: Collection 名称
+        parent_rows: parent 数组
+        doc_data: doc 数据
+        doc_id: 文档 ID
+        folder_name: 文件夹名称
+    
+    Returns:
+        插入的行数
+    """
+    if not parent_rows:
+        return 0, None
+    
+    # 获取 embeddings 客户端
+    embeddings = get_embeddings()
+    
+    file_url = f"{PREFIX}/{doc_id}.md"
+    now_ts = int(datetime.now().timestamp())
+    
+    # 批量提取所有文本
+    texts = [row.get("text", "") for row in parent_rows]
+    # 批量生成向量
+    vectors = embeddings.embed_documents(texts)
+    
+    entities = []
+    for idx, row in enumerate(parent_rows):
+        entity = {
+            "text": texts[idx],
+            "dense": vectors[idx],
+            "document_id": doc_id,
+            "parent_id": str(row.get("parent_id", "")),
+            "index": row.get("index", 0),
+            "tag_list": "",
+            "permission": {},
+            "metadata": build_metadata(doc_data, row.get("hierarchy", ""), file_url),
+            "is_deleted": False,
+            "created_by": DEFAULT_USER_ID,
+            "created_time": now_ts,
+            "updated_by": DEFAULT_USER_ID,
+            "updated_time": now_ts,
+        }
+        entities.append(entity)
+    
+    try:
+        client.insert(collection_name=collection_name, data=entities)
+        return len(entities), None
+    except Exception as e:
+        print(f"📁 {folder_name} ❌ 插入 parent 失败: {e}")
+        return 0, str(e)
+
+
+def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: List[Dict[str, Any]], 
+                      doc_data: Dict[str, Any], doc_id: str, folder_name: str) -> tuple[int, str | None]:
+    """
+    插入 children 数据到 Milvus。
+    
+    Args:
+        client: Milvus 客户端
+        collection_name: Collection 名称
+        child_rows: children 数组
+        doc_data: doc 数据
+        doc_id: 文档 ID
+        folder_name: 文件夹名称
+    
+    Returns:
+        插入的行数
+    """
+    if not child_rows:
+        return 0, None
+    
+    # 获取 embeddings 客户端
+    embeddings = get_embeddings()
+    
+    file_url = f"{PREFIX}/{doc_id}.md"
+    now_ts = int(datetime.now().timestamp())
+    
+    # 批量提取所有文本
+    texts = [row.get("text", "") for row in child_rows]
+    # 批量生成向量
+    vectors = embeddings.embed_documents(texts)
+    
+    entities = []
+    for idx, row in enumerate(child_rows):
+        entity = {
+            "text": texts[idx],
+            "dense": vectors[idx],
+            "document_id": doc_id,
+            "parent_id": str(row.get("parent_id", "")),
+            "index": row.get("index", 0),
+            "tag_list": "",
+            "permission": {},
+            "metadata": build_metadata(doc_data, row.get("hierarchy", ""), file_url),
+            "is_deleted": False,
+            "created_by": DEFAULT_USER_ID,
+            "created_time": now_ts,
+            "updated_by": DEFAULT_USER_ID,
+            "updated_time": now_ts,
+        }
+        entities.append(entity)
+    
+    try:
+        client.insert(collection_name=collection_name, data=entities)
+        return len(entities), None
+    except Exception as e:
+        print(f"📁 {folder_name} ❌ 插入 children 失败: {e}")
+        return 0, str(e)
+
+
+def process_folder(root_folder: str | Path) -> Dict[str, Any]:
+    """
+    处理文件夹结构,导入 Milvus。
+    
+    Args:
+        root_folder: 根目录路径
+    
+    Returns:
+        统计信息字典
+    """
+    root_folder = Path(root_folder)
+    if not root_folder.is_dir():
+        raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
+    
+    client = get_milvusclient()
+    
+    stats = {"success": 0, "failed": 0, "skipped": 0, "parent_rows": 0, "child_rows": 0, "failed_items": []}
+    
+    for subfolder in sorted(root_folder.iterdir()):
+        if not subfolder.is_dir():
+            continue
+        
+        folder_name = subfolder.name
+        
+        # 查找 JSON 文件
+        json_files = list(subfolder.glob("*.json"))
+        if not json_files:
+            print(f"📁 {folder_name} ⊘ 无JSON文件")
+            stats["skipped"] += 1
+            continue
+        
+        json_path = json_files[0]
+        
+        try:
+            # 读取 JSON
+            with open(json_path, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            
+            doc_data = data.get("doc")
+            parent_rows = data.get("parent", [])
+            child_rows = data.get("children", [])
+            
+            if not doc_data or not doc_data.get("id"):
+                print(f"📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "JSON格式错误或缺少doc/id",
+                    "reason": "JSON格式错误或缺少doc/id",
+                })
+                stats["failed"] += 1
+                continue
+            
+            doc_id = doc_data.get("id")
+            
+            # 插入 parent 和 children
+            parent_count, parent_error = insert_parent_rows(
+                client, PARENT_COLLECTION_NAME, parent_rows, doc_data, doc_id, folder_name
+            )
+            child_count, child_error = insert_child_rows(
+                client, CHILD_COLLECTION_NAME, child_rows, doc_data, doc_id, folder_name
+            )
+
+            if parent_error:
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "parent 入库失败",
+                    "reason": parent_error,
+                })
+                stats["failed"] += 1
+            if child_error:
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "child 入库失败",
+                    "reason": child_error,
+                })
+                stats["failed"] += 1
+            
+            if parent_count > 0 or child_count > 0:
+                print(f"📁 {folder_name} ✅ parent: {parent_count}, child: {child_count}")
+                stats["success"] += 1
+                stats["parent_rows"] += parent_count
+                stats["child_rows"] += child_count
+            else:
+                print(f"📁 {folder_name} ⊘ 无数据入库")
+                stats["skipped"] += 1
+                
+        except Exception as e:
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
+            stats["failed"] += 1
+    
+    return stats
+
+
+def main():
+    """主函数"""
+    try:
+        print(f"🔍 开始导入 Milvus...")
+        print(f"📂 根目录: {ROOT_FOLDER}")
+        print(f"🔗 Milvus: {settings.MILVUS_HOST}:{settings.MILVUS_PORT}")
+        print(f"📊 Parent Collection: {PARENT_COLLECTION_NAME}")
+        print(f"📊 Child Collection: {CHILD_COLLECTION_NAME}")
+        print("-" * 60)
+        
+        stats = process_folder(ROOT_FOLDER)
+
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
+        
+        print("\n" + "=" * 60)
+        print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+        print(f"📊 Parent 行数: {stats['parent_rows']} | Child 行数: {stats['child_rows']}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
+        print("=" * 60)
+        
+    except Exception as e:
+        print(f"❌ 错误: {str(e)}")
+
+
+if __name__ == "__main__":
+    main()
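
insert_parent_rows / insert_child_rows embed all texts of a folder in a single embed_documents call. For folders with very large parent/children arrays that call can be batched; a sketch assuming get_embeddings() returns a LangChain-style Embeddings object exposing embed_documents (the batch size is arbitrary):

# illustrative batching wrapper around embed_documents; vector order matches text order
from typing import List

def embed_in_batches(embeddings, texts: List[str], batch_size: int = 64) -> List[List[float]]:
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embeddings.embed_documents(texts[start:start + batch_size]))
    return vectors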

+ 378 - 0
src/app/scripts/plan_info_in_database.py

@@ -0,0 +1,378 @@
+"""
+将JSON文件中的施工方案信息导入数据库
+读取文件夹下的JSON文件,填充 t_samp_document_main 和 t_samp_construction_plan_base_info 表
+"""
+from __future__ import annotations
+
+import asyncio
+import json
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from sqlalchemy import text
+
+from app.config.database import get_async_sessionmaker, get_async_engine
+from app.config.setting import settings
+
+
+
+# ============================================
+# 配置参数区域 - 便于修改
+# ============================================
+
+# 数据库表名配置
+TABLE_DOCUMENT_MAIN = "t_samp_document_main"  # 文档主表
+TABLE_CONSTRUCTION_PLAN_BASE_INFO = "t_samp_construction_plan_base_info"  # 施工方案基础信息表
+
+# 默认创建人/修改人ID
+DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
+
+# 根目录配置
+ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\1"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\plan_db_failed_report.json"
+
+# 默认值配置 - 需要修改时只改这里
+DOCUMENT_MAIN_DEFAULTS = {
+    "conversion_status": 2,  # 2-完成
+    "whether_to_enter": 0,  # 0-未入库
+    "conversion_error": None,
+    "source_type": "construction_plan",
+    "kb_id": None,
+    "kb_method": None,
+    "whether_to_task": 0,
+    "created_by": DEFAULT_USER_ID,
+    "updated_by": DEFAULT_USER_ID,
+}
+
+PLAN_BASE_INFO_DEFAULTS = {
+    "note": None,
+    "compilation_basis": None,
+    "level_1_classification": "施工方案",
+    "level_2_classification": None,
+    "level_3_classification": None,
+    "level_4_classification": None,
+    "created_by": DEFAULT_USER_ID,
+    "updated_by": DEFAULT_USER_ID,
+}
+
+
+async def insert_document_main(
+    session,
+    doc_data: Dict[str, Any],
+    title: str,
+    file_extension: str,
+    file_url: str,
+    md_url: str,
+    json_url: str,
+    folder_name: str = "",
+) -> tuple[bool, Optional[str]]:
+    """
+    插入文档主表 t_samp_document_main
+    """
+    try:
+        doc_id = doc_data.get("id")
+
+        sql = text(f"""
+            INSERT INTO {TABLE_DOCUMENT_MAIN} (
+                id, title, conversion_status, whether_to_enter,
+                conversion_error, file_url, md_url, json_url, file_extension,
+                created_by, created_time, updated_by, updated_time,
+                source_type, kb_id, kb_method, whether_to_task
+            ) VALUES (
+                :id, :title, :conversion_status, :whether_to_enter,
+                :conversion_error, :file_url, :md_url, :json_url, :file_extension,
+                :created_by, :created_time, :updated_by, :updated_time,
+                :source_type, :kb_id, :kb_method, :whether_to_task
+            )
+        """)
+
+        await session.execute(sql, {
+            "id": doc_id,
+            "title": title,
+            "conversion_status": DOCUMENT_MAIN_DEFAULTS["conversion_status"],
+            "whether_to_enter": DOCUMENT_MAIN_DEFAULTS["whether_to_enter"],
+            "conversion_error": DOCUMENT_MAIN_DEFAULTS["conversion_error"],
+            "file_url": file_url,
+            "md_url": md_url,
+            "json_url": json_url,
+            "file_extension": file_extension,
+            "created_by": DOCUMENT_MAIN_DEFAULTS["created_by"],
+            "created_time": datetime.now(),
+            "updated_by": DOCUMENT_MAIN_DEFAULTS["updated_by"],
+            "updated_time": datetime.now(),
+            "source_type": DOCUMENT_MAIN_DEFAULTS["source_type"],
+            "kb_id": DOCUMENT_MAIN_DEFAULTS["kb_id"],
+            "kb_method": DOCUMENT_MAIN_DEFAULTS["kb_method"],
+            "whether_to_task": DOCUMENT_MAIN_DEFAULTS["whether_to_task"],
+        })
+
+        return True, None
+    except Exception as e:
+        error_message = str(e)
+        print(f"📁 {folder_name} ❌ 插入主表失败: {error_message}")
+        return False, error_message
+
+
+def parse_date(date_str: Optional[str]) -> Optional[str]:
+    """解析日期字符串为数据库格式"""
+    if not date_str:
+        return None
+    try:
+        # 如果已经是 YYYY-MM-DD 格式,直接返回
+        if isinstance(date_str, str) and len(date_str) == 10:
+            return date_str
+        return None
+    except Exception:
+        return None
+
+
+async def insert_plan_base_info(session, doc_data: Dict[str, Any], folder_name: str = "") -> tuple[bool, Optional[str]]:
+    """
+    插入施工方案基础信息表 t_samp_construction_plan_base_info
+    
+    Args:
+        session: 数据库会话
+        doc_data: JSON中的doc数据
+        folder_name: 文件夹名称,用于错误打印
+    
+    Returns:
+        (是否插入成功, 失败原因)
+    """
+    try:
+        doc_id = doc_data.get("id")
+        
+        note = doc_data.get("note")
+        if note is None:
+            note = PLAN_BASE_INFO_DEFAULTS["note"]
+
+        compilation_basis = doc_data.get("compilation_basis")
+        if compilation_basis is None:
+            compilation_basis = PLAN_BASE_INFO_DEFAULTS["compilation_basis"]
+
+        level_1_classification = doc_data.get("level_1_classification")
+        if not level_1_classification:
+            level_1_classification = PLAN_BASE_INFO_DEFAULTS["level_1_classification"]
+
+        level_2_classification = doc_data.get("level_2_classification")
+        if level_2_classification is None:
+            level_2_classification = PLAN_BASE_INFO_DEFAULTS["level_2_classification"]
+
+        level_3_classification = doc_data.get("level_3_classification")
+        if level_3_classification is None:
+            level_3_classification = PLAN_BASE_INFO_DEFAULTS["level_3_classification"]
+
+        level_4_classification = doc_data.get("level_4_classification")
+        if level_4_classification is None:
+            level_4_classification = PLAN_BASE_INFO_DEFAULTS["level_4_classification"]
+
+        sql = text(f"""
+            INSERT INTO {TABLE_CONSTRUCTION_PLAN_BASE_INFO} (
+                id, plan_name, project_name, project_section,
+                compiling_unit, compiling_date, plan_summary,
+                compilation_basis, note, plan_category,
+                level_1_classification, level_2_classification,
+                level_3_classification, level_4_classification,
+                created_by, created_time, updated_by, updated_time
+            ) VALUES (
+                :id, :plan_name, :project_name, :project_section,
+                :compiling_unit, :compiling_date, :plan_summary,
+                :compilation_basis, :note, :plan_category,
+                :level_1_classification, :level_2_classification,
+                :level_3_classification, :level_4_classification,
+                :created_by, :created_time, :updated_by, :updated_time
+            )
+        """)
+        
+        await session.execute(sql, {
+            "id": doc_id,
+            "plan_name": doc_data.get("plan_name"),
+            "project_name": doc_data.get("project_name"),
+            "project_section": doc_data.get("project_section"),
+            "compiling_unit": doc_data.get("compiling_unit"),
+            "compiling_date": parse_date(doc_data.get("compiling_date")),
+            "plan_summary": doc_data.get("plan_summary"),
+            "compilation_basis": compilation_basis,
+            "note": note,
+            "plan_category": doc_data.get("plan_category"),
+            "level_1_classification": level_1_classification,
+            "level_2_classification": level_2_classification,
+            "level_3_classification": level_3_classification,
+            "level_4_classification": level_4_classification,
+            "created_by": PLAN_BASE_INFO_DEFAULTS["created_by"],
+            "created_time": datetime.now(),
+            "updated_by": PLAN_BASE_INFO_DEFAULTS["updated_by"],
+            "updated_time": datetime.now(),
+        })
+        
+        return True, None
+    except Exception as e:
+        error_message = str(e)
+        print(f"📁 {folder_name} ❌ 插入施工方案基础信息表失败: {error_message}")
+        return False, error_message
+
+
+async def process_folder(root_folder: str | Path) -> Dict[str, Any]:
+    """
+    处理文件夹结构,导入数据库
+    
+    Args:
+        root_folder: 根目录路径
+    
+    Returns:
+        统计信息字典
+    """
+    root_folder = Path(root_folder)
+    if not root_folder.is_dir():
+        raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
+    
+    SessionMaker = get_async_sessionmaker()
+    
+    stats = {"success": 0, "failed": 0, "skipped": 0, "failed_items": []}
+    
+    # 遍历子文件夹
+    for subfolder in sorted(root_folder.iterdir()):
+        if not subfolder.is_dir():
+            continue
+        
+        folder_name = subfolder.name
+        
+        # 查找JSON文件
+        json_files = list(subfolder.glob("*.json"))
+        if not json_files:
+            print(f"📁 {folder_name} ⊘ (无JSON文件)")
+            stats["skipped"] += 1
+            continue
+        
+        json_path = json_files[0]
+        
+        try:
+            # 读取JSON
+            with open(json_path, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            
+            doc_data = data.get("doc")
+            if not doc_data or not doc_data.get("id"):
+                print(f"📁 {folder_name} ❌ (JSON格式错误或缺少doc/id)")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "JSON格式错误或缺少doc/id",
+                    "reason": "JSON格式错误或缺少doc/id",
+                })
+                stats["failed"] += 1
+                continue
+            
+            doc_id = doc_data.get("id")
+
+            # 查找原始文件(非 .json 和 .md 的文件)
+            all_files = list(subfolder.glob("*"))
+            original_file = None
+            for f in all_files:
+                if f.is_file() and f.suffix not in [".json", ".md"]:
+                    original_file = f
+                    break
+
+            if not original_file:
+                print(f"📁 {folder_name} ❌ (未找到原始文件)")
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "未找到原始文件",
+                    "reason": "未找到原始文件",
+                })
+                stats["failed"] += 1
+                continue
+
+            # 获取文件信息
+            title = original_file.stem  # 文件名(不含扩展名)
+            file_extension = original_file.suffix  # 扩展名(含点)
+
+            # 构造 URL
+            file_url = f"/plan/{doc_id}{file_extension}"
+            md_url = f"/plan/{doc_id}.md"
+            json_url = f"/plan/{doc_id}.json"
+
+            # 插入数据库
+            async with SessionMaker() as session:
+                try:
+                    # 先插入主表
+                    main_ok, main_error = await insert_document_main(session, doc_data, title, file_extension, file_url, md_url, json_url, folder_name)
+                    if not main_ok:
+                        await session.rollback()
+                        stats["failed_items"].append({
+                            "folder": folder_name,
+                            "error": "插入主表失败",
+                            "reason": main_error,
+                        })
+                        stats["failed"] += 1
+                        continue
+
+                    # 再插入施工方案基础信息表
+                    plan_ok, plan_error = await insert_plan_base_info(session, doc_data, folder_name)
+                    if not plan_ok:
+                        await session.rollback()
+                        stats["failed_items"].append({
+                            "folder": folder_name,
+                            "error": "插入施工方案基础信息表失败",
+                            "reason": plan_error,
+                        })
+                        stats["failed"] += 1
+                        continue
+                    
+                    await session.commit()
+                    print(f"📁 {folder_name} ✅")
+                    stats["success"] += 1
+                    
+                except Exception as e:
+                    error_message = str(e)
+                    await session.rollback()
+                    print(f"📁 {folder_name} ❌ ({error_message})")
+                    stats["failed_items"].append({
+                        "folder": folder_name,
+                        "error": "未知错误",
+                        "reason": error_message,
+                    })
+                    stats["failed"] += 1
+                
+        except Exception as e:
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ ({error_message})")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
+            stats["failed"] += 1
+    
+    return stats
+
+
+async def main():
+    """主函数"""
+    try:
+        print(f"🔍 开始导入数据库...")
+        print(f"📂 根目录: {ROOT_FOLDER}")
+        print(f"🔗 数据库: {settings.DATABASE_URL}")
+        print("-" * 60)
+        
+        stats = await process_folder(ROOT_FOLDER)
+
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
+        
+        print("\n" + "=" * 60)
+        print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
+        print("=" * 60)
+        
+    except Exception as e:
+        print(f"❌ 错误: {str(e)}")
+    finally:
+        # 关闭数据库引擎,避免事件循环关闭警告
+        engine = get_async_engine()
+        await engine.dispose()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 141 - 0
src/app/scripts/plan_info_in_minio.py

@@ -0,0 +1,141 @@
+"""
+将每个子文件夹中的原始文件、JSON、MD 上传至 MinIO。
+上传路径规则(object_name):
+- 原始文件: /plan/{id}{原始扩展名}
+- Markdown : /plan/{id}.md
+- JSON     : /plan/{id}.json
+"""
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Tuple
+
+from minio.error import S3Error
+
+from app.config.minio_client import get_minio_client
+from app.config.setting import settings
+
+# 目标路径前缀
+PREFIX = "sampledata/plan"
+
+# 根目录配置:每个子文件夹包含 1 个原始文件 + 1 个 .md + 1 个 .json
+ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\施工方案文件夹"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\plan_minio_failed_report.json"
+
+
+def find_files(subfolder: Path) -> Tuple[Path, Path, Path]:
+    """返回 (original_file, md_file, json_file),找不到则抛异常。"""
+    json_files = list(subfolder.glob("*.json"))
+    if not json_files:
+        raise FileNotFoundError("未找到 json 文件")
+    json_file = json_files[0]
+
+    md_files = list(subfolder.glob("*.md"))
+    if not md_files:
+        raise FileNotFoundError("未找到 md 文件")
+    md_file = md_files[0]
+
+    original_file = None
+    for f in subfolder.iterdir():
+        if f.is_file() and f.suffix not in {".json", ".md"}:
+            original_file = f
+            break
+    if original_file is None:
+        raise FileNotFoundError("未找到原始文件")
+
+    return original_file, md_file, json_file
+
+
+def upload_file(client, bucket: str, object_name: str, file_path: Path) -> None:
+    """上传文件到 MinIO,使用 put_object。"""
+    with file_path.open("rb") as fp:
+        result = client.put_object(
+            bucket_name=bucket,
+            object_name=object_name,
+            data=fp,
+            length=file_path.stat().st_size,
+        )
+    # print(f"    ✅ 上传成功: {result.object_name} (源文件: {file_path.name})")
+
+
+def main() -> None:
+    root = Path(ROOT_FOLDER)
+    if not root.is_dir():
+        raise NotADirectoryError(f"不是有效的目录: {root}")
+
+    client = get_minio_client()
+    bucket = settings.MINIO_BUCKET_NAME
+
+    # 确保桶存在
+    if not client.bucket_exists(bucket):
+        client.make_bucket(bucket)
+
+    stats = {"success": 0, "failed": 0, "skipped": 0, "failed_items": []}
+
+    for subfolder in sorted(root.iterdir()):
+        if not subfolder.is_dir():
+            continue
+        folder_name = subfolder.name
+        try:
+            original_file, md_file, json_file = find_files(subfolder)
+
+            with open(json_file, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            doc = data.get("doc") or {}
+            doc_id = doc.get("id")
+            if not doc_id:
+                raise ValueError("JSON 中缺少 doc.id")
+
+            # 目标路径:/桶名/sampledata/plan/...
+            file_obj = f"{PREFIX}/{doc_id}{original_file.suffix}"
+            md_obj = f"{PREFIX}/{doc_id}.md"
+            json_obj = f"{PREFIX}/{doc_id}.json"
+
+            upload_file(client, bucket, file_obj, original_file)
+            upload_file(client, bucket, md_obj, md_file)
+            upload_file(client, bucket, json_obj, json_file)
+
+            print(f"📁 {folder_name} ✅ 已上传: {file_obj}, {md_obj}, {json_obj}")
+            stats["success"] += 1
+        except (FileNotFoundError, ValueError) as e:
+            error_message = str(e)
+            print(f"📁 {folder_name} ⊘ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "文件或JSON校验失败",
+                "reason": error_message,
+            })
+            stats["skipped"] += 1
+        except S3Error as e:
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ MinIO 错误: {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "MinIO 上传失败",
+                "reason": error_message,
+            })
+            stats["failed"] += 1
+        except Exception as e:
+            error_message = str(e)
+            print(f"📁 {folder_name} ❌ {error_message}")
+            stats["failed_items"].append({
+                "folder": folder_name,
+                "error": "未知错误",
+                "reason": error_message,
+            })
+            stats["failed"] += 1
+
+    with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+        json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
+
+    print("\n" + "=" * 60)
+    print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+    print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    main()
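
upload_file opens the file and passes its size to put_object explicitly. The MinIO SDK also provides fput_object, which reads the file and determines the length itself; an equivalent helper, shown only as an alternative sketch, would be:

# alternative to upload_file above, letting the SDK handle the file handle and length
from pathlib import Path

def upload_file_fput(client, bucket: str, object_name: str, file_path: Path) -> None:
    client.fput_object(bucket_name=bucket, object_name=object_name, file_path=str(file_path))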

+ 11 - 8
src/app/scripts/plan_info_json_generation.py

@@ -20,10 +20,12 @@ import pandas as pd
 
 
 # ==================== 配置参数 ====================
-EXCEL_FILE = r"C:\Users\ZengChao\Desktop\plan_id.xlsx"      # ✅ Excel文件路径
-ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\施工方案文件夹"   # ✅ 根文件夹路径
+EXCEL_FILE = r"C:\Users\ZengChao\Desktop\施工方案.xlsx"      # ✅ Excel文件路径
+ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\施工方案"  # ✅ 根文件夹路径
 SHEET_INDEX = 0                                               # 目标sheet索引(0为第一个sheet)
 
+FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\plan_json_failed_report.json"  # ✅ 失败汇总JSON保存路径
+
 PARENT_MAX_CHARS = 6000                                       # ✅ 父段最大长度(超长切片)
 CHILD_INDEX_START = 0                                         # ✅ children.index 起始
 EXCEL_ID_COLUMN = "ID"                                        # ✅ Excel主键列名
@@ -326,7 +328,7 @@ class PlanInfoGenerator:
 
         return result
 
-    def process_folder_structure(self, root_folder: str | Path) -> dict[str, list[str]]:
+    def process_folder_structure(self, root_folder: str | Path) -> dict[str, list[Any]]:
         root_folder = Path(root_folder)
         if not root_folder.is_dir():
             raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
@@ -381,12 +383,11 @@ class PlanInfoGenerator:
                 with open(output_path, "w", encoding="utf-8") as f:
                     json.dump(out_json, f, ensure_ascii=False, indent=2)
 
-                print(f"📄 {folder_name} ✅ (已生成: {output_path.name})")
                 results["success"].append(folder_name)
 
             except Exception as e:
                 print(f"📄 {folder_name} ❌ ({str(e)})")
-                results["failed"].append(f"{folder_name} ({str(e)})")
+                results["failed"].append({"folder": folder_name, "error": str(e)})
 
         return results
 
@@ -396,10 +397,12 @@ def main():
         generator = PlanInfoGenerator(EXCEL_FILE, sheet_index=SHEET_INDEX)
         results = generator.process_folder_structure(ROOT_FOLDER)
 
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({"failed": results["failed"]}, f, ensure_ascii=False, indent=2)
+
         print("\n" + "=" * 60)
-        print(
-            f"✅ 成功: {len(results['success'])} | ❌ 失败: {len(results['failed'])} | ⊘ 跳过: {len(results['skipped'])}"
-        )
+        print(f"❌ 失败: {len(results['failed'])} | ⊘ 跳过: {len(results['skipped'])}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
         print("=" * 60)
     except Exception as e:
         print(f"❌ 错误: {str(e)}")