|
@@ -0,0 +1,141 @@
|
|
|
|
|
+#!/usr/bin/env python3
|
|
|
|
|
+"""
|
|
|
|
|
+标准状态数据入库 Milvus
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+from typing import List
|
|
|
|
|
+
|
|
|
|
|
+from langchain_core.documents import Document
|
|
|
|
|
+
|
|
|
|
|
+from app.config.embeddings import get_embeddings
|
|
|
|
|
+from app.config.milvus_client import get_milvusclient
|
|
|
|
|
+
|
|
|
|
|
+# ============================================================
|
|
|
|
|
+# 参数配置
|
|
|
|
|
+# ============================================================
|
|
|
|
|
+
|
|
|
|
|
+# Excel 文件路径
|
|
|
|
|
+EXCEL_PATH = r"C:\Users\ZengChao\Desktop\新建 XLSX 工作表.xlsx"
|
|
|
|
|
+
|
|
|
|
|
+# Collection 名称
|
|
|
|
|
+COLLECTION_NAME = "first_bfp_collection_status"
|
|
|
|
|
+
|
|
|
|
|
+# Excel 列名映射
|
|
|
|
|
+COL_CHINESE_NAME = "中文名" # 标准名称
|
|
|
|
|
+COL_STANDARD_NO = "编号" # 编号
|
|
|
|
|
+COL_STATUS = "状态" # 状态
|
|
|
|
|
+COL_ISSUING_AUTHORITY = "发布单位" # 发布机构
|
|
|
|
|
+
|
|
|
|
|
+# ============================================================
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def upsert_status_milvus(status_docs: List[Document], collection_name: str, embeddings):
|
|
|
|
|
+ """将状态文档写入 Milvus,使用项目内的 MilvusClient。"""
|
|
|
|
|
+ if not status_docs:
|
|
|
|
|
+ print("[WARN] 没有可写入的状态文档")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ client = get_milvusclient()
|
|
|
|
|
+
|
|
|
|
|
+ texts = [doc.page_content for doc in status_docs]
|
|
|
|
|
+ vectors = embeddings.embed_documents(texts)
|
|
|
|
|
+
|
|
|
|
|
+ rows = []
|
|
|
|
|
+ for doc, vector in zip(status_docs, vectors):
|
|
|
|
|
+ rows.append({
|
|
|
|
|
+ "text": doc.page_content,
|
|
|
|
|
+ "dense": vector,
|
|
|
|
|
+ "issuing_authority": str(doc.metadata.get("issuing_authority", "") or ""),
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ client.insert(collection_name=collection_name, data=rows)
|
|
|
|
|
+ print(f"[OK] 状态数据写入 Milvus: {len(rows)} 条 (collection: {collection_name})")
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"[ERROR] 状态数据写入失败: {e}")
|
|
|
|
|
+ import traceback
|
|
|
|
|
+ traceback.print_exc()
|
|
|
|
|
+ raise
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_basic_functionality():
|
|
|
|
|
+ """测试基本功能:读取 Excel 数据并写入 Milvus"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 获取嵌入模型
|
|
|
|
|
+ embeddings = get_embeddings()
|
|
|
|
|
+ print("[OK] 成功获取嵌入模型")
|
|
|
|
|
+
|
|
|
|
|
+ import pandas as pd
|
|
|
|
|
+
|
|
|
|
|
+ df = pd.read_excel(EXCEL_PATH)
|
|
|
|
|
+ df = df.where(pd.notnull(df), None)
|
|
|
|
|
+
|
|
|
|
|
+ status_docs = []
|
|
|
|
|
+ seen_contents = set()
|
|
|
|
|
+
|
|
|
|
|
+ for _, row in df.iterrows():
|
|
|
|
|
+ chinese_name = row.get(COL_CHINESE_NAME)
|
|
|
|
|
+ standard_no = row.get(COL_STANDARD_NO)
|
|
|
|
|
+ status = row.get(COL_STATUS)
|
|
|
|
|
+
|
|
|
|
|
+ # 编号为空则跳过
|
|
|
|
|
+ if not standard_no or not str(standard_no).strip():
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if chinese_name or standard_no or status:
|
|
|
|
|
+ # 检查是否已带书名号,没有则添加
|
|
|
|
|
+ cn = str(chinese_name) if chinese_name else ""
|
|
|
|
|
+ if cn and not (cn.startswith("《") and cn.endswith("》")):
|
|
|
|
|
+ cn = f"《{cn}》"
|
|
|
|
|
+ content = f"{cn}({standard_no})状态为{status}"
|
|
|
|
|
+ else:
|
|
|
|
|
+ content = None
|
|
|
|
|
+
|
|
|
|
|
+ # 跳过空内容或重复内容
|
|
|
|
|
+ if not content or content in seen_contents:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ seen_contents.add(content)
|
|
|
|
|
+
|
|
|
|
|
+ # 提取发布单位
|
|
|
|
|
+ issuing_authority_val = row.get(COL_ISSUING_AUTHORITY)
|
|
|
|
|
+ issuing_authority = "" if pd.isna(issuing_authority_val) else str(issuing_authority_val)
|
|
|
|
|
+
|
|
|
|
|
+ # 转换为 LangChain Document
|
|
|
|
|
+ status_docs.append(
|
|
|
|
|
+ Document(
|
|
|
|
|
+ page_content=content,
|
|
|
|
|
+ metadata={"issuing_authority": issuing_authority}
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ print(f"[INFO] 解析出 {len(status_docs)} 条状态文档")
|
|
|
|
|
+
|
|
|
|
|
+ # 写入 Milvus
|
|
|
|
|
+ upsert_status_milvus(status_docs, COLLECTION_NAME, embeddings)
|
|
|
|
|
+ print(f"[SUCCESS] 写入 Milvus 成功!(collection: {COLLECTION_NAME})")
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"[ERROR] 测试失败: {e}")
|
|
|
|
|
+ import traceback
|
|
|
|
|
+ traceback.print_exc()
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ print("=" * 50)
|
|
|
|
|
+ print("[START] 开始状态数据入库")
|
|
|
|
|
+ print("=" * 50)
|
|
|
|
|
+
|
|
|
|
|
+ success = test_basic_functionality()
|
|
|
|
|
+
|
|
|
|
|
+ print("\n" + "=" * 50)
|
|
|
|
|
+ print(f"测试结果: {'成功' if success else '失败'}")
|
|
|
|
|
+
|
|
|
|
|
+ if success:
|
|
|
|
|
+ print("[SUCCESS] 入库流程完成!")
|
|
|
|
|
+ print("- 使用项目内 MilvusClient 写入")
|
|
|
|
|
+ print("- Dense 向量写入成功")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("[ERROR] 入库流程出现错误")
|