""" 知识库视图路由 """ from fastapi import APIRouter, Depends, Query, Path from sqlalchemy.ext.asyncio import AsyncSession from typing import Optional from app.base.async_mysql_connection import get_db from app.services.knowledge_base_service import knowledge_base_service from app.sample.schemas.knowledge_base import ( KnowledgeBaseCreate, KnowledgeBaseUpdate, KnowledgeBaseResponse ) from app.schemas.base import PaginatedResponseSchema, ResponseSchema from app.services.jwt_token import verify_token from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials router = APIRouter(prefix="/sample/knowledge-base", tags=["样本中心-知识库"]) security = HTTPBearer() @router.get("", response_model=PaginatedResponseSchema) async def get_knowledge_bases( page: int = Query(1, ge=1, description="页码"), page_size: int = Query(10, ge=1, le=1000, description="每页数量"), keyword: str = Query(None, description="搜索关键词"), status: str = Query(None, description="状态筛选"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """获取知识库列表""" # 鉴权 payload = verify_token(credentials.credentials) if not payload: return PaginatedResponseSchema(code=401, message="无效的访问令牌", data=[], meta=None) items, meta = await knowledge_base_service.get_list( db, page=page, page_size=page_size, keyword=keyword, status=status ) return PaginatedResponseSchema( code=0, message="获取知识库列表成功", data=[KnowledgeBaseResponse.model_validate(item) for item in items], meta=meta, ) @router.get("/list", response_model=ResponseSchema) async def get_knowledge_base_simple_list( db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """获取知识库简单列表 (用于下拉选择)""" payload = verify_token(credentials.credentials) if not payload: return ResponseSchema(code=401, message="无效的访问令牌") # 只获取状态正常的知识库 items, _ = await knowledge_base_service.get_list(db, page=1, page_size=1000, status="normal") return ResponseSchema( code=0, message="获取成功", data=[{"id": item.id, "name": item.name, "collection_name": item.collection_name_children or item.collection_name_parent} for item in items] ) @router.post("", response_model=ResponseSchema) async def create_knowledge_base( payload: KnowledgeBaseCreate, db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """创建新知识库""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") try: new_kb = await knowledge_base_service.create(db, payload) return ResponseSchema(code=0, message="创建成功", data=KnowledgeBaseResponse.model_validate(new_kb)) except ValueError as e: return ResponseSchema(code=400, message=str(e)) except Exception: return ResponseSchema(code=500, message="创建失败") @router.post("/{id}", response_model=ResponseSchema) async def update_knowledge_base( payload: KnowledgeBaseUpdate, id: str = Path(..., description="知识库ID"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """更新知识库信息""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") try: kb = await knowledge_base_service.update(db, id, payload) return ResponseSchema(code=0, message="更新成功", data=KnowledgeBaseResponse.model_validate(kb)) except ValueError as e: return ResponseSchema(code=400, message=str(e)) except Exception: return ResponseSchema(code=500, message="更新失败") @router.post("/{id}/status", response_model=ResponseSchema) async def update_knowledge_base_status( id: str = Path(..., description="知识库ID"), status: str = Query(..., description="状态: normal/test/disabled"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """更新知识库状态""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") await knowledge_base_service.update_status(db, id, status) return ResponseSchema(code=0, message=f"状态已更新为 {status}") @router.post("/{id}/delete", response_model=ResponseSchema) async def delete_knowledge_base( id: str = Path(..., description="知识库ID"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """删除知识库""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") await knowledge_base_service.delete(db, id) return ResponseSchema(code=0, message="删除成功") @router.get("/{id}/metadata", response_model=ResponseSchema) async def get_knowledge_base_metadata( id: str = Path(..., description="知识库ID"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """获取知识库的元数据字段定义和自定义Schema""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") try: data = await knowledge_base_service.get_metadata_and_schema(db, id) return ResponseSchema(code=0, message="获取成功", data=data) except ValueError as e: return ResponseSchema(code=400, message=str(e)) except Exception as e: return ResponseSchema(code=500, message=f"获取失败: {str(e)}") @router.post("/{id}/sync", response_model=ResponseSchema) async def sync_knowledge_base( id: str = Path(..., description="知识库ID"), db: AsyncSession = Depends(get_db), credentials: HTTPAuthorizationCredentials = Depends(security) ): """同步创建Milvus集合""" payload_token = verify_token(credentials.credentials) if not payload_token: return ResponseSchema(code=401, message="无效的访问令牌") try: await knowledge_base_service.sync_to_milvus(db, id) return ResponseSchema(code=0, message="同步成功") except ValueError as e: return ResponseSchema(code=400, message=str(e)) except Exception as e: return ResponseSchema(code=500, message=f"同步失败: {str(e)}")