| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- """
- 知识库视图路由
- """
- from fastapi import APIRouter, Depends, Query, Path
- from sqlalchemy.ext.asyncio import AsyncSession
- from typing import Optional
- from app.base.async_mysql_connection import get_db
- from app.services.knowledge_base_service import knowledge_base_service
- from app.sample.schemas.knowledge_base import (
- KnowledgeBaseCreate,
- KnowledgeBaseUpdate,
- KnowledgeBaseResponse
- )
- from app.schemas.base import PaginatedResponseSchema, ResponseSchema
- from app.services.jwt_token import verify_token
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- router = APIRouter(prefix="/sample/knowledge-base", tags=["样本中心-知识库"])
- security = HTTPBearer()
- @router.get("", response_model=PaginatedResponseSchema)
- async def get_knowledge_bases(
- page: int = Query(1, ge=1, description="页码"),
- page_size: int = Query(10, ge=1, le=1000, description="每页数量"),
- keyword: str = Query(None, description="搜索关键词"),
- status: str = Query(None, description="状态筛选"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取知识库列表"""
- # 鉴权
- payload = verify_token(credentials.credentials)
- if not payload:
- return PaginatedResponseSchema(code=401, message="无效的访问令牌", data=[], meta=None)
- items, meta = await knowledge_base_service.get_list(
- db, page=page, page_size=page_size, keyword=keyword, status=status
- )
- return PaginatedResponseSchema(
- code=0,
- message="获取知识库列表成功",
- data=[KnowledgeBaseResponse.model_validate(item) for item in items],
- meta=meta,
- )
- @router.get("/list", response_model=ResponseSchema)
- async def get_knowledge_base_simple_list(
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取知识库简单列表 (用于下拉选择)"""
- payload = verify_token(credentials.credentials)
- if not payload:
- return ResponseSchema(code=401, message="无效的访问令牌")
- # 只获取状态正常的知识库
- items, _ = await knowledge_base_service.get_list(db, page=1, page_size=1000, status="normal")
- return ResponseSchema(
- code=0,
- message="获取成功",
- data=[{"id": item.id, "name": item.name, "collection_name": item.collection_name_children or item.collection_name_parent} for item in items]
- )
- @router.post("", response_model=ResponseSchema)
- async def create_knowledge_base(
- payload: KnowledgeBaseCreate,
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """创建新知识库"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- try:
- new_kb = await knowledge_base_service.create(db, payload)
- return ResponseSchema(code=0, message="创建成功", data=KnowledgeBaseResponse.model_validate(new_kb))
- except ValueError as e:
- return ResponseSchema(code=400, message=str(e))
- except Exception:
- return ResponseSchema(code=500, message="创建失败")
- @router.post("/{id}", response_model=ResponseSchema)
- async def update_knowledge_base(
- payload: KnowledgeBaseUpdate,
- id: str = Path(..., description="知识库ID"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新知识库信息"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- try:
- kb = await knowledge_base_service.update(db, id, payload)
- return ResponseSchema(code=0, message="更新成功", data=KnowledgeBaseResponse.model_validate(kb))
- except ValueError as e:
- return ResponseSchema(code=400, message=str(e))
- except Exception:
- return ResponseSchema(code=500, message="更新失败")
- @router.post("/{id}/status", response_model=ResponseSchema)
- async def update_knowledge_base_status(
- id: str = Path(..., description="知识库ID"),
- status: str = Query(..., description="状态: normal/test/disabled"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新知识库状态"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- await knowledge_base_service.update_status(db, id, status)
- return ResponseSchema(code=0, message=f"状态已更新为 {status}")
- @router.post("/{id}/delete", response_model=ResponseSchema)
- async def delete_knowledge_base(
- id: str = Path(..., description="知识库ID"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """删除知识库"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- await knowledge_base_service.delete(db, id)
- return ResponseSchema(code=0, message="删除成功")
- @router.get("/{id}/metadata", response_model=ResponseSchema)
- async def get_knowledge_base_metadata(
- id: str = Path(..., description="知识库ID"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取知识库的元数据字段定义和自定义Schema"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- try:
- data = await knowledge_base_service.get_metadata_and_schema(db, id)
- return ResponseSchema(code=0, message="获取成功", data=data)
- except ValueError as e:
- return ResponseSchema(code=400, message=str(e))
- except Exception as e:
- return ResponseSchema(code=500, message=f"获取失败: {str(e)}")
- @router.post("/{id}/sync", response_model=ResponseSchema)
- async def sync_knowledge_base(
- id: str = Path(..., description="知识库ID"),
- db: AsyncSession = Depends(get_db),
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """同步创建Milvus集合"""
- payload_token = verify_token(credentials.credentials)
- if not payload_token:
- return ResponseSchema(code=401, message="无效的访问令牌")
- try:
- await knowledge_base_service.sync_to_milvus(db, id)
- return ResponseSchema(code=0, message="同步成功")
- except ValueError as e:
- return ResponseSchema(code=400, message=str(e))
- except Exception as e:
- return ResponseSchema(code=500, message=f"同步失败: {str(e)}")
|