
Merge branch 'dev' of http://47.109.151.80:15030/CRBC-MaaS-Platform-Project/LQAdminPlatform into dev

lingmin_package@163.com 1 month ago
parent
commit
d5241033b1

+ 3 - 2
src/app/base/mineru_connection.py

@@ -126,8 +126,9 @@ class MinerUManager:
             self.update_db_status(doc_id, status=1)
             
             # 2. 下载原始文件
-            logger.info(f"正在下载文件: {file_url}...")
-            resp = requests.get(file_url, timeout=60)
+            full_file_url = self.minio_manager.get_full_url(file_url)
+            logger.info(f"正在下载文件: {full_file_url}...")
+            resp = requests.get(full_file_url, timeout=60)
             resp.raise_for_status()
             file_content = resp.content
             

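The mineru_connection.py change stops feeding the stored file_url straight into requests.get and first expands it through self.minio_manager.get_full_url, since the database may now hold a relative path rather than a complete URL. A minimal sketch of the resulting download step, assuming get_full_url simply joins file_base_url with the stored relative path (the helper name download_original_file is illustrative, not part of the commit):

    import requests

    def download_original_file(minio_manager, file_url: str) -> bytes:
        # file_url may be a relative path such as "/docs/report.pdf"; expand it before downloading.
        full_file_url = minio_manager.get_full_url(file_url)
        resp = requests.get(full_file_url, timeout=60)
        resp.raise_for_status()
        return resp.content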
+ 26 - 9
src/app/base/minio_connection.py

@@ -111,16 +111,33 @@ class MinioManager:
         return f"{self.file_base_url}{relative_path}"
         return f"{self.file_base_url}{relative_path}"
 
 
     def get_object_content(self, file_url: str) -> Optional[str]:
     def get_object_content(self, file_url: str) -> Optional[str]:
-        """根据文件 URL 获取内容,支持多种编码以解决乱码问题"""
+        """根据文件 URL 或相对路径获取内容,支持多种编码以解决乱码问题"""
         try:
-            # 从 URL 中提取 object_name
-            # URL 格式: http://{endpoint}/{bucket}/{object_name}
-            parts = file_url.split(f"/{self.bucket_name}/")
-            if len(parts) < 2:
-                logger.error(f"无效的 MinIO URL: {file_url}")
-                return None
+            object_name = None
             
-            object_name = parts[1]
+            # 1. 尝试从 URL 中提取 object_name
+            if file_url.startswith("http"):
+                # 优先匹配 file_base_url
+                if file_url.startswith(self.file_base_url):
+                    relative_path = file_url[len(self.file_base_url):]
+                    # 确保相对路径以 / 开头,然后去掉它以构造 object_name
+                    if not relative_path.startswith("/"):
+                        relative_path = "/" + relative_path
+                    object_name = f"{self.base_path}{relative_path}"
+                else:
+                    # 兼容旧的 URL 格式: http://{endpoint}/{bucket}/{object_name}
+                    parts = file_url.split(f"/{self.bucket_name}/")
+                    if len(parts) >= 2:
+                        object_name = parts[1]
+            
+            # 2. 如果不是 URL,或者是无法识别的 URL,则视为相对路径
+            if not object_name:
+                relative_path = file_url
+                if not relative_path.startswith("/"):
+                    relative_path = "/" + relative_path
+                object_name = f"{self.base_path}{relative_path}"
+            
+            logger.info(f"正在从 MinIO 获取对象内容: bucket={self.bucket_name}, object={object_name}")
             response = self.client.get_object(self.bucket_name, object_name)
             try:
                 data = response.read()
@@ -139,7 +156,7 @@ class MinioManager:
             # 如果都失败了,尝试忽略错误解码(最后手段)
             return data.decode('utf-8', errors='ignore')
         except Exception as e:
-            logger.error(f"获取 MinIO 对象内容失败: {e}, URL: {file_url}")
+            logger.error(f"获取 MinIO 对象内容失败: {e}, URL/Path: {file_url}")
             return None
 
 def get_minio_manager() -> MinioManager:

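After this change get_object_content accepts three input shapes: a full URL built from file_base_url, a legacy http://{endpoint}/{bucket}/{object_name} URL, and a bare relative path. A condensed sketch of the new resolution order, with the comments paraphrased in English (resolve_object_name is an illustrative helper, not part of the commit):

    def resolve_object_name(file_url: str, file_base_url: str, base_path: str, bucket_name: str) -> str:
        if file_url.startswith("http"):
            if file_url.startswith(file_base_url):
                # New style: strip the public base URL, then prefix the bucket-internal base path.
                relative_path = file_url[len(file_base_url):]
                if not relative_path.startswith("/"):
                    relative_path = "/" + relative_path
                return f"{base_path}{relative_path}"
            # Legacy style: http://{endpoint}/{bucket}/{object_name}
            parts = file_url.split(f"/{bucket_name}/")
            if len(parts) >= 2:
                return parts[1]
        # Anything else (or an unrecognized URL) is treated as a path under base_path.
        relative_path = file_url if file_url.startswith("/") else "/" + file_url
        return f"{base_path}{relative_path}"

Keeping the legacy bucket-URL branch means records written before this commit remain readable.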
+ 64 - 62
src/app/base/pymilvus_store_database.py

@@ -3,6 +3,7 @@ import re
 import json
 import hashlib
 import logging
+import time
 from typing import List, Dict, Any, Optional, Tuple
 
 from langchain_core.documents import Document
@@ -24,14 +25,14 @@ logger = logging.getLogger(__name__)
 # 一、配置区 (从项目配置中读取默认值)
 # =============================
 
-# 默认处理目录,建议通过环境变量或配置修改
+# 默认处理目录
 ROOT_DIR = config_handler.get("admin_app", "MILVUS_IMPORT_ROOT", r"C:\Users\ZengChao\Desktop\新建文件夹")
 
 # ✅ 父表 / 子表
-PARENT_COLLECTION_NAME = "test_22_parent"
-CHILD_COLLECTION_NAME = "test_22_child"
+PARENT_COLLECTION_NAME = config_handler.get("admin_app", "PARENT_COLLECTION_NAME", "test_27_parent")
+CHILD_COLLECTION_NAME = config_handler.get("admin_app", "CHILD_COLLECTION_NAME", "test_27_child")
 
-DENSE_DIM_FALLBACK = 1024
+DENSE_DIM_FALLBACK = 4096
 CHUNK_ID_START = 0
 
 # ✅ 父段最大长度(超过就把父段切成多条父表记录,但它们 parent_id 相同)
@@ -39,17 +40,16 @@ PARENT_MAX_CHARS = 6000
 
 # ✅ 标量字段(用于过滤)
 BASE_SCALAR_FIELDS = {
-    "is_deleted": False,
-    "parent_id": 0,  # ✅ 注意:这里最终会变成 “父段组ID”
-    "doc_id": "DOC_123",
-    "doc_version": 20260118,
-    "tags": "policy,hr",
-}
-
-# ✅ metadata(放 JSON metadata 字段)
-BASE_METADATA_JSON = {
-    "source_type": "pdf",
-    "source_uri": "s3://kb/a.pdf",
+    "is_deleted": 0,
+    "parent_id": "",  # ✅ 字符串格式的 SHA-1
+    "document_id": "DOC_123",
+    "index": 0,
+    "tag_list": "policy,hr",
+    "permission": {},
+    "created_by": "system",
+    "created_time": int(time.time() * 1000),
+    "updated_by": "system",
+    "updated_time": int(time.time() * 1000),
 }
 
 # =============================
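Together with the metadata_json changes further down, every record now carries these scalar columns plus a nested metadata JSON. An illustrative, made-up example of one resulting child record, with timestamps in epoch milliseconds as produced by int(time.time() * 1000):

    example_child_record = {
        "text": "chunk text here",
        "document_id": "DOC_123",
        "parent_id": "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3",  # 40-char SHA-1, shared by the whole H1 section
        "index": 0,                                               # chunk position inside its parent section
        "tag_list": "policy,hr",
        "permission": {},
        "is_deleted": 0,
        "created_by": "system",
        "created_time": 1738368000000,
        "updated_by": "system",
        "updated_time": 1738368000000,
        "metadata": {"doc_name": "employee_handbook", "outline_path": "1. General", "doc_version": 20260127},
    }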
@@ -118,13 +118,13 @@ def split_md_by_h1_sections(md: str) -> List[Tuple[str, str]]:
     return sections
 
 
-def make_parent_id(doc_id: str, doc_version: int, doc_name: str, h1_title: str, parent_seq: int) -> int:
+def make_parent_id(doc_id: str, doc_version: int, doc_name: str, h1_title: str, parent_seq: int) -> str:
     """
     """
     ✅ 生成稳定 parent_id(父段ID)
     ✅ 生成稳定 parent_id(父段ID)
     同一个 # 一级标题段无论父表切成几条记录,都共享同一个 parent_id
     同一个 # 一级标题段无论父表切成几条记录,都共享同一个 parent_id
     """
     """
     raw = f"{doc_id}|{doc_version}|{doc_name}|{parent_seq}|{h1_title}".encode("utf-8")
     raw = f"{doc_id}|{doc_version}|{doc_name}|{parent_seq}|{h1_title}".encode("utf-8")
-    return int(hashlib.sha1(raw).hexdigest()[:16], 16) & ((1 << 63) - 1)
+    return hashlib.sha1(raw).hexdigest()
 
 
 def split_text_by_max_chars(text: str, max_chars: int) -> List[str]:
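make_parent_id previously folded the SHA-1 into a 63-bit integer so it could live in an INT64 column; it now returns the full 40-character hex digest, matching the parent_id VARCHAR field in the new schema. A small usage sketch (input values are illustrative):

    import hashlib

    def make_parent_id(doc_id: str, doc_version: int, doc_name: str, h1_title: str, parent_seq: int) -> str:
        raw = f"{doc_id}|{doc_version}|{doc_name}|{parent_seq}|{h1_title}".encode("utf-8")
        return hashlib.sha1(raw).hexdigest()

    pid = make_parent_id("DOC_123", 20260127, "employee_handbook", "1. General", 0)
    # Every parent slice and child chunk cut from the same H1 section shares this id.
    assert len(pid) == 40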
@@ -184,34 +184,34 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
     3. 最后处理超长的父块(父块太长再切成多条父记录,共享同一个 parent_id)
     """
     doc_name = guess_doc_name_from_filename(file_name)
+    doc_version = 20260127 # 默认版本
 
     # 1) 按 # 一级标题切父块
     parent_sections = split_md_by_h1_sections(md_text)
 
-    parent_seq_to_parent_id: Dict[int, int] = {}
+    parent_seq_to_id: Dict[int, str] = {}
     # 先生成所有 parent_id
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = make_parent_id(
-            doc_id=str(BASE_SCALAR_FIELDS["doc_id"]),
-            doc_version=int(BASE_SCALAR_FIELDS["doc_version"]),
+        p_id = make_parent_id(
+            doc_id=str(BASE_SCALAR_FIELDS["document_id"]),
+            doc_version=doc_version,
             doc_name=doc_name,
             h1_title=h1_title,
             parent_seq=parent_seq,
         )
-        parent_seq_to_parent_id[parent_seq] = parent_id
+        parent_seq_to_id[parent_seq] = p_id
 
     # 2) 用切好的父块来处理子块(按空行切,但在父块范围内)
     child_docs: List[Document] = []
-    chunk_id_counter = CHUNK_ID_START
 
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = parent_seq_to_parent_id[parent_seq]
+        p_id = parent_seq_to_id[parent_seq]
 
         # 在该父块范围内按空行切子块
         chunks = split_md_by_blank_lines(sec_text)
         heading_path: List[str] = []
 
-        for chunk in chunks:
+        for c_idx, chunk in enumerate(chunks):
             # 子 chunk outline_path
             heading_info = is_heading_chunk(chunk)
             if heading_info:
@@ -223,16 +223,14 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
                 outline_path = outline_path_str(heading_path)
 
             scalar_md = dict(BASE_SCALAR_FIELDS)
-            scalar_md["chunk_id"] = chunk_id_counter
-            scalar_md["parent_id"] = int(parent_id)
+            scalar_md["index"] = int(c_idx)
+            scalar_md["parent_id"] = p_id
 
-            # ✅ metadata 只包含:source_uri, source_type, doc_name, outline_path, chunk_id
+            # ✅ metadata 包含:doc_name, outline_path, doc_version
             metadata_json = {
-                "source_uri": BASE_METADATA_JSON.get("source_uri", ""),
-                "source_type": BASE_METADATA_JSON.get("source_type", ""),
                 "doc_name": doc_name,
                 "doc_name": doc_name,
                 "outline_path": outline_path,
                 "outline_path": outline_path,
-                "chunk_id": chunk_id_counter,
+                "doc_version": doc_version,
             }
 
             child_docs.append(
@@ -241,22 +239,21 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
                     metadata={**scalar_md, "metadata": metadata_json},
                 )
             )
-            chunk_id_counter += 1
 
     # 3) 处理超长的父块(父块太长再切成多条父记录)
     parent_docs: List[Document] = []
 
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
-        parent_id = parent_seq_to_parent_id[parent_seq]
+        p_id = parent_seq_to_id[parent_seq]
 
         # 如果父块过长,切成多条
         slices = split_text_by_max_chars(sec_text, PARENT_MAX_CHARS)
         for slice_idx, slice_text in enumerate(slices):
             scalar_md = dict(BASE_SCALAR_FIELDS)
-            scalar_md["chunk_id"] = CHUNK_ID_START + parent_seq
-            scalar_md["parent_id"] = int(parent_id)
+            scalar_md["index"] = int(parent_seq)
+            scalar_md["parent_id"] = p_id
 
-            # ✅ metadata 只包含:source_uri, source_type, doc_name, outline_path, chunk_id
+            # ✅ metadata 包含:doc_name, outline_path, doc_version
             if h1_title == "__PREAMBLE__":
             if h1_title == "__PREAMBLE__":
                 outline_path = doc_name
                 outline_path = doc_name
             elif h1_title == "__NO_H1__":
             elif h1_title == "__NO_H1__":
@@ -265,11 +262,9 @@ def build_parent_and_child_documents_from_md(md_text: str, file_name: str) -> Tu
                 outline_path = h1_title
 
             metadata_json = {
-                "source_uri": BASE_METADATA_JSON.get("source_uri", ""),
-                "source_type": BASE_METADATA_JSON.get("source_type", ""),
                 "doc_name": doc_name,
                 "doc_name": doc_name,
                 "outline_path": outline_path,
                 "outline_path": outline_path,
-                "chunk_id": CHUNK_ID_START + parent_seq,
+                "doc_version": doc_version,
             }
 
             parent_docs.append(
@@ -296,7 +291,10 @@ def save_docs_to_json(docs: List[Document], out_path: str) -> str:
 # =============================
 
 def detect_dense_dim(emb) -> int:
-    return len(emb.embed_query("dim probe"))
+    try:
+        return len(emb.embed_query("dim probe"))
+    except:
+        return DENSE_DIM_FALLBACK
 
 
 def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int):
@@ -304,28 +302,27 @@ def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int
         return
 
     schema = client.create_schema(auto_id=True, enable_dynamic_fields=False)
-
     schema.add_field("pk", DataType.INT64, is_primary=True, auto_id=True)
     schema.add_field("pk", DataType.INT64, is_primary=True, auto_id=True)
-
-    # ✅ BM25 输入字段必须 enable_analyzer=True
     schema.add_field("text", DataType.VARCHAR, max_length=65535, enable_analyzer=True)
     schema.add_field("text", DataType.VARCHAR, max_length=65535, enable_analyzer=True)
-
-    schema.add_field("dense_vec", DataType.FLOAT_VECTOR, dim=dense_dim)
-    schema.add_field("sparse_bm25", DataType.SPARSE_FLOAT_VECTOR)
-
-    schema.add_field("is_deleted", DataType.BOOL)
-    schema.add_field("parent_id", DataType.INT64)  # ✅ 这里的 parent_id = 父段组ID
-    schema.add_field("doc_id", DataType.VARCHAR, max_length=256)
-    schema.add_field("doc_version", DataType.INT64)
-    schema.add_field("tags", DataType.VARCHAR, max_length=2048)
-
+    schema.add_field("dense", DataType.FLOAT_VECTOR, dim=dense_dim)
+    schema.add_field("sparse", DataType.SPARSE_FLOAT_VECTOR)
+    schema.add_field("document_id", DataType.VARCHAR, max_length=256)
+    schema.add_field("parent_id", DataType.VARCHAR, max_length=256)
+    schema.add_field("index", DataType.INT64)
+    schema.add_field("tag_list", DataType.VARCHAR, max_length=2048)
+    schema.add_field("permission", DataType.JSON)
     schema.add_field("metadata", DataType.JSON)
     schema.add_field("metadata", DataType.JSON)
+    schema.add_field("is_deleted", DataType.INT64)
+    schema.add_field("created_by", DataType.VARCHAR, max_length=256)
+    schema.add_field("created_time", DataType.INT64)
+    schema.add_field("updated_by", DataType.VARCHAR, max_length=256)
+    schema.add_field("updated_time", DataType.INT64)
 
 
     schema.add_function(
     schema.add_function(
         Function(
         Function(
             name="bm25_fn",
             name="bm25_fn",
             input_field_names=["text"],
             input_field_names=["text"],
-            output_field_names=["sparse_bm25"],
+            output_field_names=["sparse"],
             function_type=FunctionType.BM25,
         )
     )
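Because parent_id changed from INT64 to VARCHAR, any filter expression against the new collections has to quote the value. A hedged lookup example using pymilvus' MilvusClient (the URI, the collection name, and the id value are placeholders):

    from pymilvus import MilvusClient

    client = MilvusClient(uri="http://localhost:19530")  # assumed endpoint
    parent_rows = client.query(
        collection_name="test_27_parent",
        filter='parent_id == "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3" and is_deleted == 0',
        output_fields=["text", "document_id", "index", "metadata"],
    )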
@@ -334,13 +331,13 @@ def ensure_collection(client: MilvusClient, collection_name: str, dense_dim: int
 
     index_params = client.prepare_index_params()
     index_params.add_index(
-        field_name="dense_vec",
+        field_name="dense",
         index_name="dense_idx",
         index_name="dense_idx",
         index_type="AUTOINDEX",
         index_type="AUTOINDEX",
         metric_type="COSINE",
         metric_type="COSINE",
     )
     )
     index_params.add_index(
-        field_name="sparse_bm25",
+        field_name="sparse",
         index_name="bm25_idx",
         index_name="bm25_idx",
         index_type="SPARSE_INVERTED_INDEX",
         index_type="SPARSE_INVERTED_INDEX",
         metric_type="BM25",
         metric_type="BM25",
@@ -366,13 +363,18 @@ def docs_to_entities(docs: List[Document], emb) -> List[Dict[str, Any]]:
         entities.append(
             {
                 "text": d.page_content,
-                "dense_vec": vec,
-                "is_deleted": bool(md.get("is_deleted", False)),
-                "parent_id": int(md.get("parent_id", 0)),
-                "doc_id": str(md.get("doc_id", "")),
-                "doc_version": int(md.get("doc_version", 0)),
-                "tags": str(md.get("tags", "")),
+                "dense": vec,
+                "is_deleted": int(md.get("is_deleted", 0)),
+                "parent_id": str(md.get("parent_id", "")),
+                "document_id": str(md.get("document_id", "")),
+                "index": int(md.get("index", 0)),
+                "tag_list": str(md.get("tag_list", "")),
+                "permission": md.get("permission", {}) if isinstance(md.get("permission", {}), dict) else {},
                 "metadata": md.get("metadata", {}) if isinstance(md.get("metadata", {}), dict) else {},
                 "metadata": md.get("metadata", {}) if isinstance(md.get("metadata", {}), dict) else {},
+                "created_by": str(md.get("created_by", "system")),
+                "created_time": int(md.get("created_time", int(time.time() * 1000))),
+                "updated_by": str(md.get("updated_by", "system")),
+                "updated_time": int(md.get("updated_time", int(time.time() * 1000))),
             }
         )
     return entities

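Note that docs_to_entities never sets the sparse field: the bm25_fn function declared in the schema derives it from text at insert time, so callers only supply text, dense, and the scalar columns. A short sketch of pushing its output into Milvus (the embedding object emb and the MilvusClient instance are assumed to already exist):

    # child_docs comes from build_parent_and_child_documents_from_md(...)
    dense_dim = detect_dense_dim(emb)                      # falls back to DENSE_DIM_FALLBACK on error
    ensure_collection(client, CHILD_COLLECTION_NAME, dense_dim)
    entities = docs_to_entities(child_docs, emb)
    client.insert(collection_name=CHILD_COLLECTION_NAME, data=entities)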
+ 3 - 0
src/app/services/milvus_service.py

@@ -27,6 +27,9 @@ class MilvusService:
         self.DENSE_DIM = 4096
         self.H1_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE)
         self.BLANK_SPLIT_RE = re.compile(r"\n\s*\n+")
+        
+        # 确保集合已创建
+        self.ensure_collections()
 
     def has_collection(self, collection_name: str) -> bool:
         """检查集合是否存在"""