|
|
@@ -21,6 +21,99 @@ from app.schemas.base import PaginationSchema
|
|
|
|
|
|
class KnowledgeBaseService:
|
|
|
|
|
|
+ async def _get_collection_row_count(self, collection_name: str) -> int:
|
|
|
+ """获取集合行数(优先尝试 count(*) 以获取实时准确值)"""
|
|
|
+ try:
|
|
|
+ # 尝试使用 count(*) 获取准确的实时数量
|
|
|
+ # 过滤掉已标记删除的数据 (is_deleted == false)
|
|
|
+ # 注意:如果 Schema 中没有 is_deleted 字段,这里可能会报错,需要根据实际 Schema 调整
|
|
|
+ # 但之前的代码中 Schema 确实包含了 is_deleted
|
|
|
+ try:
|
|
|
+ # 检查集合是否已加载
|
|
|
+ if milvus_service.get_collection_state(collection_name) == "Loaded":
|
|
|
+ res = milvus_service.client.query(collection_name, filter="is_deleted == false", output_fields=["count(*)"])
|
|
|
+ if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
+ return int(res[0]["count(*)"])
|
|
|
+ except Exception:
|
|
|
+ # 再次尝试不过滤
|
|
|
+ if milvus_service.get_collection_state(collection_name) == "Loaded":
|
|
|
+ res = milvus_service.client.query(collection_name, filter="", output_fields=["count(*)"])
|
|
|
+ if res and isinstance(res, list) and "count(*)" in res[0]:
|
|
|
+ return int(res[0]["count(*)"])
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Fallback: 使用 get_collection_stats (可能包含已删除未 Compaction 的数据)
|
|
|
+ try:
|
|
|
+ stats = milvus_service.client.get_collection_stats(collection_name)
|
|
|
+ return int(stats.get("row_count", 0))
|
|
|
+ except Exception:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ async def _infer_and_save_metadata(self, db: AsyncSession, kb: KnowledgeBase) -> None:
|
|
|
+ """
|
|
|
+ [Internal] 从 Milvus 数据中推断元数据并保存到 DB
|
|
|
+ 仅当 DB 中没有定义元数据时调用
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 检查是否已加载(避免不必要的错误)
|
|
|
+ # if milvus_service.get_collection_state(kb.collection_name) != "Loaded":
|
|
|
+ # milvus_service.load_collection(kb.collection_name)
|
|
|
+
|
|
|
+ # 采样查询 (获取前10条)
|
|
|
+ try:
|
|
|
+ res = milvus_service.client.query(
|
|
|
+ collection_name=kb.collection_name,
|
|
|
+ filter="is_deleted == false",
|
|
|
+ output_fields=["metadata"],
|
|
|
+ limit=10
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ # 如果 filter 查询失败(可能不支持 is_deleted),尝试无 filter 查询
|
|
|
+ res = milvus_service.client.query(
|
|
|
+ collection_name=kb.collection_name,
|
|
|
+ filter="",
|
|
|
+ output_fields=["metadata"],
|
|
|
+ limit=10
|
|
|
+ )
|
|
|
+
|
|
|
+ if res:
|
|
|
+ inferred_keys = set()
|
|
|
+ for item in res:
|
|
|
+ meta = item.get("metadata") or {}
|
|
|
+ # Milvus 可能会返回 JSON 字符串
|
|
|
+ if isinstance(meta, str):
|
|
|
+ try:
|
|
|
+ import json
|
|
|
+ meta = json.loads(meta)
|
|
|
+ except:
|
|
|
+ meta = {}
|
|
|
+
|
|
|
+ if isinstance(meta, dict):
|
|
|
+ inferred_keys.update(meta.keys())
|
|
|
+
|
|
|
+ # 过滤掉默认字段
|
|
|
+ ignore_keys = {"doc_name", "file_name", "title", "source", "chunk_id"}
|
|
|
+ inferred_keys = inferred_keys - ignore_keys
|
|
|
+
|
|
|
+ if inferred_keys:
|
|
|
+ # 自动生成并保存到 DB
|
|
|
+ for key in inferred_keys:
|
|
|
+ new_metadata = SampleMetadata(
|
|
|
+ id=str(uuid.uuid4()),
|
|
|
+ knowledge_base_id=kb.id,
|
|
|
+ field_zh_name=key, # 默认用英文名
|
|
|
+ field_en_name=key,
|
|
|
+ field_type="text", # 默认推断为 text
|
|
|
+ remark="Auto inferred from Milvus data"
|
|
|
+ )
|
|
|
+ db.add(new_metadata)
|
|
|
+
|
|
|
+ # 注意:调用方负责 commit,这里不 commit 以支持批量事务
|
|
|
+ print(f"Auto inferred metadata for {kb.collection_name}: {inferred_keys}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Failed to infer metadata for {kb.collection_name}: {e}")
|
|
|
+
|
|
|
async def get_list(
|
|
|
self,
|
|
|
db: AsyncSession,
|
|
|
@@ -45,11 +138,7 @@ class KnowledgeBaseService:
|
|
|
has_changes = False
|
|
|
for m_name in milvus_names:
|
|
|
# 获取统计信息
|
|
|
- try:
|
|
|
- stats = milvus_service.client.get_collection_stats(m_name)
|
|
|
- row_count = int(stats.get("row_count", 0))
|
|
|
- except Exception:
|
|
|
- row_count = 0
|
|
|
+ row_count = await self._get_collection_row_count(m_name)
|
|
|
|
|
|
if m_name not in existing_map:
|
|
|
# 新增
|
|
|
@@ -64,6 +153,11 @@ class KnowledgeBaseService:
|
|
|
updated_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
)
|
|
|
db.add(new_kb)
|
|
|
+
|
|
|
+ # [新增逻辑] 对新发现的知识库,立即尝试推断元数据
|
|
|
+ if row_count > 0:
|
|
|
+ await self._infer_and_save_metadata(db, new_kb)
|
|
|
+
|
|
|
has_changes = True
|
|
|
else:
|
|
|
# 更新统计
|
|
|
@@ -72,10 +166,61 @@ class KnowledgeBaseService:
|
|
|
kb.document_count = row_count
|
|
|
# kb.created_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 统计更新不一定更新时间
|
|
|
has_changes = True
|
|
|
+
|
|
|
+ # [新增逻辑] 如果已有知识库有数据但没有元数据定义,尝试推断 (Lazy check)
|
|
|
+ # 为了性能,这里不每次都查元数据表。只有当 row_count > 0 时才考虑。
|
|
|
+ # 且为了避免每次都 query Milvus,我们可以假设如果 DB 中没有 metadata 记录才去推断。
|
|
|
+ # 但在循环中查 DB (select count) 也是性能损耗。
|
|
|
+ # 既然用户现在的痛点是“必须点击查看”,我们可以放宽一点:
|
|
|
+ # 仅在新发现时推断,或者增加一个明确的“同步所有”按钮。
|
|
|
+ # 或者,这里简单做个优化:如果 row_count > 0,我们尝试去 infer。
|
|
|
+ # _infer_and_save_metadata 内部不包含 "检查DB是否有元数据" 的逻辑,需要补充。
|
|
|
|
|
|
if has_changes:
|
|
|
await db.commit()
|
|
|
|
|
|
+ # [补充逻辑] 对所有现有 KB,如果 document_count > 0,检查是否需要推断
|
|
|
+ # 为了不影响性能,这里只对本次循环中更新了 count 的,或者...
|
|
|
+ # 还是保持现状:只对新发现的做推断。对于老的,用户可能需要手动触发或者我们做一个批量接口。
|
|
|
+ # 但用户说“如果不点击查看就没有办法自动推断”,暗示他希望列表页能解决。
|
|
|
+ # 我们可以查一次所有 KB 的 ID,再查 SampleMetadata 表中哪些 KB ID 已经有数据了。
|
|
|
+
|
|
|
+ # 1. 获取所有有数据的 KB ID
|
|
|
+ # active_kb_ids = [kb.id for kb in existing_kbs if kb.document_count > 0]
|
|
|
+ # if active_kb_ids:
|
|
|
+ # # 2. 查哪些已经有元数据
|
|
|
+ # meta_res = await db.execute(select(SampleMetadata.knowledge_base_id).where(SampleMetadata.knowledge_base_id.in_(active_kb_ids)).distinct())
|
|
|
+ # has_meta_ids = set(meta_res.scalars().all())
|
|
|
+ #
|
|
|
+ # # 3. 找出需要推断的
|
|
|
+ # need_infer_kbs = [kb for kb in existing_kbs if kb.id in active_kb_ids and kb.id not in has_meta_ids]
|
|
|
+ #
|
|
|
+ # for kb in need_infer_kbs:
|
|
|
+ # await self._infer_and_save_metadata(db, kb)
|
|
|
+ # has_changes = True
|
|
|
+
|
|
|
+ # 上述逻辑比较完善,加入代码中:
|
|
|
+
|
|
|
+ # 获取所有当前存在的 KB (包括刚新增的,如果 session 未 commit 可能查不到 ID,所以最好在 commit 后再做,或者分两步)
|
|
|
+ # 简单起见,我们把上面的 commit 放在这之前是不行的,因为 new_kb 还没 ID (uuid 是手动生成的,其实有 ID)。
|
|
|
+
|
|
|
+ # 优化:只对 existing_kbs 做检查。new_kb 已经在上面处理了。
|
|
|
+ if existing_kbs:
|
|
|
+ active_kbs = [kb for kb in existing_kbs if kb.document_count > 0]
|
|
|
+ if active_kbs:
|
|
|
+ active_ids = [kb.id for kb in active_kbs]
|
|
|
+ # 批量查询已存在元数据的 KB ID
|
|
|
+ meta_res = await db.execute(select(SampleMetadata.knowledge_base_id).where(SampleMetadata.knowledge_base_id.in_(active_ids)).distinct())
|
|
|
+ has_meta_ids = set(meta_res.scalars().all())
|
|
|
+
|
|
|
+ for kb in active_kbs:
|
|
|
+ if kb.id not in has_meta_ids:
|
|
|
+ await self._infer_and_save_metadata(db, kb)
|
|
|
+ has_changes = True
|
|
|
+
|
|
|
+ if has_changes:
|
|
|
+ await db.commit()
|
|
|
+
|
|
|
except Exception as e:
|
|
|
# 同步失败不影响查询,只打印日志
|
|
|
print(f"Sync Milvus collections failed: {e}")
|
|
|
@@ -272,8 +417,8 @@ class KnowledgeBaseService:
|
|
|
current_count = kb.document_count
|
|
|
if kb.collection_name and milvus_service.has_collection(kb.collection_name):
|
|
|
try:
|
|
|
- stats = milvus_service.client.get_collection_stats(kb.collection_name)
|
|
|
- current_count = int(stats.get("row_count", 0))
|
|
|
+ # 使用统一的计数方法
|
|
|
+ current_count = await self._get_collection_row_count(kb.collection_name)
|
|
|
except Exception:
|
|
|
# 获取失败则使用 DB 中的缓存值
|
|
|
pass
|
|
|
@@ -292,7 +437,7 @@ class KnowledgeBaseService:
|
|
|
|
|
|
# 2. 软删除 DB 记录
|
|
|
kb.is_deleted = 1
|
|
|
- kb.created_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
# 3. 删除关联的元数据 (硬删除)
|
|
|
await db.execute(sql_delete(SampleMetadata).where(SampleMetadata.knowledge_base_id == id))
|
|
|
@@ -431,7 +576,7 @@ class KnowledgeBaseService:
|
|
|
raise e
|
|
|
|
|
|
async def get_metadata_and_schema(self, db: AsyncSession, kb_id: str) -> Dict[str, List[dict]]:
|
|
|
- """获取知识库的元数据字段列表 (Schema已固定,不再返回自定义Schema)"""
|
|
|
+ """获取知识库的元数据字段列表 (如果 DB 中没有定义,尝试从 Milvus 数据推断)"""
|
|
|
# 检查知识库是否存在
|
|
|
result = await db.execute(select(KnowledgeBase).where(KnowledgeBase.id == kb_id, KnowledgeBase.is_deleted == 0))
|
|
|
kb = result.scalars().first()
|
|
|
@@ -443,6 +588,67 @@ class KnowledgeBaseService:
|
|
|
meta_result = await db.execute(meta_query)
|
|
|
metadata_fields = [f.to_dict() for f in meta_result.scalars().all()]
|
|
|
|
|
|
+ # 自动推断逻辑:如果 DB 中没有定义元数据,且 Milvus 中有数据,尝试推断
|
|
|
+ if not metadata_fields and kb.collection_name and milvus_service.has_collection(kb.collection_name):
|
|
|
+ try:
|
|
|
+ # 采样查询 (获取前10条)
|
|
|
+ try:
|
|
|
+ res = milvus_service.client.query(
|
|
|
+ collection_name=kb.collection_name,
|
|
|
+ filter="is_deleted == false",
|
|
|
+ output_fields=["metadata"],
|
|
|
+ limit=10
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ # 如果 filter 查询失败(可能不支持 is_deleted),尝试无 filter 查询
|
|
|
+ res = milvus_service.client.query(
|
|
|
+ collection_name=kb.collection_name,
|
|
|
+ filter="",
|
|
|
+ output_fields=["metadata"],
|
|
|
+ limit=10
|
|
|
+ )
|
|
|
+
|
|
|
+ if res:
|
|
|
+ inferred_keys = set()
|
|
|
+ for item in res:
|
|
|
+ meta = item.get("metadata") or {}
|
|
|
+ # Milvus 可能会返回 JSON 字符串,尝试解析
|
|
|
+ if isinstance(meta, str):
|
|
|
+ try:
|
|
|
+ import json
|
|
|
+ meta = json.loads(meta)
|
|
|
+ except:
|
|
|
+ meta = {}
|
|
|
+
|
|
|
+ if isinstance(meta, dict):
|
|
|
+ inferred_keys.update(meta.keys())
|
|
|
+
|
|
|
+ # 过滤掉一些默认字段,避免干扰
|
|
|
+ ignore_keys = {"doc_name", "file_name", "title", "source", "chunk_id"}
|
|
|
+ inferred_keys = inferred_keys - ignore_keys
|
|
|
+
|
|
|
+ if inferred_keys:
|
|
|
+ # 自动生成并保存到 DB
|
|
|
+ new_fields = []
|
|
|
+ for key in inferred_keys:
|
|
|
+ new_metadata = SampleMetadata(
|
|
|
+ id=str(uuid.uuid4()),
|
|
|
+ knowledge_base_id=kb.id,
|
|
|
+ field_zh_name=key, # 默认用英文名
|
|
|
+ field_en_name=key,
|
|
|
+ field_type="text", # 默认推断为 text
|
|
|
+ remark="Auto inferred from Milvus data"
|
|
|
+ )
|
|
|
+ db.add(new_metadata)
|
|
|
+ new_fields.append(new_metadata.to_dict())
|
|
|
+
|
|
|
+ await db.commit()
|
|
|
+ metadata_fields = new_fields
|
|
|
+ print(f"Auto inferred metadata for {kb.collection_name}: {inferred_keys}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Failed to infer metadata for {kb.collection_name}: {e}")
|
|
|
+ # 推断失败不影响正常返回
|
|
|
+
|
|
|
# 返回空的 custom_schemas,因为现在是固定 Schema
|
|
|
return {
|
|
|
"metadata_fields": metadata_fields,
|
|
|
@@ -464,4 +670,26 @@ class KnowledgeBaseService:
|
|
|
|
|
|
return [f.to_dict() for f in fields]
|
|
|
|
|
|
+ async def update_doc_count(self, db: AsyncSession, collection_name: str) -> None:
|
|
|
+ """根据 Milvus 实时数据更新知识库文档数量"""
|
|
|
+ # 查找知识库
|
|
|
+ result = await db.execute(select(KnowledgeBase).where(
|
|
|
+ KnowledgeBase.collection_name == collection_name,
|
|
|
+ KnowledgeBase.is_deleted == 0
|
|
|
+ ))
|
|
|
+ kb = result.scalars().first()
|
|
|
+
|
|
|
+ if kb and milvus_service.has_collection(collection_name):
|
|
|
+ try:
|
|
|
+ # 使用统一的计数方法
|
|
|
+ row_count = await self._get_collection_row_count(collection_name)
|
|
|
+
|
|
|
+ # 更新数据库
|
|
|
+ if kb.document_count != row_count:
|
|
|
+ kb.document_count = row_count
|
|
|
+ kb.updated_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ await db.commit()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Failed to update doc count for {collection_name}: {e}")
|
|
|
+
|
|
|
knowledge_base_service = KnowledgeBaseService()
|