@@ -3,6 +3,7 @@ import re
 import json
 import hashlib
 import logging
+import time
 from typing import List, Dict, Any, Optional, Tuple
 
 from langchain_core.documents import Document
@@ -24,14 +25,14 @@ logger = logging.getLogger(__name__)
 # 1. Configuration (defaults read from project config)
 # =============================
 
-# Default processing directory; recommended to override via environment variable or config
+# Default processing directory
 ROOT_DIR = config_handler.get("admin_app", "MILVUS_IMPORT_ROOT", r"C:\Users\ZengChao\Desktop\新建文件夹")
 
 # ✅ Parent / child collections
-PARENT_COLLECTION_NAME = "test_22_parent"
-CHILD_COLLECTION_NAME = "test_22_child"
+PARENT_COLLECTION_NAME = config_handler.get("admin_app", "PARENT_COLLECTION_NAME", "test_27_parent")
+CHILD_COLLECTION_NAME = config_handler.get("admin_app", "CHILD_COLLECTION_NAME", "test_27_child")
 
-DENSE_DIM_FALLBACK = 1024
+DENSE_DIM_FALLBACK = 4096
 CHUNK_ID_START = 0
 
 # ✅ Maximum parent-section length (longer sections are split into multiple parent records that share one parent_id)
@@ -39,17 +40,17 @@ PARENT_MAX_CHARS = 6000
 
 # ✅ Scalar fields (used for filtering)
 BASE_SCALAR_FIELDS = {
-    "is_deleted": False,
-    "parent_id": 0,  # ✅ Note: this ultimately becomes the "parent-section group ID"
-    "doc_id": "DOC_123",
-    "doc_version": 20260118,
-    "tags": "policy,hr",
-}
-
-# ✅ metadata (goes into the JSON metadata field)
-BASE_METADATA_JSON = {
-    "source_type": "pdf",
-    "source_uri": "s3://kb/a.pdf",
+    "is_deleted": 0,
+    "parent_id": "",  # ✅ SHA-1 hex digest, stored as a string
+    "document_id": "DOC_123",
+    "index": 0,
+    "tag_list": "policy,hr",
+    "permission": {},
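+    # Audit fields; note these timestamps are evaluated once, at module import time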
+    "created_by": "system",
+    "created_time": int(time.time() * 1000),
+    "updated_by": "system",
+    "updated_time": int(time.time() * 1000),
 }
 
 # =============================
@@ -118,13 +119,13 @@ def split_md_by_h1_sections(md: str) -> List[Tuple[str, str]]:
     return sections
 
 
-def make_parent_id(doc_id: str, doc_version: int, doc_name: str, h1_title: str, parent_seq: int) -> int:
+def make_parent_id(doc_id: str, doc_version: int, doc_name: str, h1_title: str, parent_seq: int) -> str:
     """
     ✅ Generate a stable parent_id (parent-section ID)
     However many parent-table records a # H1 section is split into, they all share the same parent_id
     """
     raw = f"{doc_id}|{doc_version}|{doc_name}|{parent_seq}|{h1_title}".encode("utf-8")
-    return int(hashlib.sha1(raw).hexdigest()[:16], 16) & ((1 << 63) - 1)
+    return hashlib.sha1(raw).hexdigest()
 
 
 def split_text_by_max_chars(text: str, max_chars: int) -> List[str]:
@@ -184,34 +185,35 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
     3. Finally, handle oversized parent chunks (a too-long parent chunk is split into multiple parent records that share one parent_id)
     """
     doc_name = guess_doc_name_from_filename(file_name)
+    doc_version = 20260127  # default version
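+    # doc_version feeds make_parent_id below, so changing it changes every parent_id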
 
     # 1) Split parent chunks on # H1 headings
     parent_sections = split_md_by_h1_sections(md_text)
 
-    parent_seq_to_parent_id: Dict[int, int] = {}
+    parent_seq_to_id: Dict[int, str] = {}
     # Generate all parent_ids first
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = make_parent_id(
-            doc_id=str(BASE_SCALAR_FIELDS["doc_id"]),
-            doc_version=int(BASE_SCALAR_FIELDS["doc_version"]),
+        p_id = make_parent_id(
+            doc_id=str(BASE_SCALAR_FIELDS["document_id"]),
+            doc_version=doc_version,
             doc_name=doc_name,
             h1_title=h1_title,
             parent_seq=parent_seq,
         )
-        parent_seq_to_parent_id[parent_seq] = parent_id
+        parent_seq_to_id[parent_seq] = p_id
 
     # 2) Build child chunks from the parent chunks (split on blank lines, within each parent chunk)
     child_docs: List[Document] = []
-    chunk_id_counter = CHUNK_ID_START
 
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = parent_seq_to_parent_id[parent_seq]
+        p_id = parent_seq_to_id[parent_seq]
 
         # Split child chunks on blank lines within this parent chunk
         chunks = split_md_by_blank_lines(sec_text)
         heading_path: List[str] = []
 
-        for chunk in chunks:
+        for c_idx, chunk in enumerate(chunks):
             # Child-chunk outline_path
             heading_info = is_heading_chunk(chunk)
             if heading_info:
@@ -223,16 +225,15 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
             outline_path = outline_path_str(heading_path)
 
             scalar_md = dict(BASE_SCALAR_FIELDS)
-            scalar_md["chunk_id"] = chunk_id_counter
-            scalar_md["parent_id"] = int(parent_id)
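+            # "index" is the chunk's position within its parent section (the old chunk_id was global)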
+            scalar_md["index"] = int(c_idx)
+            scalar_md["parent_id"] = p_id
 
-            # ✅ metadata contains only: source_uri, source_type, doc_name, outline_path, chunk_id
+            # ✅ metadata contains: doc_name, outline_path, doc_version
             metadata_json = {
-                "source_uri": BASE_METADATA_JSON.get("source_uri", ""),
-                "source_type": BASE_METADATA_JSON.get("source_type", ""),
                 "doc_name": doc_name,
                 "outline_path": outline_path,
-                "chunk_id": chunk_id_counter,
+                "doc_version": doc_version,
             }
 
             child_docs.append(
@@ -241,22 +242,21 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
                     metadata={**scalar_md, "metadata": metadata_json},
                 )
             )
-            chunk_id_counter += 1
 
     # 3) Handle oversized parent chunks (a too-long parent chunk is split into multiple parent records)
     parent_docs: List[Document] = []
 
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = parent_seq_to_parent_id[parent_seq]
+        p_id = parent_seq_to_id[parent_seq]
 
         # If the parent chunk is too long, split it into multiple slices
         slices = split_text_by_max_chars(sec_text, PARENT_MAX_CHARS)
         for slice_idx, slice_text in enumerate(slices):
             scalar_md = dict(BASE_SCALAR_FIELDS)
-            scalar_md["chunk_id"] = CHUNK_ID_START + parent_seq
-            scalar_md["parent_id"] = int(parent_id)
+            scalar_md["index"] = int(parent_seq)
+            scalar_md["parent_id"] = p_id
 
-            # ✅ metadata contains only: source_uri, source_type, doc_name, outline_path, chunk_id
+            # ✅ metadata contains: doc_name, outline_path, doc_version
             if h1_title == "__PREAMBLE__":
                 outline_path = doc_name
             elif h1_title == "__NO_H1__":
@@ -265,11 +265,9 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
                 outline_path = h1_title
 
             metadata_json = {
-                "source_uri": BASE_METADATA_JSON.get("source_uri", ""),
-                "source_type": BASE_METADATA_JSON.get("source_type", ""),
                 "doc_name": doc_name,
                 "outline_path": outline_path,
-                "chunk_id": CHUNK_ID_START + parent_seq,
+                "doc_version": doc_version,
             }
 
             parent_docs.append(
@@ -296,7 +294,11 @@ def save_docs_to_json(docs: List[Document], out_path: str) -> str:
 # =============================
 
 def detect_dense_dim(emb) -> int:
-    return len(emb.embed_query("dim probe"))
+    try:
+        return len(emb.embed_query("dim probe"))
+    except Exception:
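+        # Probe failed (e.g. the embedding service is unreachable); fall back to the configured dim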
+        return DENSE_DIM_FALLBACK
 
 
 def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int):
@@ -304,28 +306,28 @@ def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int
         return
 
     schema = client.create_schema(auto_id=True, enable_dynamic_fields=False)
-
     schema.add_field("pk", DataType.INT64, is_primary=True, auto_id=True)
-
-    # ✅ The BM25 input field must set enable_analyzer=True
     schema.add_field("text", DataType.VARCHAR, max_length=65535, enable_analyzer=True)
-
-    schema.add_field("dense_vec", DataType.FLOAT_VECTOR, dim=dense_dim)
-    schema.add_field("sparse_bm25", DataType.SPARSE_FLOAT_VECTOR)
-
-    schema.add_field("is_deleted", DataType.BOOL)
-    schema.add_field("parent_id", DataType.INT64)  # ✅ parent_id here = parent-section group ID
-    schema.add_field("doc_id", DataType.VARCHAR, max_length=256)
-    schema.add_field("doc_version", DataType.INT64)
-    schema.add_field("tags", DataType.VARCHAR, max_length=2048)
-
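+    # Vector fields; "sparse" is populated from "text" by the BM25 function registered below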
+    schema.add_field("dense", DataType.FLOAT_VECTOR, dim=dense_dim)
+    schema.add_field("sparse", DataType.SPARSE_FLOAT_VECTOR)
+    schema.add_field("document_id", DataType.VARCHAR, max_length=256)
+    schema.add_field("parent_id", DataType.VARCHAR, max_length=256)
+    schema.add_field("index", DataType.INT64)
+    schema.add_field("tag_list", DataType.VARCHAR, max_length=2048)
+    schema.add_field("permission", DataType.JSON)
     schema.add_field("metadata", DataType.JSON)
+    schema.add_field("is_deleted", DataType.INT64)  # 0/1 flag (was BOOL)
+    schema.add_field("created_by", DataType.VARCHAR, max_length=256)
+    schema.add_field("created_time", DataType.INT64)
+    schema.add_field("updated_by", DataType.VARCHAR, max_length=256)
+    schema.add_field("updated_time", DataType.INT64)
 
     schema.add_function(
         Function(
             name="bm25_fn",
             input_field_names=["text"],
-            output_field_names=["sparse_bm25"],
+            output_field_names=["sparse"],
             function_type=FunctionType.BM25,
         )
     )
@@ -334,13 +336,13 @@ def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int
 
     index_params = client.prepare_index_params()
     index_params.add_index(
-        field_name="dense_vec",
+        field_name="dense",
         index_name="dense_idx",
         index_type="AUTOINDEX",
         metric_type="COSINE",
     )
     index_params.add_index(
-        field_name="sparse_bm25",
+        field_name="sparse",
         index_name="bm25_idx",
         index_type="SPARSE_INVERTED_INDEX",
         metric_type="BM25",
@@ -366,13 +368,19 @@ def docs_to_entities(docs: List[Document], emb) -> List[Dict[str, Any]]:
         entities.append(
             {
                 "text": d.page_content,
-                "dense_vec": vec,
-                "is_deleted": bool(md.get("is_deleted", False)),
-                "parent_id": int(md.get("parent_id", 0)),
-                "doc_id": str(md.get("doc_id", "")),
-                "doc_version": int(md.get("doc_version", 0)),
-                "tags": str(md.get("tags", "")),
+                "dense": vec,
+                "is_deleted": int(md.get("is_deleted", 0)),
+                "parent_id": str(md.get("parent_id", "")),
+                "document_id": str(md.get("document_id", "")),
+                "index": int(md.get("index", 0)),
+                "tag_list": str(md.get("tag_list", "")),
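+                # Guard: JSON fields must be dicts; anything else is coerced to {}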
+                "permission": md.get("permission", {}) if isinstance(md.get("permission", {}), dict) else {},
                 "metadata": md.get("metadata", {}) if isinstance(md.get("metadata", {}), dict) else {},
+                "created_by": str(md.get("created_by", "system")),
+                "created_time": int(md.get("created_time", int(time.time() * 1000))),
+                "updated_by": str(md.get("updated_by", "system")),
+                "updated_time": int(md.get("updated_time", int(time.time() * 1000))),
             }
         )
     return entities