Sfoglia il codice sorgente

feat:编制依据时效性mysql入库

ai02 4 settimane fa
parent
commit
d8282b2934
1 ha cambiato i file con 215 aggiunte e 0 eliminazioni
  1. 215 0
      src/app/scripts/status_info_in_database.py

+ 215 - 0
src/app/scripts/status_info_in_database.py

@@ -0,0 +1,215 @@
+"""
+将output.json中的标准状态信息导入数据库
+读取output.json文件,填充 t_samp_standard_base_info_status 表
+"""
+from __future__ import annotations
+
+import asyncio
+import json
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from sqlalchemy import text
+
+from app.config.database import get_async_sessionmaker, get_async_engine
+from app.config.setting import settings
+
+
+# ============================================
+# 配置参数区域 - 便于修改
+# ============================================
+
+# 数据库表名配置
+TABLE_STANDARD_BASE_INFO_STATUS = "t_samp_standard_base_info_status"  # 标准基础信息状态表
+
+# 默认创建人/修改人ID
+DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
+
+# output.json 文件路径
+OUTPUT_JSON_PATH = r"F:/时效性相关文档/output.json"
+
+# 失败汇总JSON保存路径
+FAILED_REPORT_PATH = r"E:\Desktop\luqiao\LQKgDataGovernance\src\app\scripts\status_db_failed_report.json"
+
+# 默认值配置
+STATUS_INFO_DEFAULTS = {
+    "created_by": DEFAULT_USER_ID,
+    "updated_by": DEFAULT_USER_ID,
+}
+
+
+async def insert_standard_base_info_status(
+    session,
+    item: Dict[str, Any]
+) -> tuple[bool, Optional[str]]:
+    """
+    插入标准基础信息状态表 t_samp_standard_base_info_status
+    
+    Args:
+        session: 数据库会话
+        item: JSON中的单条数据
+    
+    Returns:
+        (是否插入成功, 失败原因)
+    """
+    try:
+        # 字段映射(从JSON字段名到数据库字段名)
+        doc_id = item.get("id")
+        chinese_name = item.get("中文名")
+        standard_number = item.get("编号")
+        file_type = item.get("文件类型")
+        validity = item.get("状态(XH(现行)废止(FZ)")
+        
+        sql = text(f"""
+            INSERT INTO {TABLE_STANDARD_BASE_INFO_STATUS} (
+                id, chinese_name, standard_number, file_type, validity,
+                created_by, created_time, updated_by, updated_time
+            ) VALUES (
+                :id, :chinese_name, :standard_number, :file_type, :validity,
+                :created_by, :created_time, :updated_by, :updated_time
+            )
+        """)
+        
+        await session.execute(sql, {
+            "id": doc_id,
+            "chinese_name": chinese_name,
+            "standard_number": standard_number,
+            "file_type": file_type,
+            "validity": validity,
+            "created_by": STATUS_INFO_DEFAULTS["created_by"],
+            "created_time": datetime.now(),
+            "updated_by": STATUS_INFO_DEFAULTS["updated_by"],
+            "updated_time": datetime.now(),
+        })
+        
+        return True, None
+    except Exception as e:
+        error_message = str(e)
+        return False, error_message
+
+
+async def process_json_file(json_path: str | Path) -> Dict[str, Any]:
+    """
+    处理JSON文件,导入数据库
+    
+    Args:
+        json_path: JSON文件路径
+    
+    Returns:
+        统计信息字典
+    """
+    json_path = Path(json_path)
+    if not json_path.exists():
+        raise FileNotFoundError(f"JSON文件不存在: {json_path}")
+    
+    # 读取JSON文件
+    with open(json_path, "r", encoding="utf-8") as f:
+        data = json.load(f)
+    
+    # 确保数据是列表
+    if not isinstance(data, list):
+        raise ValueError(f"JSON数据格式错误: 期望数组,实际为 {type(data).__name__}")
+    
+    SessionMaker = get_async_sessionmaker()
+    
+    stats = {
+        "total": len(data),
+        "success": 0,
+        "failed": 0,
+        "skipped": 0,
+        "failed_items": []
+    }
+    
+    async with SessionMaker() as session:
+        for idx, item in enumerate(data, 1):
+            doc_id = item.get("id", f"第{idx}条")
+            
+            # 检查必要字段
+            if not item.get("id"):
+                print(f"📄 [{idx}/{len(data)}] ID={doc_id} ⊘ (缺少id字段,跳过)")
+                stats["skipped"] += 1
+                stats["failed_items"].append({
+                    "index": idx,
+                    "id": doc_id,
+                    "error": "缺少id字段",
+                    "data": item,
+                })
+                continue
+            
+            try:
+                # 插入数据
+                ok, error = await insert_standard_base_info_status(session, item)
+                
+                if ok:
+                    await session.commit()
+                    chinese_name = item.get("中文名", "")
+                    print(f"📄 [{idx}/{len(data)}] ID={doc_id} {chinese_name[:30]}... ✅")
+                    stats["success"] += 1
+                else:
+                    await session.rollback()
+                    print(f"📄 [{idx}/{len(data)}] ID={doc_id} ❌ ({error})")
+                    stats["failed"] += 1
+                    stats["failed_items"].append({
+                        "index": idx,
+                        "id": doc_id,
+                        "error": "插入失败",
+                        "reason": error,
+                        "data": item,
+                    })
+                    
+            except Exception as e:
+                await session.rollback()
+                error_message = str(e)
+                print(f"📄 [{idx}/{len(data)}] ID={doc_id} ❌ ({error_message})")
+                stats["failed"] += 1
+                stats["failed_items"].append({
+                    "index": idx,
+                    "id": doc_id,
+                    "error": "未知错误",
+                    "reason": error_message,
+                    "data": item,
+                })
+    
+    return stats
+
+
+async def main():
+    """主函数"""
+    try:
+        print(f"🔍 开始导入标准状态信息到数据库...")
+        print(f"📂 JSON文件: {OUTPUT_JSON_PATH}")
+        print(f"🔗 数据库: {settings.DATABASE_URL}")
+        print(f"📊 目标表: {TABLE_STANDARD_BASE_INFO_STATUS}")
+        print("-" * 60)
+        
+        stats = await process_json_file(OUTPUT_JSON_PATH)
+
+        # 保存失败记录
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
+            json.dump({
+                "failed": stats["failed_items"],
+                "summary": {
+                    "total": stats["total"],
+                    "success": stats["success"],
+                    "failed": stats["failed"],
+                    "skipped": stats["skipped"],
+                }
+            }, f, ensure_ascii=False, indent=2)
+        
+        print("\n" + "=" * 60)
+        print(f"📊 总计: {stats['total']} | ✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
+        if stats["failed"] > 0:
+            print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
+        print("=" * 60)
+        
+    except Exception as e:
+        print(f"❌ 错误: {str(e)}")
+    finally:
+        # 关闭数据库引擎,避免事件循环关闭警告
+        engine = get_async_engine()
+        await engine.dispose()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())