milvus入库脚本.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. #!/usr/bin/env python3
  2. """
  3. 测试修复后的 Milvus 向量实现
  4. """
  5. import sys
  6. import os
  7. import json
  8. current_script_path = os.path.abspath(__file__)
  9. script_dir = os.path.dirname(current_script_path)
  10. project_root = os.path.abspath(os.path.join(script_dir, "../../"))
  11. if project_root not in sys.path:
  12. def load_documents_from_file(json_path: str):
  13. """
  14. 从单个 JSON 文件读取所有 chunks,生成:
  15. [{'content': str, 'metadata': dict}, ...]
  16. 只保留 chunk 自身的 content 和 metadata,不混入文件级元数据
  17. """
  18. documents = []
  19. if not os.path.isfile(json_path):
  20. print(f"[WARN] JSON 文件不存在: {json_path}")
  21. return documents
  22. try:
  23. with open(json_path, "r", encoding="utf-8") as f:
  24. data = json.load(f)
  25. except Exception as e:
  26. print(f"[ERROR] 读取 JSON 文件失败: {json_path}, error: {e}")
  27. return documents
  28. chunks = data.get("chunks", [])
  29. if not isinstance(chunks, list):
  30. print(f"[WARN] 文件 {json_path} 中的 chunks 字段不是 list,跳过")
  31. return documents
  32. for idx, chunk in enumerate(chunks):
  33. if not isinstance(chunk, dict):
  34. continue
  35. # 提取 content
  36. content = chunk.get("content", "")
  37. if not content or not str(content).strip():
  38. # 空内容不入库
  39. continue
  40. # 只用 chunk 自己的 metadata
  41. metadata = chunk.get("metadata", {}) or {}
  42. # 如果你也想保留 chunk 的索引,可以打开下面这行
  43. # metadata["chunk_index"] = idx
  44. documents.append({
  45. "content": content,
  46. "metadata": metadata,
  47. })
  48. print(f"[INFO] 文件 {os.path.basename(json_path)} 提取出 {len(documents)} 条 chunk 文档")
  49. return documents
  50. def test_basic_functionality():
  51. """测试基本功能"""
  52. try:
  53. # 导入并初始化 MilvusVectorManager
  54. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  55. print("成功导入 MilvusVectorManager")
  56. # 初始化管理器
  57. manager = MilvusVectorManager()
  58. print("MilvusVectorManager 初始化成功")
  59. # 测试 text_to_vector 方法
  60. test_text = "桥梁建设技术"
  61. vector = manager.text_to_vector(test_text)
  62. print(f"text_to_vector 测试成功,向量维度: {len(vector)}")
  63. # ====== 关键改动:从文件夹读取所有 JSON 文件,生成 documents ======
  64. # 指定你的 JSON 文件夹路径
  65. json_dir = "data_pipeline/test_rawdata"
  66. all_documents = []
  67. if not os.path.isdir(json_dir):
  68. print(f"[ERROR] 目录不存在: {json_dir}")
  69. return False
  70. # 遍历文件夹下所有文件
  71. for filename in os.listdir(json_dir):
  72. # 只处理 .json 文件(如果你是其它后缀,改这里)
  73. if not filename.lower().endswith(".json"):
  74. continue
  75. json_path = os.path.join(json_dir, filename)
  76. docs = load_documents_from_file(json_path)
  77. if docs:
  78. all_documents.extend(docs)
  79. print(f"[INFO] 总共从目录 {json_dir} 中解析出 {len(all_documents)} 条文档")
  80. if not all_documents:
  81. print("[ERROR] 未从任何文件中解析到文档,停止测试")
  82. return False
  83. # ====== 关键改动结束 ======
  84. collection_name = "first_bfp_collection_test"
  85. print(f"\n测试 create_hybrid_collection 方法...")
  86. vectorstore = manager.create_hybrid_collection(
  87. collection_name=collection_name,
  88. documents=all_documents, # ← 用目录里解析出的所有 documents
  89. drop_old=True
  90. )
  91. print("create_hybrid_collection 执行成功!")
  92. print(f"返回的 vectorstore 类型: {type(vectorstore)}")
  93. return True
  94. except Exception as e:
  95. print(f"测试失败: {e}")
  96. import traceback
  97. traceback.print_exc()
  98. return False
  99. if __name__ == "__main__":
  100. success = test_basic_functionality()
  101. print("\n" + "=" * 50)
  102. print(f"测试结果: {'成功' if success else '失败'}")
  103. if success:
  104. print("修复验证成功!")
  105. print("- text_to_vector 方法正常工作")
  106. print("- create_hybrid_collection 方法正常工作")
  107. print("- hybrid_search 方法正常工作")