milvus入库脚本.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #!/usr/bin/env python3
  2. """
  3. 测试修复后的 Milvus 向量实现
  4. """
  5. import sys
  6. import os
  7. import json
  8. # 添加项目根目录到路径
  9. # 解决模块导入问题:添加项目根目录到 Python 路径
  10. current_script_path = os.path.abspath(__file__)
  11. script_dir = os.path.dirname(current_script_path)
  12. project_root = os.path.abspath(os.path.join(script_dir, "../../"))
  13. if project_root not in sys.path:
  14. sys.path.insert(0, project_root)
  15. def load_documents_from_file(json_path: str):
  16. """
  17. 从单个 JSON 文件读取所有 chunks,生成:
  18. [{'content': str, 'metadata': dict}, ...]
  19. 只保留 chunk 自身的 content 和 metadata,不混入文件级元数据
  20. """
  21. documents = []
  22. if not os.path.isfile(json_path):
  23. print(f"[WARN] JSON 文件不存在: {json_path}")
  24. return documents
  25. try:
  26. with open(json_path, "r", encoding="utf-8") as f:
  27. data = json.load(f)
  28. except Exception as e:
  29. print(f"[ERROR] 读取 JSON 文件失败: {json_path}, error: {e}")
  30. return documents
  31. chunks = data.get("chunks", [])
  32. if not isinstance(chunks, list):
  33. print(f"[WARN] 文件 {json_path} 中的 chunks 字段不是 list,跳过")
  34. return documents
  35. for idx, chunk in enumerate(chunks):
  36. if not isinstance(chunk, dict):
  37. continue
  38. # 提取 content
  39. content = chunk.get("content", "")
  40. if not content or not str(content).strip():
  41. # 空内容不入库
  42. continue
  43. # 只用 chunk 自己的 metadata
  44. metadata = chunk.get("metadata", {}) or {}
  45. # 如果你也想保留 chunk 的索引,可以打开下面这行
  46. # metadata["chunk_index"] = idx
  47. documents.append({
  48. "content": content,
  49. "metadata": metadata,
  50. })
  51. print(f"[INFO] 文件 {os.path.basename(json_path)} 提取出 {len(documents)} 条 chunk 文档")
  52. return documents
  53. def test_basic_functionality():
  54. """测试基本功能"""
  55. try:
  56. # 导入并初始化 MilvusVectorManager
  57. from foundation.database.base.vector.milvus_vector import MilvusVectorManager
  58. print("成功导入 MilvusVectorManager")
  59. # 初始化管理器
  60. manager = MilvusVectorManager()
  61. print("MilvusVectorManager 初始化成功")
  62. # 测试 text_to_vector 方法
  63. test_text = "桥梁建设技术"
  64. vector = manager.text_to_vector(test_text)
  65. print(f"text_to_vector 测试成功,向量维度: {len(vector)}")
  66. # ====== 关键改动:从文件夹读取所有 JSON 文件,生成 documents ======
  67. # 指定你的 JSON 文件夹路径
  68. json_dir = "data_pipeline/test_rawdata"
  69. all_documents = []
  70. if not os.path.isdir(json_dir):
  71. print(f"[ERROR] 目录不存在: {json_dir}")
  72. return False
  73. # 遍历文件夹下所有文件
  74. for filename in os.listdir(json_dir):
  75. # 只处理 .json 文件(如果你是其它后缀,改这里)
  76. if not filename.lower().endswith(".json"):
  77. continue
  78. json_path = os.path.join(json_dir, filename)
  79. docs = load_documents_from_file(json_path)
  80. if docs:
  81. all_documents.extend(docs)
  82. print(f"[INFO] 总共从目录 {json_dir} 中解析出 {len(all_documents)} 条文档")
  83. if not all_documents:
  84. print("[ERROR] 未从任何文件中解析到文档,停止测试")
  85. return False
  86. # ====== 关键改动结束 ======
  87. collection_name = "first_bfp_collection_test"
  88. print(f"\n测试 create_hybrid_collection 方法...")
  89. vectorstore = manager.create_hybrid_collection(
  90. collection_name=collection_name,
  91. documents=all_documents # ← 用目录里解析出的所有 documents
  92. )
  93. print("create_hybrid_collection 执行成功!")
  94. print(f"返回的 vectorstore 类型: {type(vectorstore)}")
  95. return True
  96. except Exception as e:
  97. print(f"测试失败: {e}")
  98. import traceback
  99. traceback.print_exc()
  100. return False
  101. if __name__ == "__main__":
  102. success = test_basic_functionality()
  103. print("\n" + "=" * 50)
  104. print(f"测试结果: {'成功' if success else '失败'}")
  105. if success:
  106. print("修复验证成功!")
  107. print("- text_to_vector 方法正常工作")
  108. print("- create_hybrid_collection 方法正常工作")
  109. print("- hybrid_search 方法正常工作")