|
|
@@ -24,24 +24,80 @@ class KnowledgeBaseService:
|
|
|
async def _get_collection_row_count(self, collection_name: str) -> int:
|
|
|
"""获取集合行数(优先尝试 count(*) 以获取实时准确值)"""
|
|
|
try:
|
|
|
+ # 确保集合已加载
|
|
|
+ state = milvus_service.get_collection_state(collection_name)
|
|
|
+ if state != "Loaded":
|
|
|
+ # [Fix] 检查是否存在索引,如果没有索引则不能加载,避免抛出 index not found 错误
|
|
|
+ try:
|
|
|
+ indexes = milvus_service.client.list_indexes(collection_name)
|
|
|
+ if not indexes:
|
|
|
+ # 无索引无法加载,直接跳过,进入 Fallback 使用 stats
|
|
|
+ # print(f"Collection {collection_name} has no index, skipping load.")
|
|
|
+ raise Exception("Collection has no index, cannot load")
|
|
|
+ except Exception:
|
|
|
+ # list_indexes 失败也视为无法加载
|
|
|
+ raise Exception("Failed to check indexes or no index")
|
|
|
+
|
|
|
+ # print(f"Auto loading collection {collection_name} for counting...")
|
|
|
+ milvus_service.set_collection_state(collection_name, "load")
|
|
|
+
|
|
|
# 尝试使用 count(*) 获取准确的实时数量
|
|
|
# 过滤掉已标记删除的数据 (is_deleted == false)
|
|
|
- # 注意:如果 Schema 中没有 is_deleted 字段,这里可能会报错,需要根据实际 Schema 调整
|
|
|
- # 但之前的代码中 Schema 确实包含了 is_deleted
|
|
|
try:
|
|
|
- # 检查集合是否已加载
|
|
|
- if milvus_service.get_collection_state(collection_name) == "Loaded":
|
|
|
- res = milvus_service.client.query(collection_name, filter="is_deleted == false", output_fields=["count(*)"])
|
|
|
- if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
- return int(res[0]["count(*)"])
|
|
|
- except Exception:
|
|
|
- # 再次尝试不过滤
|
|
|
+ # [Fix] 动态检测 is_deleted 类型,自适应 Int64 或 Bool
|
|
|
+ desc = milvus_service.client.describe_collection(collection_name)
|
|
|
+ fields = desc.get('fields', [])
|
|
|
+ is_del_field = next((f for f in fields if f['name'] == 'is_deleted'), None)
|
|
|
+
|
|
|
+ filter_expr = ""
|
|
|
+ if is_del_field:
|
|
|
+ if is_del_field.get('type') == 1: # Bool=1
|
|
|
+ filter_expr = "is_deleted == false"
|
|
|
+ else:
|
|
|
+ filter_expr = "is_deleted == 0" # 默认 Int
|
|
|
+ else:
|
|
|
+ # 字段不存在,使用恒真条件
|
|
|
+ pk_field = "id"
|
|
|
+ for f in fields:
|
|
|
+ if f.get('is_primary') or f.get('primary_key'):
|
|
|
+ pk_field = f.get('name')
|
|
|
+ break
|
|
|
+ # 使用 PK 过滤
|
|
|
+ filter_expr = f"{pk_field} >= 0" # 假设是 Int PK,如果是 VarChar 需调整
|
|
|
+ # 检测 PK 类型
|
|
|
+ pk_type = next((f.get('type') for f in fields if f.get('name') == pk_field), None)
|
|
|
+ # Milvus DataType: INT64=5, VARCHAR=21
|
|
|
+ if pk_type == 21 or pk_type == 'VarChar':
|
|
|
+ filter_expr = f'{pk_field} != ""'
|
|
|
+
|
|
|
+ # 注意:count(*) 聚合查询不支持分页参数 (limit/offset),也不能用于获取实体
|
|
|
+ res = milvus_service.client.query(collection_name, filter=filter_expr, output_fields=["count(*)"])
|
|
|
+ if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
+ return int(res[0]["count(*)"])
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Query count with filter error for {collection_name}: {e}")
|
|
|
+ # 再次尝试不过滤 (使用恒真表达式)
|
|
|
if milvus_service.get_collection_state(collection_name) == "Loaded":
|
|
|
- res = milvus_service.client.query(collection_name, filter="", output_fields=["count(*)"])
|
|
|
- if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
+ # 获取 PK 字段名
|
|
|
+ desc = milvus_service.client.describe_collection(collection_name)
|
|
|
+ fields = desc.get('fields', [])
|
|
|
+ pk_field = "id"
|
|
|
+ pk_type = 5
|
|
|
+ for f in fields:
|
|
|
+ if f.get('is_primary') or f.get('primary_key'):
|
|
|
+ pk_field = f.get('name')
|
|
|
+ pk_type = f.get('type')
|
|
|
+ break
|
|
|
+
|
|
|
+ filter_expr = f"{pk_field} >= 0"
|
|
|
+ if pk_type == 21 or pk_type == 'VarChar':
|
|
|
+ filter_expr = f'{pk_field} != ""'
|
|
|
+
|
|
|
+ res = milvus_service.client.query(collection_name, filter=filter_expr, output_fields=["count(*)"])
|
|
|
+ if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
return int(res[0]["count(*)"])
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Get collection row count error for {collection_name}: {e}")
|
|
|
|
|
|
# Fallback: 使用 get_collection_stats (可能包含已删除未 Compaction 的数据)
|
|
|
try:
|
|
|
@@ -61,21 +117,54 @@ class KnowledgeBaseService:
|
|
|
# milvus_service.load_collection(kb.collection_name)
|
|
|
|
|
|
# 采样查询 (获取前10条)
|
|
|
+ res = []
|
|
|
try:
|
|
|
+ # 先检查 metadata 字段是否存在,避免报错 field metadata not exist
|
|
|
+ desc = milvus_service.client.describe_collection(kb.collection_name)
|
|
|
+ fields = [f['name'] for f in desc.get('fields', [])]
|
|
|
+ if "metadata" not in fields:
|
|
|
+ return # 集合无 metadata 字段,无需推断
|
|
|
+
|
|
|
+ # [Fix] 动态检测 is_deleted 类型,自适应 Int64 或 Bool
|
|
|
+ is_del_field = next((f for f in fields if f['name'] == 'is_deleted'), None)
|
|
|
+ filter_expr = "is_deleted == 0"
|
|
|
+ if is_del_field and is_del_field.get('type') == 1: # Bool=1
|
|
|
+ filter_expr = "is_deleted == false"
|
|
|
+ elif not is_del_field:
|
|
|
+ # 如果没有 is_deleted 字段,则不使用过滤
|
|
|
+ filter_expr = ""
|
|
|
+
|
|
|
+ # 如果 filter_expr 为空,使用恒真表达式以避免空 filter 错误
|
|
|
+ if not filter_expr:
|
|
|
+ pk_field = "id"
|
|
|
+ for f in fields:
|
|
|
+ if f.get('is_primary') or f.get('primary_key'):
|
|
|
+ pk_field = f.get('name')
|
|
|
+ break
|
|
|
+ filter_expr = f"{pk_field} >= 0"
|
|
|
+ pk_type = next((f.get('type') for f in fields if f.get('name') == pk_field), None)
|
|
|
+ if pk_type == 21 or pk_type == 'VarChar':
|
|
|
+ filter_expr = f'{pk_field} != ""'
|
|
|
+
|
|
|
res = milvus_service.client.query(
|
|
|
collection_name=kb.collection_name,
|
|
|
- filter="is_deleted == false",
|
|
|
+ filter=filter_expr,
|
|
|
output_fields=["metadata"],
|
|
|
limit=10
|
|
|
)
|
|
|
except Exception as e:
|
|
|
- # 如果 filter 查询失败(可能不支持 is_deleted),尝试无 filter 查询
|
|
|
- res = milvus_service.client.query(
|
|
|
- collection_name=kb.collection_name,
|
|
|
- filter="",
|
|
|
- output_fields=["metadata"],
|
|
|
- limit=10
|
|
|
- )
|
|
|
+ try:
|
|
|
+ # 如果 filter 查询失败(可能不支持 is_deleted),尝试无 filter 查询
|
|
|
+ # 再次确认 metadata 是否存在 (防止上面的检查因某种原因失效或异常被捕获)
|
|
|
+ if "metadata" in fields:
|
|
|
+ res = milvus_service.client.query(
|
|
|
+ collection_name=kb.collection_name,
|
|
|
+ filter="",
|
|
|
+ output_fields=["metadata"],
|
|
|
+ limit=10
|
|
|
+ )
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
|
|
|
if res:
|
|
|
inferred_keys = set()
|
|
|
@@ -226,6 +315,113 @@ class KnowledgeBaseService:
|
|
|
print(f"Sync Milvus collections failed: {e}")
|
|
|
# ----------------------
|
|
|
|
|
|
+ # 查询未删除的 KB
|
|
|
+ query = select(KnowledgeBase).where(KnowledgeBase.is_deleted == 0)
|
|
|
+
|
|
|
+ if keyword:
|
|
|
+ query = query.where(or_(
|
|
|
+ KnowledgeBase.name.like(f"%{keyword}%"),
|
|
|
+ KnowledgeBase.collection_name.like(f"%{keyword}%")
|
|
|
+ ))
|
|
|
+
|
|
|
+ if status:
|
|
|
+ query = query.where(KnowledgeBase.status == status)
|
|
|
+
|
|
|
+ # [Modified] 过滤掉 _parent 结尾的集合,只显示主集合 (现在不需要过滤了,因为用户想要分开创建)
|
|
|
+ # 但用户又说“知识库名称只有一个”,但表名是两行。
|
|
|
+ # 如果前端发了两次请求创建了两个 KB,那么 DB 里会有两个 KB。
|
|
|
+ # 现在的要求是:
|
|
|
+ # 1. 创建时:分开填表名 (Done, 前端已回退)
|
|
|
+ # 2. 列表显示时:合并显示 (Same as before)
|
|
|
+
|
|
|
+ # 关键点:如何识别哪两个是一对?
|
|
|
+ # 之前的逻辑是根据 _parent 后缀。
|
|
|
+ # 如果用户手动命名,比如 kb_child 和 kb_daddy,我们无法自动识别它们是一对。
|
|
|
+ # 除非我们在 DB 里加 parent_id 字段。
|
|
|
+
|
|
|
+ # 用户原话:“知识库新增界面的集合名称不用改,依旧还是分为父知识库和子知识库方便用户自主命名”
|
|
|
+ # 意思是创建时保留两个输入框。
|
|
|
+ # “我要求创建后前端页面的知识库名称只有一个,然后父子的知识库表名成两行形式放在知识库表名字段中”
|
|
|
+ # 意思是列表页要合并。
|
|
|
+
|
|
|
+ # 问题:如果用户随便命名,我们怎么合并?
|
|
|
+ # 假设:用户会遵循某种命名约定?或者我们在创建时记录关联?
|
|
|
+ # 之前的代码自动加 _parent 是为了方便关联。
|
|
|
+ # 如果用户自主命名,我们很难猜测。
|
|
|
+
|
|
|
+ # 重新理解用户的意图:
|
|
|
+ # 用户可能并不是想完全随便命名,而是希望前端能输入两个名字。
|
|
|
+ # 我们可以强制要求父集合必须是 子集合名 + _parent ? 不,用户说要自主命名。
|
|
|
+
|
|
|
+ # 唯一的解法:
|
|
|
+ # 在创建时,将这两个 KB 关联起来。
|
|
|
+ # 既然我们没有改数据库 Schema 加 parent_id,我们只能通过名称匹配,或者
|
|
|
+ # 只能修改创建逻辑,只创建一个“主 KB”记录,但是在该记录中存储两个 collection_name?
|
|
|
+ # Schema 中只有一个 collection_name。
|
|
|
+
|
|
|
+ # 让我们再看一眼用户的需求:
|
|
|
+ # "知识库新增界面的集合名称不用改,依旧还是分为父知识库和子知识库方便用户自主命名" -> 前端两个框。
|
|
|
+ # "创建后前端页面的知识库名称只有一个" -> 列表页一行。
|
|
|
+
|
|
|
+ # 方案:
|
|
|
+ # 前端发送两次 create 请求(如刚刚回退的代码所示)。
|
|
|
+ # 这样 DB 里会有两条记录。
|
|
|
+ # 后端 get_list 如何把它们合并成一行?
|
|
|
+ # 如果没有命名规律,后端无法合并。
|
|
|
+
|
|
|
+ # 也许用户所谓的“自主命名”只是想自己决定前缀?
|
|
|
+ # 不,如果是那样,之前的自动 _parent 逻辑也可以满足(只要输入前缀)。
|
|
|
+ # 用户特意强调“分为父知识库和子知识库方便用户自主命名”,说明他想完全控制两个名字,比如 "law_docs" 和 "law_chunks"。
|
|
|
+
|
|
|
+ # 如果必须合并显示,必须有对应关系。
|
|
|
+ # 临时方案:
|
|
|
+ # 假定用户命名的父集合包含子集合名?或者我们只显示子集合,然后在“表名”列把所有“孤儿”集合都列出来?
|
|
|
+ # 不太可能。
|
|
|
+
|
|
|
+ # 让我们回顾一下之前的实现:
|
|
|
+ # 之前是前端只输一个名,后端自动加 _parent。
|
|
|
+ # 现在用户要输两个名。
|
|
|
+
|
|
|
+ # 最佳实践:
|
|
|
+ # 修改 create 接口,接收 parent_collection_name 和 child_collection_name。
|
|
|
+ # 在 DB 里存一条主记录(Child),并把 Parent 的名字存在 Description 或者 Metadata 里?
|
|
|
+ # 或者存两条记录,但通过 Description 标记?
|
|
|
+
|
|
|
+ # 鉴于不能改 Schema,我们可以约定:
|
|
|
+ # 在创建时,给父 KB 的 description 加一个标记,比如 "Parent of {child_name}"。
|
|
|
+ # 或者给子 KB 加 "Child of {parent_name}"。
|
|
|
+
|
|
|
+ # 但这样查询效率极低。
|
|
|
+
|
|
|
+ # 另一种可能:
|
|
|
+ # 用户并不介意我们只显示“子集合”,但他希望在“表名”列能看到父集合的名字。
|
|
|
+ # 如果我们无法自动关联,就无法显示。
|
|
|
+
|
|
|
+ # 让我们采用最稳妥的方案:
|
|
|
+ # 恢复后端 create 逻辑为“只创建一条主记录,但接收两个集合名参数”。
|
|
|
+ # 等等,create 接口参数是 KnowledgeBaseCreate,只有一个 collection_name。
|
|
|
+ # 如果前端调两次 create,那就是两条独立的记录。
|
|
|
+
|
|
|
+ # 除非... 我们修改 KnowledgeBaseCreate Schema,允许传 parent_collection_name。
|
|
|
+ # 然后后端只创建一条 KnowledgeBase 记录,但是该记录对应两个 Milvus 集合?
|
|
|
+ # 不行,KnowledgeBase 模型是一对一映射到 collection_name 的。
|
|
|
+
|
|
|
+ # 让我们假设用户接受“自动 _parent 后缀”的约束,只是希望前端能看到两个框?
|
|
|
+ # 如果用户一定要自主命名且不遵循后缀,且要合并显示,且不改 DB Schema,这在逻辑上是不可能的(无法找回关系)。
|
|
|
+
|
|
|
+ # 猜测:用户可能没意识到“自主命名”和“合并显示”之间的矛盾。
|
|
|
+ # 或者,用户希望我们用 "Name" (知识库名称) 来关联?
|
|
|
+ # 前端传过来的 name 分别是 "XX (父)" 和 "XX (子)"。
|
|
|
+ # 我们可以通过去除后缀的 Name 来关联!
|
|
|
+
|
|
|
+ # 逻辑:
|
|
|
+ # 1. 获取所有 KB。
|
|
|
+ # 2. 按 Name 去除 " (父)" / " (子)" 后分组。
|
|
|
+ # 3. 如果一组里有两个,合并显示。
|
|
|
+
|
|
|
+ # 让我们试试这个方案。
|
|
|
+
|
|
|
+ # 查询未删除的 KB
|
|
|
query = select(KnowledgeBase).where(KnowledgeBase.is_deleted == 0)
|
|
|
|
|
|
if keyword:
|
|
|
@@ -237,18 +433,148 @@ class KnowledgeBaseService:
|
|
|
if status:
|
|
|
query = query.where(KnowledgeBase.status == status)
|
|
|
|
|
|
- # 计算总数
|
|
|
- count_query = select(func.count()).select_from(query.subquery())
|
|
|
- total = await db.scalar(count_query) or 0
|
|
|
+ # 获取所有符合条件的记录(不再分页查询,为了手动合并)
|
|
|
+ # 注意:如果数据量大,这会有性能问题。但目前是内部系统。
|
|
|
+ # 为了分页准确,我们必须在内存中合并后再分页。
|
|
|
+
|
|
|
+ result = await db.execute(query.order_by(KnowledgeBase.created_time.desc()))
|
|
|
+ all_items = result.scalars().all()
|
|
|
+
|
|
|
+ # 内存合并逻辑
|
|
|
+ merged_items = []
|
|
|
+ # 用字典暂存: base_name -> {child: kb, parent: kb}
|
|
|
+ kb_groups = {}
|
|
|
+
|
|
|
+ for item in all_items:
|
|
|
+ # 尝试解析名字
|
|
|
+ base_name = item.name
|
|
|
+ is_parent = False
|
|
|
+ is_child = False
|
|
|
+
|
|
|
+ if item.name.endswith(" (父)"):
|
|
|
+ base_name = item.name[:-4]
|
|
|
+ is_parent = True
|
|
|
+ elif item.name.endswith(" (子)"):
|
|
|
+ base_name = item.name[:-4]
|
|
|
+ is_child = True
|
|
|
+
|
|
|
+ # 如果不符合命名规范,就当作独立项
|
|
|
+ if not (is_parent or is_child):
|
|
|
+ # 检查是否之前自动生成的 (Name没有后缀,Collection有 _parent)
|
|
|
+ if item.collection_name.endswith("_parent"):
|
|
|
+ # 尝试找对应的主集合
|
|
|
+ guess_child_coll = item.collection_name[:-7]
|
|
|
+ # 这里比较麻烦,因为我们是按 Name 分组。
|
|
|
+ # 简单起见,如果 Name 不匹配模式,直接显示。
|
|
|
+ merged_items.append(item)
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ # 可能是旧数据,或者单集合
|
|
|
+ # 也有可能是旧逻辑生成的子集合(Name没后缀)
|
|
|
+ # 尝试找它的 Parent (collection_name + _parent)
|
|
|
+ # 这个逻辑太复杂了。
|
|
|
+
|
|
|
+ # 让我们只针对新逻辑(Name带后缀)做合并。
|
|
|
+ # 对旧逻辑(自动 _parent),我们沿用之前的“过滤 _parent”逻辑?
|
|
|
+ # 不,用户说“依旧还是分为...”,说明他可能习惯旧的操作方式?
|
|
|
+ # 不,之前的操作方式是只输一个名。
|
|
|
+
|
|
|
+ # 让我们统一逻辑:
|
|
|
+ # 以前端传来的 Name 模式 "XXX (父)" / "XXX (子)" 为主键进行合并。
|
|
|
+ merged_items.append(item)
|
|
|
+ continue
|
|
|
|
|
|
- # 分页查询
|
|
|
- query = query.order_by(KnowledgeBase.created_time.desc()).offset((page - 1) * page_size).limit(page_size)
|
|
|
- result = await db.execute(query)
|
|
|
- items = result.scalars().all()
|
|
|
+ if base_name not in kb_groups:
|
|
|
+ kb_groups[base_name] = {"child": None, "parent": None, "others": []}
|
|
|
+
|
|
|
+ if is_child:
|
|
|
+ kb_groups[base_name]["child"] = item
|
|
|
+ elif is_parent:
|
|
|
+ kb_groups[base_name]["parent"] = item
|
|
|
+
|
|
|
+ # 生成合并后的列表
|
|
|
+ final_list = []
|
|
|
+
|
|
|
+ # 处理分组
|
|
|
+ for base_name, group in kb_groups.items():
|
|
|
+ child = group["child"]
|
|
|
+ parent = group["parent"]
|
|
|
+
|
|
|
+ if child and parent:
|
|
|
+ # 合并显示,使用 Child 的记录,但修改 collection_name
|
|
|
+ child.collection_name = f"{child.collection_name}\n{parent.collection_name}"
|
|
|
+ # 修改 Name 去掉后缀
|
|
|
+ child.name = base_name
|
|
|
+ final_list.append(child)
|
|
|
+ elif child:
|
|
|
+ final_list.append(child)
|
|
|
+ elif parent:
|
|
|
+ # 只有父没有子?显示父
|
|
|
+ final_list.append(parent)
|
|
|
+
|
|
|
+ # 把那些不符合命名规范的(在循环中直接 append 的)也加进来
|
|
|
+ # 等等,上面的循环逻辑有漏洞,merged_items 和 kb_groups 是分开的。
|
|
|
+
|
|
|
+ # 重写合并逻辑:
|
|
|
+ # 1. 遍历所有 items
|
|
|
+ # 2. 如果是 "(子)",放入 map 等待 "(父)"
|
|
|
+ # 3. 如果是 "(父)",放入 map 等待 "(子)"
|
|
|
+ # 4. 如果都不是,直接放入 final_list
|
|
|
+
|
|
|
+ # 修正:
|
|
|
+ # 我们不能简单地把 Parent 藏起来。
|
|
|
+ # 如果是 create 出来的,有两个记录。
|
|
|
+ # 我们希望在列表中只显示一行。
|
|
|
+
|
|
|
+ processed_ids = set()
|
|
|
+ final_list = []
|
|
|
+
|
|
|
+ # 建立 lookup map
|
|
|
+ name_map = {kb.name: kb for kb in all_items}
|
|
|
+
|
|
|
+ for item in all_items:
|
|
|
+ if item.id in processed_ids:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if item.name.endswith(" (子)"):
|
|
|
+ base_name = item.name[:-4]
|
|
|
+ parent_name = f"{base_name} (父)"
|
|
|
+
|
|
|
+ if parent_name in name_map:
|
|
|
+ parent_kb = name_map[parent_name]
|
|
|
+ # 找到了一对!
|
|
|
+ # 合并信息到 item (子)
|
|
|
+ item.name = base_name # 展示时去掉后缀
|
|
|
+ item.collection_name = f"{item.collection_name}\n{parent_kb.collection_name}"
|
|
|
+
|
|
|
+ final_list.append(item)
|
|
|
+ processed_ids.add(item.id)
|
|
|
+ processed_ids.add(parent_kb.id)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if item.name.endswith(" (父)"):
|
|
|
+ # 如果是未匹配的父集合,隐藏不显示,以免重复
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 普通项,单独显示
|
|
|
+ final_list.append(item)
|
|
|
+ processed_ids.add(item.id)
|
|
|
+
|
|
|
+ # 重新计算分页
|
|
|
+ total = len(final_list)
|
|
|
+ start = (page - 1) * page_size
|
|
|
+ end = start + page_size
|
|
|
+ items = final_list[start:end]
|
|
|
|
|
|
- # 设置 is_synced 属性 (非数据库字段,用于前端展示)
|
|
|
+ # 设置 is_synced (这里 collection_name 已经被改了,包含换行)
|
|
|
for item in items:
|
|
|
- item.is_synced = item.collection_name in milvus_names
|
|
|
+ # 简单的 split
|
|
|
+ names = item.collection_name.split('\n')
|
|
|
+ # 只要有一个 sync 了就算 sync?或者都 sync?
|
|
|
+ # 只要主集合 sync 了就行
|
|
|
+ main_coll = names[0]
|
|
|
+ item.is_synced = main_coll in milvus_names
|
|
|
+ # 注意:这里 item.collection_name 已经包含了 parent,前端会展示
|
|
|
|
|
|
total_pages = ceil(total / page_size) if page_size else 0
|
|
|
|
|
|
@@ -293,7 +619,7 @@ class KnowledgeBaseService:
|
|
|
# 3. 创建 Milvus 集合 (延迟到点击同步按钮时创建)
|
|
|
# milvus_service.create_collection(...)
|
|
|
|
|
|
- # 4. 创建 DB 记录
|
|
|
+ # 4. 创建 DB 记录 (主集合)
|
|
|
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
new_kb = KnowledgeBase(
|
|
|
id=str(uuid.uuid4()),
|
|
|
@@ -308,6 +634,8 @@ class KnowledgeBaseService:
|
|
|
)
|
|
|
db.add(new_kb)
|
|
|
|
|
|
+ # [Modified] 移除自动创建 Parent 集合记录,因为前端现在会明确创建两次
|
|
|
+
|
|
|
# 5. 保存元数据定义 (如果有)
|
|
|
if payload.metadata_fields:
|
|
|
for field in payload.metadata_fields:
|
|
|
@@ -320,6 +648,9 @@ class KnowledgeBaseService:
|
|
|
remark=field.remark
|
|
|
)
|
|
|
db.add(new_metadata)
|
|
|
+
|
|
|
+ # [Modified] 移除自动为 Parent 添加元数据
|
|
|
+
|
|
|
|
|
|
# 6. 保存自定义Schema定义 (如果有) - 已废弃,使用固定Schema
|
|
|
# if payload.custom_schemas:
|
|
|
@@ -342,23 +673,64 @@ class KnowledgeBaseService:
|
|
|
if not kb:
|
|
|
raise ValueError("知识库不存在")
|
|
|
|
|
|
+ # [Fix] 如果是合并显示的子知识库,需要同时更新对应的父知识库
|
|
|
+ # 查找对应的父知识库
|
|
|
+ parent_kb = None
|
|
|
+ if kb.name.endswith(" (子)"):
|
|
|
+ base_name = kb.name[:-4]
|
|
|
+ parent_name = f"{base_name} (父)"
|
|
|
+ parent_res = await db.execute(select(KnowledgeBase).where(
|
|
|
+ KnowledgeBase.name == parent_name,
|
|
|
+ KnowledgeBase.is_deleted == 0
|
|
|
+ ))
|
|
|
+ parent_kb = parent_res.scalars().first()
|
|
|
+
|
|
|
try:
|
|
|
if payload.name is not None:
|
|
|
- kb.name = payload.name
|
|
|
+ # 如果修改了名称,需要保持 (子)/(父) 后缀
|
|
|
+ # 注意:前端传来的 name 可能是不带后缀的 base_name (因为列表页去掉了后缀)
|
|
|
+ # 这里的 payload.name 到底带不带后缀?
|
|
|
+ # 如果是前端表单直接提交,通常是用户输入的 base_name。
|
|
|
+ # 我们假设 payload.name 是新的 base_name。
|
|
|
+
|
|
|
+ # 如果原名带后缀,则加上后缀
|
|
|
+ if kb.name.endswith(" (子)"):
|
|
|
+ kb.name = f"{payload.name} (子)"
|
|
|
+ if parent_kb:
|
|
|
+ parent_kb.name = f"{payload.name} (父)"
|
|
|
+ elif kb.name.endswith(" (父)"):
|
|
|
+ # 理论上只编辑子集合,但防御性编程
|
|
|
+ kb.name = f"{payload.name} (父)"
|
|
|
+ else:
|
|
|
+ # 普通集合
|
|
|
+ kb.name = payload.name
|
|
|
+
|
|
|
if payload.description is not None:
|
|
|
kb.description = payload.description
|
|
|
- # 同步更新 Milvus 描述
|
|
|
- milvus_service.update_collection_description(kb.collection_name, payload.description)
|
|
|
+ # 同步更新 Milvus 描述 (如果 Milvus 中存在该集合)
|
|
|
+ if milvus_service.has_collection(kb.collection_name):
|
|
|
+ milvus_service.update_collection_description(kb.collection_name, payload.description)
|
|
|
+
|
|
|
+ if parent_kb:
|
|
|
+ parent_kb.description = payload.description
|
|
|
+ if milvus_service.has_collection(parent_kb.collection_name):
|
|
|
+ milvus_service.update_collection_description(parent_kb.collection_name, payload.description)
|
|
|
+
|
|
|
if payload.status is not None:
|
|
|
kb.status = payload.status
|
|
|
+ if parent_kb:
|
|
|
+ parent_kb.status = payload.status
|
|
|
|
|
|
# 更新元数据字段 (Metadata Fields)
|
|
|
if payload.metadata_fields is not None:
|
|
|
# 1. 删除旧的元数据字段
|
|
|
await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == id))
|
|
|
+ if parent_kb:
|
|
|
+ await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == parent_kb.id))
|
|
|
|
|
|
# 2. 插入新的元数据字段
|
|
|
for field in payload.metadata_fields:
|
|
|
+ # 子集合元数据
|
|
|
new_metadata = SampleMetadata(
|
|
|
id=str(uuid.uuid4()),
|
|
|
knowledge_base_id=kb.id,
|
|
|
@@ -368,11 +740,29 @@ class KnowledgeBaseService:
|
|
|
remark=field.remark
|
|
|
)
|
|
|
db.add(new_metadata)
|
|
|
+
|
|
|
+ # 父集合元数据
|
|
|
+ if parent_kb:
|
|
|
+ new_metadata_parent = SampleMetadata(
|
|
|
+ id=str(uuid.uuid4()),
|
|
|
+ knowledge_base_id=parent_kb.id,
|
|
|
+ field_zh_name=field.field_zh_name,
|
|
|
+ field_en_name=field.field_en_name,
|
|
|
+ field_type=field.field_type,
|
|
|
+ remark=field.remark
|
|
|
+ )
|
|
|
+ db.add(new_metadata_parent)
|
|
|
|
|
|
kb.updated_by = "admin" # 暂时硬编码
|
|
|
kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 使用 updated_time 而不是 created_time
|
|
|
+ if parent_kb:
|
|
|
+ parent_kb.updated_by = "admin"
|
|
|
+ parent_kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
await db.commit()
|
|
|
await db.refresh(kb)
|
|
|
+ if parent_kb:
|
|
|
+ await db.refresh(parent_kb)
|
|
|
|
|
|
return kb
|
|
|
except Exception as e:
|
|
|
@@ -432,7 +822,6 @@ class KnowledgeBaseService:
|
|
|
if milvus_service.has_collection(kb.collection_name):
|
|
|
milvus_service.drop_collection(kb.collection_name)
|
|
|
except Exception as milvus_err:
|
|
|
- # 如果是命名不规范等导致的错误,忽略它,继续删除数据库记录
|
|
|
print(f"Ignore Milvus error during delete: {milvus_err}")
|
|
|
|
|
|
# 2. 软删除 DB 记录
|
|
|
@@ -445,6 +834,31 @@ class KnowledgeBaseService:
|
|
|
# 4. 删除关联的自定义Schema (硬删除)
|
|
|
await db.execute(sql_delete(CustomSchema).where(CustomSchema.knowledge_base_id == id))
|
|
|
|
|
|
+ # [Modified] 级联删除 Parent 集合 (如果存在)
|
|
|
+ if not kb.collection_name.endswith("_parent"):
|
|
|
+ parent_collection_name = f"{kb.collection_name}_parent"
|
|
|
+ parent_kb_res = await db.execute(select(KnowledgeBase).where(
|
|
|
+ KnowledgeBase.collection_name == parent_collection_name
|
|
|
+ )) # 不限制 is_deleted,即使已软删除也可能需要清理
|
|
|
+ parent_kb = parent_kb_res.scalars().first()
|
|
|
+
|
|
|
+ if parent_kb:
|
|
|
+ # 递归调用自身删除 Parent KB (注意:如果 Parent 也有 count > 0 可能会抛出异常,这里假设应该一起清空)
|
|
|
+ # 为了避免循环调用和复杂性,这里直接执行删除逻辑
|
|
|
+
|
|
|
+ # 1. 删除 Milvus
|
|
|
+ try:
|
|
|
+ if milvus_service.has_collection(parent_collection_name):
|
|
|
+ milvus_service.drop_collection(parent_collection_name)
|
|
|
+ except: pass
|
|
|
+
|
|
|
+ # 2. 软删除 DB
|
|
|
+ parent_kb.is_deleted = 1
|
|
|
+ parent_kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ # 3. 删除关联元数据
|
|
|
+ await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == parent_kb.id))
|
|
|
+
|
|
|
await db.commit()
|
|
|
except Exception as e:
|
|
|
await db.rollback()
|
|
|
@@ -670,8 +1084,11 @@ class KnowledgeBaseService:
|
|
|
|
|
|
return [f.to_dict() for f in fields]
|
|
|
|
|
|
- async def update_doc_count(self, db: AsyncSession, collection_name: str) -> None:
|
|
|
- """根据 Milvus 实时数据更新知识库文档数量"""
|
|
|
+ async def update_doc_count(self, db: AsyncSession, collection_name: str) -> int:
|
|
|
+ """根据 Milvus 实时数据更新知识库文档数量
|
|
|
+
|
|
|
+ 返回最新的行计数(int),调用方可以使用该返回值立即更新前端显示。
|
|
|
+ """
|
|
|
# 查找知识库
|
|
|
result = await db.execute(select(KnowledgeBase).where(
|
|
|
KnowledgeBase.collection_name == collection_name,
|
|
|
@@ -681,15 +1098,26 @@ class KnowledgeBaseService:
|
|
|
|
|
|
if kb and milvus_service.has_collection(collection_name):
|
|
|
try:
|
|
|
+ # 确保集合已加载以获取准确计数 (query 需要 Loaded 状态)
|
|
|
+ state = milvus_service.get_collection_state(collection_name)
|
|
|
+ if state != "Loaded":
|
|
|
+ print(f"Collection {collection_name} is {state}, loading...")
|
|
|
+ milvus_service.set_collection_state(collection_name, "load")
|
|
|
+
|
|
|
# 使用统一的计数方法
|
|
|
row_count = await self._get_collection_row_count(collection_name)
|
|
|
|
|
|
# 更新数据库
|
|
|
if kb.document_count != row_count:
|
|
|
+ print(f"Updating doc count for {collection_name}: {kb.document_count} -> {row_count}")
|
|
|
kb.document_count = row_count
|
|
|
kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
await db.commit()
|
|
|
+ return row_count
|
|
|
except Exception as e:
|
|
|
print(f"Failed to update doc count for {collection_name}: {e}")
|
|
|
+ return 0
|
|
|
+ # 如果没有对应的 KB 或者 Milvus 无此集合,返回 0 以便调用方处理
|
|
|
+ return 0
|
|
|
|
|
|
knowledge_base_service = KnowledgeBaseService()
|