|
|
@@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
from sqlalchemy import select, func, or_, delete as sql_delete
|
|
|
from datetime import datetime
|
|
|
import uuid
|
|
|
+import asyncio
|
|
|
|
|
|
from app.sample.models.knowledge_base import KnowledgeBase
|
|
|
from app.sample.models.metadata import SampleMetadata
|
|
|
@@ -97,7 +98,8 @@ class KnowledgeBaseService:
|
|
|
if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
return int(res[0]["count(*)"])
|
|
|
except Exception as e:
|
|
|
- print(f"Get collection row count error for {collection_name}: {e}")
|
|
|
+ # print(f"Get collection row count error for {collection_name}: {e}")
|
|
|
+ pass
|
|
|
|
|
|
# Fallback: 使用 get_collection_stats (可能包含已删除未 Compaction 的数据)
|
|
|
try:
|
|
|
@@ -111,16 +113,20 @@ class KnowledgeBaseService:
|
|
|
[Internal] 从 Milvus 数据中推断元数据并保存到 DB
|
|
|
仅当 DB 中没有定义元数据时调用
|
|
|
"""
|
|
|
+ target_col = kb.collection_name1
|
|
|
+ if not target_col:
|
|
|
+ return
|
|
|
+
|
|
|
try:
|
|
|
# 检查是否已加载(避免不必要的错误)
|
|
|
- # if milvus_service.get_collection_state(kb.collection_name) != "Loaded":
|
|
|
- # milvus_service.load_collection(kb.collection_name)
|
|
|
+ # if milvus_service.get_collection_state(target_col) != "Loaded":
|
|
|
+ # milvus_service.load_collection(target_col)
|
|
|
|
|
|
# 采样查询 (获取前10条)
|
|
|
res = []
|
|
|
try:
|
|
|
# 先检查 metadata 字段是否存在,避免报错 field metadata not exist
|
|
|
- desc = milvus_service.client.describe_collection(kb.collection_name)
|
|
|
+ desc = milvus_service.client.describe_collection(target_col)
|
|
|
fields = [f['name'] for f in desc.get('fields', [])]
|
|
|
if "metadata" not in fields:
|
|
|
return # 集合无 metadata 字段,无需推断
|
|
|
@@ -147,7 +153,7 @@ class KnowledgeBaseService:
|
|
|
filter_expr = f'{pk_field} != ""'
|
|
|
|
|
|
res = milvus_service.client.query(
|
|
|
- collection_name=kb.collection_name,
|
|
|
+ collection_name=target_col,
|
|
|
filter=filter_expr,
|
|
|
output_fields=["metadata"],
|
|
|
limit=10
|
|
|
@@ -158,7 +164,7 @@ class KnowledgeBaseService:
|
|
|
# 再次确认 metadata 是否存在 (防止上面的检查因某种原因失效或异常被捕获)
|
|
|
if "metadata" in fields:
|
|
|
res = milvus_service.client.query(
|
|
|
- collection_name=kb.collection_name,
|
|
|
+ collection_name=target_col,
|
|
|
filter="",
|
|
|
output_fields=["metadata"],
|
|
|
limit=10
|
|
|
@@ -199,9 +205,9 @@ class KnowledgeBaseService:
|
|
|
db.add(new_metadata)
|
|
|
|
|
|
# 注意:调用方负责 commit,这里不 commit 以支持批量事务
|
|
|
- print(f"Auto inferred metadata for {kb.collection_name}: {inferred_keys}")
|
|
|
+ print(f"Auto inferred metadata for {target_col}: {inferred_keys}")
|
|
|
except Exception as e:
|
|
|
- print(f"Failed to infer metadata for {kb.collection_name}: {e}")
|
|
|
+ print(f"Failed to infer metadata for {target_col}: {e}")
|
|
|
|
|
|
async def get_list(
|
|
|
self,
|
|
|
@@ -213,7 +219,7 @@ class KnowledgeBaseService:
|
|
|
) -> Tuple[List[KnowledgeBase], PaginationSchema]:
|
|
|
"""获取知识库列表"""
|
|
|
|
|
|
- # --- 同步 Milvus 数据 ---
|
|
|
+ # --- 同步 Milvus 数据 (简化版:仅更新现有KB的计数和状态) ---
|
|
|
try:
|
|
|
# 1. 获取 Milvus 所有集合
|
|
|
milvus_names = milvus_service.client.list_collections()
|
|
|
@@ -221,97 +227,28 @@ class KnowledgeBaseService:
|
|
|
# 2. 获取 DB 中已有的集合
|
|
|
result = await db.execute(select(KnowledgeBase).where(KnowledgeBase.is_deleted == 0))
|
|
|
existing_kbs = result.scalars().all()
|
|
|
- existing_map = {kb.collection_name: kb for kb in existing_kbs}
|
|
|
|
|
|
- # 3. 同步逻辑
|
|
|
+ # 3. 更新现有KB的统计
|
|
|
has_changes = False
|
|
|
- for m_name in milvus_names:
|
|
|
- # 获取统计信息
|
|
|
- row_count = await self._get_collection_row_count(m_name)
|
|
|
-
|
|
|
- if m_name not in existing_map:
|
|
|
- # 新增
|
|
|
- new_kb = KnowledgeBase(
|
|
|
- id=str(uuid.uuid4()),
|
|
|
- name=m_name,
|
|
|
- collection_name=m_name,
|
|
|
- description="Synced from Milvus",
|
|
|
- status="normal",
|
|
|
- document_count=row_count,
|
|
|
- created_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
- updated_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
- )
|
|
|
- db.add(new_kb)
|
|
|
-
|
|
|
- # [新增逻辑] 对新发现的知识库,立即尝试推断元数据
|
|
|
- if row_count > 0:
|
|
|
- await self._infer_and_save_metadata(db, new_kb)
|
|
|
-
|
|
|
- has_changes = True
|
|
|
- else:
|
|
|
- # 更新统计
|
|
|
- kb = existing_map[m_name]
|
|
|
- if kb.document_count != row_count:
|
|
|
- kb.document_count = row_count
|
|
|
- # kb.created_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 统计更新不一定更新时间
|
|
|
- has_changes = True
|
|
|
-
|
|
|
- # [新增逻辑] 如果已有知识库有数据但没有元数据定义,尝试推断 (Lazy check)
|
|
|
- # 为了性能,这里不每次都查元数据表。只有当 row_count > 0 时才考虑。
|
|
|
- # 且为了避免每次都 query Milvus,我们可以假设如果 DB 中没有 metadata 记录才去推断。
|
|
|
- # 但在循环中查 DB (select count) 也是性能损耗。
|
|
|
- # 既然用户现在的痛点是“必须点击查看”,我们可以放宽一点:
|
|
|
- # 仅在新发现时推断,或者增加一个明确的“同步所有”按钮。
|
|
|
- # 或者,这里简单做个优化:如果 row_count > 0,我们尝试去 infer。
|
|
|
- # _infer_and_save_metadata 内部不包含 "检查DB是否有元数据" 的逻辑,需要补充。
|
|
|
-
|
|
|
- if has_changes:
|
|
|
- await db.commit()
|
|
|
+ for kb in existing_kbs:
|
|
|
+ total_count = 0
|
|
|
|
|
|
- # [补充逻辑] 对所有现有 KB,如果 document_count > 0,检查是否需要推断
|
|
|
- # 为了不影响性能,这里只对本次循环中更新了 count 的,或者...
|
|
|
- # 还是保持现状:只对新发现的做推断。对于老的,用户可能需要手动触发或者我们做一个批量接口。
|
|
|
- # 但用户说“如果不点击查看就没有办法自动推断”,暗示他希望列表页能解决。
|
|
|
- # 我们可以查一次所有 KB 的 ID,再查 SampleMetadata 表中哪些 KB ID 已经有数据了。
|
|
|
-
|
|
|
- # 1. 获取所有有数据的 KB ID
|
|
|
- # active_kb_ids = [kb.id for kb in existing_kbs if kb.document_count > 0]
|
|
|
- # if active_kb_ids:
|
|
|
- # # 2. 查哪些已经有元数据
|
|
|
- # meta_res = await db.execute(select(SampleMetadata.knowledge_base_id).where(SampleMetadata.knowledge_base_id.in_(active_kb_ids)).distinct())
|
|
|
- # has_meta_ids = set(meta_res.scalars().all())
|
|
|
- #
|
|
|
- # # 3. 找出需要推断的
|
|
|
- # need_infer_kbs = [kb for kb in existing_kbs if kb.id in active_kb_ids and kb.id not in has_meta_ids]
|
|
|
- #
|
|
|
- # for kb in need_infer_kbs:
|
|
|
- # await self._infer_and_save_metadata(db, kb)
|
|
|
- # has_changes = True
|
|
|
-
|
|
|
- # 上述逻辑比较完善,加入代码中:
|
|
|
-
|
|
|
- # 获取所有当前存在的 KB (包括刚新增的,如果 session 未 commit 可能查不到 ID,所以最好在 commit 后再做,或者分两步)
|
|
|
- # 简单起见,我们把上面的 commit 放在这之前是不行的,因为 new_kb 还没 ID (uuid 是手动生成的,其实有 ID)。
|
|
|
-
|
|
|
- # 优化:只对 existing_kbs 做检查。new_kb 已经在上面处理了。
|
|
|
- if existing_kbs:
|
|
|
- active_kbs = [kb for kb in existing_kbs if kb.document_count > 0]
|
|
|
- if active_kbs:
|
|
|
- active_ids = [kb.id for kb in active_kbs]
|
|
|
- # 批量查询已存在元数据的 KB ID
|
|
|
- meta_res = await db.execute(select(SampleMetadata.knowledge_base_id).where(SampleMetadata.knowledge_base_id.in_(active_ids)).distinct())
|
|
|
- has_meta_ids = set(meta_res.scalars().all())
|
|
|
+ # 统计 collection1
|
|
|
+ if kb.collection_name1 and kb.collection_name1 in milvus_names:
|
|
|
+ total_count += await self._get_collection_row_count(kb.collection_name1)
|
|
|
|
|
|
- for kb in active_kbs:
|
|
|
- if kb.id not in has_meta_ids:
|
|
|
- await self._infer_and_save_metadata(db, kb)
|
|
|
- has_changes = True
|
|
|
+ # 统计 collection2
|
|
|
+ if kb.collection_name2 and kb.collection_name2 in milvus_names:
|
|
|
+ total_count += await self._get_collection_row_count(kb.collection_name2)
|
|
|
+
|
|
|
+ if kb.document_count != total_count:
|
|
|
+ kb.document_count = total_count
|
|
|
+ has_changes = True
|
|
|
|
|
|
if has_changes:
|
|
|
await db.commit()
|
|
|
|
|
|
except Exception as e:
|
|
|
- # 同步失败不影响查询,只打印日志
|
|
|
print(f"Sync Milvus collections failed: {e}")
|
|
|
# ----------------------
|
|
|
|
|
|
@@ -321,262 +258,34 @@ class KnowledgeBaseService:
|
|
|
if keyword:
|
|
|
query = query.where(or_(
|
|
|
KnowledgeBase.name.like(f"%{keyword}%"),
|
|
|
- KnowledgeBase.collection_name.like(f"%{keyword}%")
|
|
|
+ KnowledgeBase.collection_name1.like(f"%{keyword}%"),
|
|
|
+ KnowledgeBase.collection_name2.like(f"%{keyword}%")
|
|
|
))
|
|
|
|
|
|
if status:
|
|
|
query = query.where(KnowledgeBase.status == status)
|
|
|
|
|
|
- # [Modified] 过滤掉 _parent 结尾的集合,只显示主集合 (现在不需要过滤了,因为用户想要分开创建)
|
|
|
- # 但用户又说“知识库名称只有一个”,但表名是两行。
|
|
|
- # 如果前端发了两次请求创建了两个 KB,那么 DB 里会有两个 KB。
|
|
|
- # 现在的要求是:
|
|
|
- # 1. 创建时:分开填表名 (Done, 前端已回退)
|
|
|
- # 2. 列表显示时:合并显示 (Same as before)
|
|
|
-
|
|
|
- # 关键点:如何识别哪两个是一对?
|
|
|
- # 之前的逻辑是根据 _parent 后缀。
|
|
|
- # 如果用户手动命名,比如 kb_child 和 kb_daddy,我们无法自动识别它们是一对。
|
|
|
- # 除非我们在 DB 里加 parent_id 字段。
|
|
|
-
|
|
|
- # 用户原话:“知识库新增界面的集合名称不用改,依旧还是分为父知识库和子知识库方便用户自主命名”
|
|
|
- # 意思是创建时保留两个输入框。
|
|
|
- # “我要求创建后前端页面的知识库名称只有一个,然后父子的知识库表名成两行形式放在知识库表名字段中”
|
|
|
- # 意思是列表页要合并。
|
|
|
-
|
|
|
- # 问题:如果用户随便命名,我们怎么合并?
|
|
|
- # 假设:用户会遵循某种命名约定?或者我们在创建时记录关联?
|
|
|
- # 之前的代码自动加 _parent 是为了方便关联。
|
|
|
- # 如果用户自主命名,我们很难猜测。
|
|
|
-
|
|
|
- # 重新理解用户的意图:
|
|
|
- # 用户可能并不是想完全随便命名,而是希望前端能输入两个名字。
|
|
|
- # 我们可以强制要求父集合必须是 子集合名 + _parent ? 不,用户说要自主命名。
|
|
|
-
|
|
|
- # 唯一的解法:
|
|
|
- # 在创建时,将这两个 KB 关联起来。
|
|
|
- # 既然我们没有改数据库 Schema 加 parent_id,我们只能通过名称匹配,或者
|
|
|
- # 只能修改创建逻辑,只创建一个“主 KB”记录,但是在该记录中存储两个 collection_name?
|
|
|
- # Schema 中只有一个 collection_name。
|
|
|
-
|
|
|
- # 让我们再看一眼用户的需求:
|
|
|
- # "知识库新增界面的集合名称不用改,依旧还是分为父知识库和子知识库方便用户自主命名" -> 前端两个框。
|
|
|
- # "创建后前端页面的知识库名称只有一个" -> 列表页一行。
|
|
|
-
|
|
|
- # 方案:
|
|
|
- # 前端发送两次 create 请求(如刚刚回退的代码所示)。
|
|
|
- # 这样 DB 里会有两条记录。
|
|
|
- # 后端 get_list 如何把它们合并成一行?
|
|
|
- # 如果没有命名规律,后端无法合并。
|
|
|
-
|
|
|
- # 也许用户所谓的“自主命名”只是想自己决定前缀?
|
|
|
- # 不,如果是那样,之前的自动 _parent 逻辑也可以满足(只要输入前缀)。
|
|
|
- # 用户特意强调“分为父知识库和子知识库方便用户自主命名”,说明他想完全控制两个名字,比如 "law_docs" 和 "law_chunks"。
|
|
|
-
|
|
|
- # 如果必须合并显示,必须有对应关系。
|
|
|
- # 临时方案:
|
|
|
- # 假定用户命名的父集合包含子集合名?或者我们只显示子集合,然后在“表名”列把所有“孤儿”集合都列出来?
|
|
|
- # 不太可能。
|
|
|
-
|
|
|
- # 让我们回顾一下之前的实现:
|
|
|
- # 之前是前端只输一个名,后端自动加 _parent。
|
|
|
- # 现在用户要输两个名。
|
|
|
-
|
|
|
- # 最佳实践:
|
|
|
- # 修改 create 接口,接收 parent_collection_name 和 child_collection_name。
|
|
|
- # 在 DB 里存一条主记录(Child),并把 Parent 的名字存在 Description 或者 Metadata 里?
|
|
|
- # 或者存两条记录,但通过 Description 标记?
|
|
|
-
|
|
|
- # 鉴于不能改 Schema,我们可以约定:
|
|
|
- # 在创建时,给父 KB 的 description 加一个标记,比如 "Parent of {child_name}"。
|
|
|
- # 或者给子 KB 加 "Child of {parent_name}"。
|
|
|
-
|
|
|
- # 但这样查询效率极低。
|
|
|
-
|
|
|
- # 另一种可能:
|
|
|
- # 用户并不介意我们只显示“子集合”,但他希望在“表名”列能看到父集合的名字。
|
|
|
- # 如果我们无法自动关联,就无法显示。
|
|
|
-
|
|
|
- # 让我们采用最稳妥的方案:
|
|
|
- # 恢复后端 create 逻辑为“只创建一条主记录,但接收两个集合名参数”。
|
|
|
- # 等等,create 接口参数是 KnowledgeBaseCreate,只有一个 collection_name。
|
|
|
- # 如果前端调两次 create,那就是两条独立的记录。
|
|
|
-
|
|
|
- # 除非... 我们修改 KnowledgeBaseCreate Schema,允许传 parent_collection_name。
|
|
|
- # 然后后端只创建一条 KnowledgeBase 记录,但是该记录对应两个 Milvus 集合?
|
|
|
- # 不行,KnowledgeBase 模型是一对一映射到 collection_name 的。
|
|
|
-
|
|
|
- # 让我们假设用户接受“自动 _parent 后缀”的约束,只是希望前端能看到两个框?
|
|
|
- # 如果用户一定要自主命名且不遵循后缀,且要合并显示,且不改 DB Schema,这在逻辑上是不可能的(无法找回关系)。
|
|
|
-
|
|
|
- # 猜测:用户可能没意识到“自主命名”和“合并显示”之间的矛盾。
|
|
|
- # 或者,用户希望我们用 "Name" (知识库名称) 来关联?
|
|
|
- # 前端传过来的 name 分别是 "XX (父)" 和 "XX (子)"。
|
|
|
- # 我们可以通过去除后缀的 Name 来关联!
|
|
|
-
|
|
|
- # 逻辑:
|
|
|
- # 1. 获取所有 KB。
|
|
|
- # 2. 按 Name 去除 " (父)" / " (子)" 后分组。
|
|
|
- # 3. 如果一组里有两个,合并显示。
|
|
|
-
|
|
|
- # 让我们试试这个方案。
|
|
|
-
|
|
|
- # 查询未删除的 KB
|
|
|
- query = select(KnowledgeBase).where(KnowledgeBase.is_deleted == 0)
|
|
|
-
|
|
|
- if keyword:
|
|
|
- query = query.where(or_(
|
|
|
- KnowledgeBase.name.like(f"%{keyword}%"),
|
|
|
- KnowledgeBase.collection_name.like(f"%{keyword}%")
|
|
|
- ))
|
|
|
-
|
|
|
- if status:
|
|
|
- query = query.where(KnowledgeBase.status == status)
|
|
|
-
|
|
|
- # 获取所有符合条件的记录(不再分页查询,为了手动合并)
|
|
|
- # 注意:如果数据量大,这会有性能问题。但目前是内部系统。
|
|
|
- # 为了分页准确,我们必须在内存中合并后再分页。
|
|
|
-
|
|
|
- result = await db.execute(query.order_by(KnowledgeBase.created_time.desc()))
|
|
|
- all_items = result.scalars().all()
|
|
|
-
|
|
|
- # 内存合并逻辑
|
|
|
- merged_items = []
|
|
|
- # 用字典暂存: base_name -> {child: kb, parent: kb}
|
|
|
- kb_groups = {}
|
|
|
-
|
|
|
- for item in all_items:
|
|
|
- # 尝试解析名字
|
|
|
- base_name = item.name
|
|
|
- is_parent = False
|
|
|
- is_child = False
|
|
|
-
|
|
|
- if item.name.endswith(" (父)"):
|
|
|
- base_name = item.name[:-4]
|
|
|
- is_parent = True
|
|
|
- elif item.name.endswith(" (子)"):
|
|
|
- base_name = item.name[:-4]
|
|
|
- is_child = True
|
|
|
-
|
|
|
- # 如果不符合命名规范,就当作独立项
|
|
|
- if not (is_parent or is_child):
|
|
|
- # 检查是否之前自动生成的 (Name没有后缀,Collection有 _parent)
|
|
|
- if item.collection_name.endswith("_parent"):
|
|
|
- # 尝试找对应的主集合
|
|
|
- guess_child_coll = item.collection_name[:-7]
|
|
|
- # 这里比较麻烦,因为我们是按 Name 分组。
|
|
|
- # 简单起见,如果 Name 不匹配模式,直接显示。
|
|
|
- merged_items.append(item)
|
|
|
- continue
|
|
|
- else:
|
|
|
- # 可能是旧数据,或者单集合
|
|
|
- # 也有可能是旧逻辑生成的子集合(Name没后缀)
|
|
|
- # 尝试找它的 Parent (collection_name + _parent)
|
|
|
- # 这个逻辑太复杂了。
|
|
|
-
|
|
|
- # 让我们只针对新逻辑(Name带后缀)做合并。
|
|
|
- # 对旧逻辑(自动 _parent),我们沿用之前的“过滤 _parent”逻辑?
|
|
|
- # 不,用户说“依旧还是分为...”,说明他可能习惯旧的操作方式?
|
|
|
- # 不,之前的操作方式是只输一个名。
|
|
|
-
|
|
|
- # 让我们统一逻辑:
|
|
|
- # 以前端传来的 Name 模式 "XXX (父)" / "XXX (子)" 为主键进行合并。
|
|
|
- merged_items.append(item)
|
|
|
- continue
|
|
|
-
|
|
|
- if base_name not in kb_groups:
|
|
|
- kb_groups[base_name] = {"child": None, "parent": None, "others": []}
|
|
|
-
|
|
|
- if is_child:
|
|
|
- kb_groups[base_name]["child"] = item
|
|
|
- elif is_parent:
|
|
|
- kb_groups[base_name]["parent"] = item
|
|
|
-
|
|
|
- # 生成合并后的列表
|
|
|
- final_list = []
|
|
|
+ query = query.order_by(KnowledgeBase.created_time.desc())
|
|
|
|
|
|
- # 处理分组
|
|
|
- for base_name, group in kb_groups.items():
|
|
|
- child = group["child"]
|
|
|
- parent = group["parent"]
|
|
|
-
|
|
|
- if child and parent:
|
|
|
- # 合并显示,使用 Child 的记录,但修改 collection_name
|
|
|
- child.collection_name = f"{child.collection_name}\n{parent.collection_name}"
|
|
|
- # 修改 Name 去掉后缀
|
|
|
- child.name = base_name
|
|
|
- final_list.append(child)
|
|
|
- elif child:
|
|
|
- final_list.append(child)
|
|
|
- elif parent:
|
|
|
- # 只有父没有子?显示父
|
|
|
- final_list.append(parent)
|
|
|
-
|
|
|
- # 把那些不符合命名规范的(在循环中直接 append 的)也加进来
|
|
|
- # 等等,上面的循环逻辑有漏洞,merged_items 和 kb_groups 是分开的。
|
|
|
-
|
|
|
- # 重写合并逻辑:
|
|
|
- # 1. 遍历所有 items
|
|
|
- # 2. 如果是 "(子)",放入 map 等待 "(父)"
|
|
|
- # 3. 如果是 "(父)",放入 map 等待 "(子)"
|
|
|
- # 4. 如果都不是,直接放入 final_list
|
|
|
-
|
|
|
- # 修正:
|
|
|
- # 我们不能简单地把 Parent 藏起来。
|
|
|
- # 如果是 create 出来的,有两个记录。
|
|
|
- # 我们希望在列表中只显示一行。
|
|
|
+ # 分页
|
|
|
+ total = await db.scalar(select(func.count()).select_from(query.subquery()))
|
|
|
|
|
|
- processed_ids = set()
|
|
|
- final_list = []
|
|
|
-
|
|
|
- # 建立 lookup map
|
|
|
- name_map = {kb.name: kb for kb in all_items}
|
|
|
+ # 计算总页数
|
|
|
+ total_pages = ceil(total / page_size) if page_size else 0
|
|
|
|
|
|
- for item in all_items:
|
|
|
- if item.id in processed_ids:
|
|
|
- continue
|
|
|
-
|
|
|
- if item.name.endswith(" (子)"):
|
|
|
- base_name = item.name[:-4]
|
|
|
- parent_name = f"{base_name} (父)"
|
|
|
-
|
|
|
- if parent_name in name_map:
|
|
|
- parent_kb = name_map[parent_name]
|
|
|
- # 找到了一对!
|
|
|
- # 合并信息到 item (子)
|
|
|
- item.name = base_name # 展示时去掉后缀
|
|
|
- item.collection_name = f"{item.collection_name}\n{parent_kb.collection_name}"
|
|
|
-
|
|
|
- final_list.append(item)
|
|
|
- processed_ids.add(item.id)
|
|
|
- processed_ids.add(parent_kb.id)
|
|
|
- continue
|
|
|
-
|
|
|
- if item.name.endswith(" (父)"):
|
|
|
- # 如果是未匹配的父集合,隐藏不显示,以免重复
|
|
|
- continue
|
|
|
-
|
|
|
- # 普通项,单独显示
|
|
|
- final_list.append(item)
|
|
|
- processed_ids.add(item.id)
|
|
|
-
|
|
|
- # 重新计算分页
|
|
|
- total = len(final_list)
|
|
|
- start = (page - 1) * page_size
|
|
|
- end = start + page_size
|
|
|
- items = final_list[start:end]
|
|
|
+ # 获取当前页数据
|
|
|
+ result = await db.execute(query.offset((page - 1) * page_size).limit(page_size))
|
|
|
+ items = result.scalars().all()
|
|
|
|
|
|
- # 设置 is_synced (这里 collection_name 已经被改了,包含换行)
|
|
|
+ # 设置 is_synced (辅助字段,不存库)
|
|
|
+ milvus_names_set = set(milvus_service.client.list_collections())
|
|
|
for item in items:
|
|
|
- # 简单的 split
|
|
|
- names = item.collection_name.split('\n')
|
|
|
- # 只要有一个 sync 了就算 sync?或者都 sync?
|
|
|
- # 只要主集合 sync 了就行
|
|
|
- main_coll = names[0]
|
|
|
- item.is_synced = main_coll in milvus_names
|
|
|
- # 注意:这里 item.collection_name 已经包含了 parent,前端会展示
|
|
|
-
|
|
|
- total_pages = ceil(total / page_size) if page_size else 0
|
|
|
+ c1_ok = item.collection_name1 in milvus_names_set
|
|
|
+ c2_ok = True
|
|
|
+ if item.collection_name2:
|
|
|
+ c2_ok = item.collection_name2 in milvus_names_set
|
|
|
+
|
|
|
+ item.is_synced = c1_ok and c2_ok
|
|
|
|
|
|
meta = PaginationSchema(
|
|
|
page=page,
|
|
|
@@ -590,53 +299,45 @@ class KnowledgeBaseService:
|
|
|
async def create(self, db: AsyncSession, payload: KnowledgeBaseCreate) -> KnowledgeBase:
|
|
|
"""创建新知识库"""
|
|
|
# 1. 检查 DB 是否已存在
|
|
|
- exists = await db.execute(select(KnowledgeBase).where(
|
|
|
- KnowledgeBase.collection_name == payload.collection_name,
|
|
|
+ # 检查 collection_name1
|
|
|
+ exists1 = await db.execute(select(KnowledgeBase).where(
|
|
|
+ KnowledgeBase.collection_name1 == payload.collection_name1,
|
|
|
KnowledgeBase.is_deleted == 0
|
|
|
))
|
|
|
- if exists.scalars().first():
|
|
|
- raise ValueError("知识库集合名称已存在")
|
|
|
-
|
|
|
- # 额外检查:是否在软删除记录中存在?如果存在,建议先彻底删除或恢复
|
|
|
- # 这里我们简单处理:如果存在软删除记录,直接物理删除它,以便重新创建
|
|
|
- # 或者在 exists 查询中去掉 is_deleted 条件,禁止重名
|
|
|
-
|
|
|
- soft_deleted = await db.execute(select(KnowledgeBase).where(
|
|
|
- KnowledgeBase.collection_name == payload.collection_name,
|
|
|
- KnowledgeBase.is_deleted == 1
|
|
|
- ))
|
|
|
- soft_deleted_kb = soft_deleted.scalars().first()
|
|
|
- if soft_deleted_kb:
|
|
|
- # 物理删除软删除记录,允许重建
|
|
|
- await db.delete(soft_deleted_kb)
|
|
|
- await db.flush() # 立即执行删除
|
|
|
-
|
|
|
- # 2. 检查 Milvus 是否已存在 (如果之前残留)
|
|
|
- # if milvus_service.has_collection(payload.collection_name):
|
|
|
- # raise ValueError("Milvus集合已存在,请使用其他名称")
|
|
|
+ if exists1.scalars().first():
|
|
|
+ raise ValueError(f"集合名称 {payload.collection_name1} 已存在")
|
|
|
+
|
|
|
+ # 检查 collection_name2
|
|
|
+ if payload.collection_name2:
|
|
|
+ exists2 = await db.execute(select(KnowledgeBase).where(
|
|
|
+ or_(
|
|
|
+ KnowledgeBase.collection_name1 == payload.collection_name2,
|
|
|
+ KnowledgeBase.collection_name2 == payload.collection_name2
|
|
|
+ ),
|
|
|
+ KnowledgeBase.is_deleted == 0
|
|
|
+ ))
|
|
|
+ if exists2.scalars().first():
|
|
|
+ raise ValueError(f"集合名称 {payload.collection_name2} 已存在")
|
|
|
|
|
|
try:
|
|
|
- # 3. 创建 Milvus 集合 (延迟到点击同步按钮时创建)
|
|
|
- # milvus_service.create_collection(...)
|
|
|
-
|
|
|
- # 4. 创建 DB 记录 (主集合)
|
|
|
+ # 3. 创建 DB 记录
|
|
|
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
new_kb = KnowledgeBase(
|
|
|
id=str(uuid.uuid4()),
|
|
|
name=payload.name,
|
|
|
- collection_name=payload.collection_name,
|
|
|
+ collection_name1=payload.collection_name1,
|
|
|
+ collection_name2=payload.collection_name2,
|
|
|
description=payload.description,
|
|
|
status=payload.status or "normal",
|
|
|
- created_by="admin", # 暂时硬编码为 admin,后续对接用户系统
|
|
|
+ created_by="admin",
|
|
|
updated_by="admin",
|
|
|
created_time=now,
|
|
|
- updated_time=now
|
|
|
+ updated_time=now,
|
|
|
+ document_count=0
|
|
|
)
|
|
|
db.add(new_kb)
|
|
|
|
|
|
- # [Modified] 移除自动创建 Parent 集合记录,因为前端现在会明确创建两次
|
|
|
-
|
|
|
- # 5. 保存元数据定义 (如果有)
|
|
|
+ # 4. 保存元数据定义 (如果有)
|
|
|
if payload.metadata_fields:
|
|
|
for field in payload.metadata_fields:
|
|
|
new_metadata = SampleMetadata(
|
|
|
@@ -648,14 +349,6 @@ class KnowledgeBaseService:
|
|
|
remark=field.remark
|
|
|
)
|
|
|
db.add(new_metadata)
|
|
|
-
|
|
|
- # [Modified] 移除自动为 Parent 添加元数据
|
|
|
-
|
|
|
-
|
|
|
- # 6. 保存自定义Schema定义 (如果有) - 已废弃,使用固定Schema
|
|
|
- # if payload.custom_schemas:
|
|
|
- # for schema_field in payload.custom_schemas:
|
|
|
- # ...
|
|
|
|
|
|
await db.commit()
|
|
|
await db.refresh(new_kb)
|
|
|
@@ -673,64 +366,28 @@ class KnowledgeBaseService:
|
|
|
if not kb:
|
|
|
raise ValueError("知识库不存在")
|
|
|
|
|
|
- # [Fix] 如果是合并显示的子知识库,需要同时更新对应的父知识库
|
|
|
- # 查找对应的父知识库
|
|
|
- parent_kb = None
|
|
|
- if kb.name.endswith(" (子)"):
|
|
|
- base_name = kb.name[:-4]
|
|
|
- parent_name = f"{base_name} (父)"
|
|
|
- parent_res = await db.execute(select(KnowledgeBase).where(
|
|
|
- KnowledgeBase.name == parent_name,
|
|
|
- KnowledgeBase.is_deleted == 0
|
|
|
- ))
|
|
|
- parent_kb = parent_res.scalars().first()
|
|
|
-
|
|
|
try:
|
|
|
if payload.name is not None:
|
|
|
- # 如果修改了名称,需要保持 (子)/(父) 后缀
|
|
|
- # 注意:前端传来的 name 可能是不带后缀的 base_name (因为列表页去掉了后缀)
|
|
|
- # 这里的 payload.name 到底带不带后缀?
|
|
|
- # 如果是前端表单直接提交,通常是用户输入的 base_name。
|
|
|
- # 我们假设 payload.name 是新的 base_name。
|
|
|
-
|
|
|
- # 如果原名带后缀,则加上后缀
|
|
|
- if kb.name.endswith(" (子)"):
|
|
|
- kb.name = f"{payload.name} (子)"
|
|
|
- if parent_kb:
|
|
|
- parent_kb.name = f"{payload.name} (父)"
|
|
|
- elif kb.name.endswith(" (父)"):
|
|
|
- # 理论上只编辑子集合,但防御性编程
|
|
|
- kb.name = f"{payload.name} (父)"
|
|
|
- else:
|
|
|
- # 普通集合
|
|
|
- kb.name = payload.name
|
|
|
+ kb.name = payload.name
|
|
|
|
|
|
if payload.description is not None:
|
|
|
kb.description = payload.description
|
|
|
# 同步更新 Milvus 描述 (如果 Milvus 中存在该集合)
|
|
|
- if milvus_service.has_collection(kb.collection_name):
|
|
|
- milvus_service.update_collection_description(kb.collection_name, payload.description)
|
|
|
-
|
|
|
- if parent_kb:
|
|
|
- parent_kb.description = payload.description
|
|
|
- if milvus_service.has_collection(parent_kb.collection_name):
|
|
|
- milvus_service.update_collection_description(parent_kb.collection_name, payload.description)
|
|
|
+ if kb.collection_name1 and milvus_service.has_collection(kb.collection_name1):
|
|
|
+ milvus_service.update_collection_description(kb.collection_name1, payload.description)
|
|
|
+ if kb.collection_name2 and milvus_service.has_collection(kb.collection_name2):
|
|
|
+ milvus_service.update_collection_description(kb.collection_name2, payload.description)
|
|
|
|
|
|
if payload.status is not None:
|
|
|
kb.status = payload.status
|
|
|
- if parent_kb:
|
|
|
- parent_kb.status = payload.status
|
|
|
|
|
|
# 更新元数据字段 (Metadata Fields)
|
|
|
if payload.metadata_fields is not None:
|
|
|
# 1. 删除旧的元数据字段
|
|
|
await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == id))
|
|
|
- if parent_kb:
|
|
|
- await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == parent_kb.id))
|
|
|
|
|
|
# 2. 插入新的元数据字段
|
|
|
for field in payload.metadata_fields:
|
|
|
- # 子集合元数据
|
|
|
new_metadata = SampleMetadata(
|
|
|
id=str(uuid.uuid4()),
|
|
|
knowledge_base_id=kb.id,
|
|
|
@@ -740,29 +397,12 @@ class KnowledgeBaseService:
|
|
|
remark=field.remark
|
|
|
)
|
|
|
db.add(new_metadata)
|
|
|
-
|
|
|
- # 父集合元数据
|
|
|
- if parent_kb:
|
|
|
- new_metadata_parent = SampleMetadata(
|
|
|
- id=str(uuid.uuid4()),
|
|
|
- knowledge_base_id=parent_kb.id,
|
|
|
- field_zh_name=field.field_zh_name,
|
|
|
- field_en_name=field.field_en_name,
|
|
|
- field_type=field.field_type,
|
|
|
- remark=field.remark
|
|
|
- )
|
|
|
- db.add(new_metadata_parent)
|
|
|
|
|
|
kb.updated_by = "admin" # 暂时硬编码
|
|
|
- kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 使用 updated_time 而不是 created_time
|
|
|
- if parent_kb:
|
|
|
- parent_kb.updated_by = "admin"
|
|
|
- parent_kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
await db.commit()
|
|
|
await db.refresh(kb)
|
|
|
- if parent_kb:
|
|
|
- await db.refresh(parent_kb)
|
|
|
|
|
|
return kb
|
|
|
except Exception as e:
|
|
|
@@ -779,13 +419,19 @@ class KnowledgeBaseService:
|
|
|
|
|
|
try:
|
|
|
kb.status = status
|
|
|
- kb.created_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ # 同步操作 Milvus Load/Release
|
|
|
+ targets = []
|
|
|
+ if kb.collection_name1: targets.append(kb.collection_name1)
|
|
|
+ if kb.collection_name2: targets.append(kb.collection_name2)
|
|
|
|
|
|
- # 可选:同步操作 Milvus Load/Release
|
|
|
- if status == "normal":
|
|
|
- milvus_service.set_collection_state(kb.collection_name, "load")
|
|
|
- elif status == "disabled":
|
|
|
- milvus_service.set_collection_state(kb.collection_name, "release")
|
|
|
+ for col in targets:
|
|
|
+ if milvus_service.has_collection(col):
|
|
|
+ if status == "normal":
|
|
|
+ milvus_service.set_collection_state(col, "load")
|
|
|
+ elif status == "disabled":
|
|
|
+ milvus_service.set_collection_state(col, "release")
|
|
|
|
|
|
await db.commit()
|
|
|
await db.refresh(kb)
|
|
|
@@ -803,26 +449,35 @@ class KnowledgeBaseService:
|
|
|
raise ValueError("知识库不存在")
|
|
|
|
|
|
# 检查文档数量
|
|
|
- # 先尝试从 Milvus 获取最新的实时数量
|
|
|
+ # 只要任意一个集合有文档,就阻止删除
|
|
|
current_count = kb.document_count
|
|
|
- if kb.collection_name and milvus_service.has_collection(kb.collection_name):
|
|
|
- try:
|
|
|
- # 使用统一的计数方法
|
|
|
- current_count = await self._get_collection_row_count(kb.collection_name)
|
|
|
- except Exception:
|
|
|
- # 获取失败则使用 DB 中的缓存值
|
|
|
- pass
|
|
|
+
|
|
|
+ # 尝试获取最新计数
|
|
|
+ try:
|
|
|
+ real_count = 0
|
|
|
+ if kb.collection_name1 and milvus_service.has_collection(kb.collection_name1):
|
|
|
+ real_count += await self._get_collection_row_count(kb.collection_name1)
|
|
|
+ if kb.collection_name2 and milvus_service.has_collection(kb.collection_name2):
|
|
|
+ real_count += await self._get_collection_row_count(kb.collection_name2)
|
|
|
+ current_count = real_count
|
|
|
+ except:
|
|
|
+ pass
|
|
|
|
|
|
if current_count > 0:
|
|
|
raise ValueError(f"知识库中仍有 {current_count} 条文档,请先清空文档后再删除")
|
|
|
|
|
|
try:
|
|
|
- # 1. 删除 Milvus 集合 (强制删除)
|
|
|
- try:
|
|
|
- if milvus_service.has_collection(kb.collection_name):
|
|
|
- milvus_service.drop_collection(kb.collection_name)
|
|
|
- except Exception as milvus_err:
|
|
|
- print(f"Ignore Milvus error during delete: {milvus_err}")
|
|
|
+ # 1. 删除 Milvus 集合
|
|
|
+ targets = []
|
|
|
+ if kb.collection_name1: targets.append(kb.collection_name1)
|
|
|
+ if kb.collection_name2: targets.append(kb.collection_name2)
|
|
|
+
|
|
|
+ for col in targets:
|
|
|
+ try:
|
|
|
+ if milvus_service.has_collection(col):
|
|
|
+ milvus_service.drop_collection(col)
|
|
|
+ except Exception as milvus_err:
|
|
|
+ print(f"Ignore Milvus error during delete {col}: {milvus_err}")
|
|
|
|
|
|
# 2. 软删除 DB 记录
|
|
|
kb.is_deleted = 1
|
|
|
@@ -834,31 +489,6 @@ class KnowledgeBaseService:
|
|
|
# 4. 删除关联的自定义Schema (硬删除)
|
|
|
await db.execute(sql_delete(CustomSchema).where(CustomSchema.knowledge_base_id == id))
|
|
|
|
|
|
- # [Modified] 级联删除 Parent 集合 (如果存在)
|
|
|
- if not kb.collection_name.endswith("_parent"):
|
|
|
- parent_collection_name = f"{kb.collection_name}_parent"
|
|
|
- parent_kb_res = await db.execute(select(KnowledgeBase).where(
|
|
|
- KnowledgeBase.collection_name == parent_collection_name
|
|
|
- )) # 不限制 is_deleted,即使已软删除也可能需要清理
|
|
|
- parent_kb = parent_kb_res.scalars().first()
|
|
|
-
|
|
|
- if parent_kb:
|
|
|
- # 递归调用自身删除 Parent KB (注意:如果 Parent 也有 count > 0 可能会抛出异常,这里假设应该一起清空)
|
|
|
- # 为了避免循环调用和复杂性,这里直接执行删除逻辑
|
|
|
-
|
|
|
- # 1. 删除 Milvus
|
|
|
- try:
|
|
|
- if milvus_service.has_collection(parent_collection_name):
|
|
|
- milvus_service.drop_collection(parent_collection_name)
|
|
|
- except: pass
|
|
|
-
|
|
|
- # 2. 软删除 DB
|
|
|
- parent_kb.is_deleted = 1
|
|
|
- parent_kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
-
|
|
|
- # 3. 删除关联元数据
|
|
|
- await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == parent_kb.id))
|
|
|
-
|
|
|
await db.commit()
|
|
|
except Exception as e:
|
|
|
await db.rollback()
|
|
|
@@ -872,119 +502,44 @@ class KnowledgeBaseService:
|
|
|
if not kb:
|
|
|
raise ValueError("知识库不存在")
|
|
|
|
|
|
- if milvus_service.has_collection(kb.collection_name):
|
|
|
- raise ValueError("Milvus集合已存在")
|
|
|
-
|
|
|
# 使用固定的 Schema 定义
|
|
|
- # 这些字段必须与 snippet_service.py 中 insert 的数据结构一致
|
|
|
fields = [
|
|
|
- {
|
|
|
- "name": "pk",
|
|
|
- "type": "INT64",
|
|
|
- "is_primary": True,
|
|
|
- "description": "主键"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "text",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 65535,
|
|
|
- "description": "内容"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "vector",
|
|
|
- "type": "FLOAT_VECTOR",
|
|
|
- "description": "向量列"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "sparse",
|
|
|
- "type": "BM25",
|
|
|
- "description": "内容的BM25关键字检索"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "document_id",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 128,
|
|
|
- "description": "样本中心上传文档ID"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "parent_id",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 128,
|
|
|
- "description": "父段ID"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "index",
|
|
|
- "type": "INT64",
|
|
|
- "description": "索引序号"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "tag_list",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 2048,
|
|
|
- "description": "标签"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "permission",
|
|
|
- "type": "JSON",
|
|
|
- "description": "权限"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "metadata",
|
|
|
- "type": "JSON",
|
|
|
- "description": "元数据"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "is_deleted",
|
|
|
- "type": "BOOL",
|
|
|
- "description": "删除标志"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "created_by",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 128,
|
|
|
- "description": "创建人"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "created_time",
|
|
|
- "type": "INT64",
|
|
|
- "description": "创建时间"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "updated_by",
|
|
|
- "type": "VARCHAR",
|
|
|
- "max_length": 128,
|
|
|
- "description": "修改人"
|
|
|
- },
|
|
|
- {
|
|
|
- "name": "updated_time",
|
|
|
- "type": "INT64",
|
|
|
- "description": "修改时间"
|
|
|
- }
|
|
|
+ {"name": "pk", "type": "INT64", "is_primary": True, "description": "主键"},
|
|
|
+ {"name": "text", "type": "VARCHAR", "max_length": 65535, "description": "内容"},
|
|
|
+ {"name": "vector", "type": "FLOAT_VECTOR", "description": "向量列"},
|
|
|
+ {"name": "sparse", "type": "BM25", "description": "内容的BM25关键字检索"},
|
|
|
+ {"name": "document_id", "type": "VARCHAR", "max_length": 128, "description": "样本中心上传文档ID"},
|
|
|
+ {"name": "parent_id", "type": "VARCHAR", "max_length": 128, "description": "父段ID"},
|
|
|
+ {"name": "index", "type": "INT64", "description": "索引序号"},
|
|
|
+ {"name": "tag_list", "type": "VARCHAR", "max_length": 2048, "description": "标签"},
|
|
|
+ {"name": "permission", "type": "JSON", "description": "权限"},
|
|
|
+ {"name": "metadata", "type": "JSON", "description": "元数据"},
|
|
|
+ {"name": "is_deleted", "type": "BOOL", "description": "删除标志"},
|
|
|
+ {"name": "created_by", "type": "VARCHAR", "max_length": 128, "description": "创建人"},
|
|
|
+ {"name": "created_time", "type": "INT64", "description": "创建时间"},
|
|
|
+ {"name": "updated_by", "type": "VARCHAR", "max_length": 128, "description": "修改人"},
|
|
|
+ {"name": "updated_time", "type": "INT64", "description": "修改时间"}
|
|
|
]
|
|
|
|
|
|
- # 2. 自动添加 metadata 字段 (JSON类型) - 已包含在 fields 中,移除此段逻辑
|
|
|
- # 即使没有定义元数据字段,通常也需要一个 JSON 类型的 metadata 字段来存储灵活的元数据
|
|
|
- # 如果用户在 t_samp_metadata 中定义了元数据结构,这些结构实际上是存储在 metadata 字段中的 KV 对
|
|
|
- # 但为了方便检索,我们也可以选择将 metadata 作为一个独立的 JSON 字段存在 Milvus 中
|
|
|
-
|
|
|
- # 检查是否已经有名为 'metadata' 的自定义字段,避免冲突
|
|
|
- # has_metadata_field = any(f['name'] == 'metadata' for f in fields)
|
|
|
- # if not has_metadata_field:
|
|
|
- # fields.append({
|
|
|
- # "name": "metadata",
|
|
|
- # "type": "JSON",
|
|
|
- # "description": "默认元数据字段"
|
|
|
- # })
|
|
|
-
|
|
|
try:
|
|
|
- # 暂时无法获取维度信息,默认768,或者应该在数据库中存储维度
|
|
|
- # 假设默认 768,后续可以在 KnowledgeBase 模型中增加 dimension 字段
|
|
|
- milvus_service.create_collection(
|
|
|
- name=kb.collection_name,
|
|
|
- dimension=milvus_service.DENSE_DIM,
|
|
|
- description=kb.description or "",
|
|
|
- fields=fields
|
|
|
- )
|
|
|
+ # 创建 collection1
|
|
|
+ if kb.collection_name1 and not milvus_service.has_collection(kb.collection_name1):
|
|
|
+ milvus_service.create_collection(
|
|
|
+ name=kb.collection_name1,
|
|
|
+ dimension=milvus_service.DENSE_DIM,
|
|
|
+ description=kb.description or "",
|
|
|
+ fields=fields
|
|
|
+ )
|
|
|
+
|
|
|
+ # 创建 collection2
|
|
|
+ if kb.collection_name2 and not milvus_service.has_collection(kb.collection_name2):
|
|
|
+ milvus_service.create_collection(
|
|
|
+ name=kb.collection_name2,
|
|
|
+ dimension=milvus_service.DENSE_DIM,
|
|
|
+ description=kb.description or "",
|
|
|
+ fields=fields
|
|
|
+ )
|
|
|
+
|
|
|
return kb
|
|
|
except Exception as e:
|
|
|
raise e
|
|
|
@@ -1002,13 +557,14 @@ class KnowledgeBaseService:
|
|
|
meta_result = await db.execute(meta_query)
|
|
|
metadata_fields = [f.to_dict() for f in meta_result.scalars().all()]
|
|
|
|
|
|
- # 自动推断逻辑:如果 DB 中没有定义元数据,且 Milvus 中有数据,尝试推断
|
|
|
- if not metadata_fields and kb.collection_name and milvus_service.has_collection(kb.collection_name):
|
|
|
+ # 自动推断逻辑:如果 DB 中没有定义元数据,且 Milvus 中有数据,尝试推断 (优先推断 collection1)
|
|
|
+ target_col = kb.collection_name1
|
|
|
+ if not metadata_fields and target_col and milvus_service.has_collection(target_col):
|
|
|
try:
|
|
|
# 采样查询 (获取前10条)
|
|
|
try:
|
|
|
res = milvus_service.client.query(
|
|
|
- collection_name=kb.collection_name,
|
|
|
+ collection_name=target_col,
|
|
|
filter="is_deleted == false",
|
|
|
output_fields=["metadata"],
|
|
|
limit=10
|
|
|
@@ -1016,7 +572,7 @@ class KnowledgeBaseService:
|
|
|
except Exception as e:
|
|
|
# 如果 filter 查询失败(可能不支持 is_deleted),尝试无 filter 查询
|
|
|
res = milvus_service.client.query(
|
|
|
- collection_name=kb.collection_name,
|
|
|
+ collection_name=target_col,
|
|
|
filter="",
|
|
|
output_fields=["metadata"],
|
|
|
limit=10
|
|
|
@@ -1058,9 +614,9 @@ class KnowledgeBaseService:
|
|
|
|
|
|
await db.commit()
|
|
|
metadata_fields = new_fields
|
|
|
- print(f"Auto inferred metadata for {kb.collection_name}: {inferred_keys}")
|
|
|
+ print(f"Auto inferred metadata for {target_col}: {inferred_keys}")
|
|
|
except Exception as e:
|
|
|
- print(f"Failed to infer metadata for {kb.collection_name}: {e}")
|
|
|
+ print(f"Failed to infer metadata for {target_col}: {e}")
|
|
|
# 推断失败不影响正常返回
|
|
|
|
|
|
# 返回空的 custom_schemas,因为现在是固定 Schema
|
|
|
@@ -1086,38 +642,48 @@ class KnowledgeBaseService:
|
|
|
|
|
|
async def update_doc_count(self, db: AsyncSession, collection_name: str) -> int:
|
|
|
"""根据 Milvus 实时数据更新知识库文档数量
|
|
|
-
|
|
|
返回最新的行计数(int),调用方可以使用该返回值立即更新前端显示。
|
|
|
"""
|
|
|
- # 查找知识库
|
|
|
+ # 查找知识库 (匹配任意一个 collection_name)
|
|
|
result = await db.execute(select(KnowledgeBase).where(
|
|
|
- KnowledgeBase.collection_name == collection_name,
|
|
|
+ or_(
|
|
|
+ KnowledgeBase.collection_name1 == collection_name,
|
|
|
+ KnowledgeBase.collection_name2 == collection_name
|
|
|
+ ),
|
|
|
KnowledgeBase.is_deleted == 0
|
|
|
))
|
|
|
kb = result.scalars().first()
|
|
|
|
|
|
if kb and milvus_service.has_collection(collection_name):
|
|
|
try:
|
|
|
- # 确保集合已加载以获取准确计数 (query 需要 Loaded 状态)
|
|
|
+ # 确保集合已加载
|
|
|
state = milvus_service.get_collection_state(collection_name)
|
|
|
if state != "Loaded":
|
|
|
- print(f"Collection {collection_name} is {state}, loading...")
|
|
|
+ # print(f"Collection {collection_name} is {state}, loading...")
|
|
|
milvus_service.set_collection_state(collection_name, "load")
|
|
|
|
|
|
- # 使用统一的计数方法
|
|
|
- row_count = await self._get_collection_row_count(collection_name)
|
|
|
+ # 获取该集合的计数
|
|
|
+ # 注意:这里我们应该更新总计数,所以需要获取两个集合的计数
|
|
|
+ count1 = 0
|
|
|
+ count2 = 0
|
|
|
+
|
|
|
+ if kb.collection_name1:
|
|
|
+ count1 = await self._get_collection_row_count(kb.collection_name1)
|
|
|
+ if kb.collection_name2:
|
|
|
+ count2 = await self._get_collection_row_count(kb.collection_name2)
|
|
|
+
|
|
|
+ total_count = count1 + count2
|
|
|
|
|
|
# 更新数据库
|
|
|
- if kb.document_count != row_count:
|
|
|
- print(f"Updating doc count for {collection_name}: {kb.document_count} -> {row_count}")
|
|
|
- kb.document_count = row_count
|
|
|
+ if kb.document_count != total_count:
|
|
|
+ # print(f"Updating doc count for KB {kb.name}: {kb.document_count} -> {total_count}")
|
|
|
+ kb.document_count = total_count
|
|
|
kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
await db.commit()
|
|
|
- return row_count
|
|
|
+ return total_count
|
|
|
except Exception as e:
|
|
|
print(f"Failed to update doc count for {collection_name}: {e}")
|
|
|
return 0
|
|
|
- # 如果没有对应的 KB 或者 Milvus 无此集合,返回 0 以便调用方处理
|
|
|
return 0
|
|
|
|
|
|
-knowledge_base_service = KnowledgeBaseService()
|
|
|
+knowledge_base_service = KnowledgeBaseService()
|