Bläddra i källkod

feat:基本完成标签管理的接口

ZengChao 1 månad sedan
förälder
incheckning
f3fb7112c4

+ 1 - 0
src/app/models/user.py

@@ -24,6 +24,7 @@ class User(BaseModel):
     last_login_ip = Column(String(45), nullable=True, comment="最后登录IP")
     failed_login_attempts = Column(Integer, default=0, comment="失败登录次数")
     locked_until = Column(DateTime, nullable=True, comment="锁定直到时间")
+    id = Column(CHAR(36), primary_key=True, comment="用户ID")
     
     # 关联关系
     profile = relationship("UserProfile", back_populates="user", uselist=False)

+ 29 - 0
src/app/sample/models/tag_category.py

@@ -0,0 +1,29 @@
+"""
+标签分类数据库模型
+"""
+from sqlalchemy import Column, BigInteger, Integer, String, SmallInteger, DateTime, Text
+from sqlalchemy.dialects.mysql import CHAR
+from datetime import datetime
+from app.base.async_mysql_connection import Base
+
+
+class TagCategory(Base):
+    """标签分类模型 - 分类树结构"""
+    __tablename__ = "tag_category"
+
+    id = Column(BigInteger, primary_key=True, autoincrement=True, comment="主键ID")
+    parent_id = Column(BigInteger, nullable=False, default=0, comment="父级分类ID(0表示根节点)")
+    name = Column(String(100), nullable=False, comment="分类名称")
+    path = Column(String(500), nullable=False, default="/", comment="分类路径(物化路径),例如 /1/3/9/")
+    level = Column(Integer, nullable=False, default=1, comment="分类层级(1=一级,2=二级,依次递增)")
+    sort_no = Column(Integer, nullable=False, default=0, comment="同级排序号(越小越靠前)")
+    status = Column(SmallInteger, nullable=False, default=1, comment="状态(1=启用,0=禁用)")
+    is_deleted = Column(SmallInteger, nullable=False, default=0, comment="是否删除(0=未删除,1=已删除)")
+    
+    created_by = Column(CHAR(100), nullable=True, comment="创建人UUID")
+    created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="创建时间")
+    updated_by = Column(CHAR(100), nullable=True, comment="修改人UUID")
+    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="修改时间")
+
+    def __repr__(self):
+        return f"<TagCategory {self.id}:{self.name}>"

+ 78 - 0
src/app/sample/schemas/tag_category.py

@@ -0,0 +1,78 @@
+"""
+标签分类 Schema(请求/响应模型)
+"""
+from typing import Optional, List
+from pydantic import BaseModel, Field
+from app.schemas.base import BaseModelSchema
+from datetime import datetime
+
+
+class TagCategoryBase(BaseModel):
+    """标签分类基础模型"""
+    parent_id: int = Field(0, description="父级分类ID(0表示根节点)")
+    name: str = Field(..., min_length=1, max_length=100, description="分类名称")
+    sort_no: Optional[int] = Field(0, description="同级排序号(越小越靠前)")
+    status: Optional[int] = Field(1, description="状态(1=启用,0=禁用)")
+
+
+class TagCategoryCreate(TagCategoryBase):
+    """创建标签分类请求"""
+    pass
+
+
+class TagCategoryUpdate(BaseModel):
+    """更新标签分类请求"""
+    parent_id: Optional[int] = None
+    name: Optional[str] = Field(None, min_length=1, max_length=100)
+    sort_no: Optional[int] = None
+    status: Optional[int] = None
+
+
+class TagCategoryResponse(BaseModelSchema):
+    """标签分类响应模型"""
+    id: int
+    parent_id: int
+    name: str
+    path: str
+    level: int
+    sort_no: int
+    status: int
+    is_deleted: int
+    created_by: Optional[str] = None
+    created_by_name: Optional[str] = None
+    created_at: Optional[datetime] = None
+    updated_by: Optional[str] = None
+    updated_by_name: Optional[str] = None
+    updated_at: Optional[datetime] = None
+
+    class Config:
+        from_attributes = True
+        json_encoders = {
+            datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S") if v else None
+        }
+
+
+class TagCategoryTreeNode(BaseModel):
+    """标签分类树节点(用于树形结构返回)"""
+    id: int
+    parent_id: int
+    name: str
+    path: str
+    level: int
+    sort_no: int
+    status: int
+    children: Optional[List['TagCategoryTreeNode']] = None
+
+    class Config:
+        from_attributes = True
+
+
+class TagCategoryBatchDeleteRequest(BaseModel):
+    """批量删除标签分类请求"""
+    ids: List[int] = Field(..., description="分类ID列表")
+
+
+class TagCategoryMoveRequest(BaseModel):
+    """移动标签分类请求"""
+    id: int = Field(..., description="分类ID")
+    parent_id: int = Field(..., description="新的父级分类ID")

+ 4 - 0
src/app/schemas/base.py

@@ -54,6 +54,10 @@ class BaseModelSchema(IDSchema, TimestampSchema):
     is_deleted: bool = Field(default=False, description="是否删除")
 
 
+class PageQuery(BaseModel):
+    """分页查询参数"""
+    page: int = Field(1, ge=1)
+    page_size: int = Field(20, ge=1, le=200)
 
 
 class ApiResponse(BaseModel):

+ 2 - 0
src/app/server/app.py

@@ -48,6 +48,7 @@ from views.sample_view import router as sample_router
 from views.auth_view import router as auth_router
 from views.knowledge_base_view import router as knowledge_base_router
 from views.snippet_view import router as snippet_router
+from views.tag_view import router as tag_router
 
 # 导入现有API路由
 from app.api.v1.api_router import api_router
@@ -248,6 +249,7 @@ app.include_router(auth_router, prefix="/api/v1")
 app.include_router(sample_router, prefix="/api/v1")
 app.include_router(knowledge_base_router, prefix="/api/v1")
 app.include_router(snippet_router, prefix="/api/v1")
+app.include_router(tag_router, prefix="/api/v1")
 
 
 def create_app() -> FastAPI:

+ 483 - 0
src/app/services/tag_category_service.py

@@ -0,0 +1,483 @@
+"""
+标签分类服务层
+处理标签分类的业务逻辑、树形结构管理、路径维护等
+"""
+import logging
+from typing import Optional, List, Dict, Tuple, Any
+from datetime import datetime, timezone
+
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, update, and_, or_
+from app.sample.models.tag_category import TagCategory
+from app.sample.schemas.tag_category import (
+    TagCategoryCreate, TagCategoryUpdate, TagCategoryResponse,
+    TagCategoryTreeNode
+)
+from app.services.user_service import UserService
+
+logger = logging.getLogger(__name__)
+
+
+class TagCategoryService:
+    """标签分类服务类 - db session由路由层注入"""
+    
+    def __init__(self, session: AsyncSession):
+        """初始化服务,接收数据库session"""
+        self.session = session
+        self.user_service = UserService(session)
+    
+    async def create_tag_category(
+        self,
+        tag_data: TagCategoryCreate,
+        created_by: Optional[str] = None
+    ) -> TagCategoryResponse:
+        """创建标签分类"""
+        try:
+            path = "/"
+            level = 1
+            
+            if tag_data.parent_id != 0:
+                result = await self.session.execute(
+                    select(TagCategory).where(
+                        and_(
+                            TagCategory.id == tag_data.parent_id,
+                            TagCategory.is_deleted == 0
+                        )
+                    )
+                )
+                parent = result.scalar_one_or_none()
+                
+                if not parent:
+                    raise ValueError(f"父级分类ID {tag_data.parent_id} 不存在")
+                
+                path = f"{parent.path}{tag_data.parent_id}/"
+                level = parent.level + 1
+            
+            tag_category = TagCategory(
+                parent_id=tag_data.parent_id,
+                name=tag_data.name,
+                path=path,
+                level=level,
+                sort_no=tag_data.sort_no or 0,
+                status=tag_data.status or 1,
+                created_by=created_by,
+                created_at=datetime.now(timezone.utc),
+                updated_by=created_by,
+                updated_at=datetime.now(timezone.utc)
+            )
+            
+            self.session.add(tag_category)
+            await self.session.flush()
+            await self.session.refresh(tag_category)
+            
+            logger.info(f"创建标签分类成功: {tag_category.id} - {tag_category.name}")
+            return await self._enrich_with_user_names(tag_category)
+        
+        except Exception as e:
+            logger.error(f"创建标签分类失败: {str(e)}")
+            raise
+    
+    async def get_tag_category_by_id(self, category_id: int) -> Optional[TagCategoryResponse]:
+        """根据ID获取标签分类"""
+        try:
+            result = await self.session.execute(
+                select(TagCategory).where(
+                    and_(
+                        TagCategory.id == category_id,
+                        TagCategory.is_deleted == 0
+                    )
+                )
+            )
+            tag_category = result.scalar_one_or_none()
+            if tag_category:
+                return await self._enrich_with_user_names(tag_category)
+            return None
+        
+        except Exception as e:
+            logger.error(f"查询标签分类失败: {str(e)}")
+            raise
+    
+    async def list_tag_categories(
+        self,
+        parent_id: Optional[int] = None,
+        status: Optional[int] = None,
+        skip: int = 0,
+        limit: int = 100
+    ) -> List[TagCategoryResponse]:
+        """列表查询标签分类"""
+        try:
+            conditions = [TagCategory.is_deleted == 0]
+            
+            if parent_id is not None:
+                conditions.append(TagCategory.parent_id == parent_id)
+            
+            if status is not None:
+                conditions.append(TagCategory.status == status)
+            
+            query = select(TagCategory).where(and_(*conditions))
+            query = query.order_by(TagCategory.sort_no, TagCategory.id)
+            query = query.offset(skip).limit(limit)
+            
+            result = await self.session.execute(query)
+            categories = result.scalars().all()
+            
+            return await self._enrich_list_with_user_names(categories)
+        
+        except Exception as e:
+            logger.error(f"查询标签分类列表失败: {str(e)}")
+            raise
+    
+    async def update_tag_category(
+        self,
+        category_id: int,
+        tag_data: TagCategoryUpdate,
+        updated_by: Optional[str] = None
+    ) -> TagCategoryResponse:
+        """更新标签分类"""
+        try:
+            result = await self.session.execute(
+                select(TagCategory).where(
+                    and_(
+                        TagCategory.id == category_id,
+                        TagCategory.is_deleted == 0
+                    )
+                )
+            )
+            tag_category = result.scalar_one_or_none()
+            
+            if not tag_category:
+                raise ValueError(f"分类ID {category_id} 不存在")
+            
+            path_changed = tag_data.parent_id is not None and tag_data.parent_id != tag_category.parent_id
+            
+            if path_changed:
+                if tag_data.parent_id != 0:
+                    result = await self.session.execute(
+                        select(TagCategory).where(
+                            and_(
+                                TagCategory.id == tag_data.parent_id,
+                                TagCategory.is_deleted == 0
+                            )
+                        )
+                    )
+                    parent = result.scalar_one_or_none()
+                    
+                    if not parent:
+                        raise ValueError(f"新的父级分类ID {tag_data.parent_id} 不存在")
+                    
+                    if self._is_ancestor(tag_category, parent):
+                        raise ValueError("不能将分类移动到其子级分类下")
+                    
+                    tag_category.path = f"{parent.path}{tag_data.parent_id}/"
+                    tag_category.level = parent.level + 1
+                else:
+                    tag_category.path = "/"
+                    tag_category.level = 1
+                
+                tag_category.parent_id = tag_data.parent_id
+                
+                await self._update_descendants_path(tag_category)
+            
+            if tag_data.name is not None:
+                tag_category.name = tag_data.name
+            if tag_data.sort_no is not None:
+                tag_category.sort_no = tag_data.sort_no
+            if tag_data.status is not None:
+                tag_category.status = tag_data.status
+            
+            tag_category.updated_by = updated_by
+            tag_category.updated_at = datetime.now(timezone.utc)
+            
+            await self.session.flush()
+            await self.session.refresh(tag_category)
+            
+            logger.info(f"更新标签分类成功: {tag_category.id}")
+            return await self._enrich_with_user_names(tag_category)
+        
+        except Exception as e:
+            logger.error(f"更新标签分类失败: {str(e)}")
+            raise
+    
+    async def delete_tag_category(
+        self,
+        category_id: int,
+        soft_delete: bool = True
+    ) -> bool:
+        """删除标签分类"""
+        try:
+            result = await self.session.execute(
+                select(TagCategory).where(TagCategory.id == category_id)
+            )
+            tag_category = result.scalar_one_or_none()
+            
+            if not tag_category:
+                raise ValueError(f"分类ID {category_id} 不存在")
+            
+            if soft_delete:
+                await self._soft_delete_with_children(category_id)
+            else:
+                await self.session.delete(tag_category)
+            
+            logger.info(f"删除标签分类成功: {category_id}")
+            return True
+        
+        except Exception as e:
+            logger.error(f"删除标签分类失败: {str(e)}")
+            raise
+    
+    async def get_category_tree(
+        self,
+        root_id: int = 0,
+        include_disabled: bool = False
+    ) -> List[TagCategoryTreeNode]:
+        """获取标签分类树"""
+        try:
+            conditions = [TagCategory.is_deleted == 0]
+            if not include_disabled:
+                conditions.append(TagCategory.status == 1)
+            
+            result = await self.session.execute(
+                select(TagCategory).where(and_(*conditions))
+                .order_by(TagCategory.sort_no, TagCategory.id)
+            )
+            all_categories = result.scalars().all()
+            
+            return self._build_tree(all_categories, root_id)
+        
+        except Exception as e:
+            logger.error(f"获取分类树失败: {str(e)}")
+            raise
+    
+    @staticmethod
+    def _build_tree(categories: List[TagCategory], parent_id: int = 0) -> List[TagCategoryTreeNode]:
+        """递归构建树形结构"""
+        tree = []
+        for cat in categories:
+            if cat.parent_id == parent_id:
+                children = TagCategoryService._build_tree(categories, cat.id)
+                node = TagCategoryTreeNode(
+                    id=cat.id,
+                    parent_id=cat.parent_id,
+                    name=cat.name,
+                    path=cat.path,
+                    level=cat.level,
+                    sort_no=cat.sort_no,
+                    status=cat.status,
+                    children=children if children else None
+                )
+                tree.append(node)
+        return tree
+    
+    async def get_breadcrumb(self, category_id: int) -> List[Dict[str, Any]]:
+        """获取分类的面包屑路径"""
+        try:
+            result = await self.session.execute(
+                select(TagCategory).where(
+                    and_(
+                        TagCategory.id == category_id,
+                        TagCategory.is_deleted == 0
+                    )
+                )
+            )
+            category = result.scalar_one_or_none()
+            
+            if not category:
+                return []
+            
+            path_ids = [int(x) for x in category.path.strip('/').split('/') if x]
+            path_ids.append(category.id)
+            
+            if not path_ids:
+                return []
+            
+            result = await self.session.execute(
+                select(TagCategory).where(TagCategory.id.in_(path_ids))
+                .order_by(TagCategory.level)
+            )
+            ancestors = result.scalars().all()
+            
+            return [{"id": cat.id, "name": cat.name} for cat in ancestors]
+        
+        except Exception as e:
+            logger.error(f"获取面包屑路径失败: {str(e)}")
+            raise
+    
+    @staticmethod
+    def _is_ancestor(ancestor: TagCategory, descendant: TagCategory) -> bool:
+        """检查ancestor是否是descendant的祖先"""
+        path_ids = [int(x) for x in descendant.path.strip('/').split('/') if x]
+        return ancestor.id in path_ids
+    
+    async def _update_descendants_path(self, parent: TagCategory) -> None:
+        """递归更新所有子分类的path和level"""
+        result = await self.session.execute(
+            select(TagCategory).where(TagCategory.parent_id == parent.id)
+        )
+        children = result.scalars().all()
+        
+        for child in children:
+            child.path = f"{parent.path}{parent.id}/"
+            child.level = parent.level + 1
+            child.updated_at = datetime.now(timezone.utc)
+            
+            await self._update_descendants_path(child)
+    
+    async def _soft_delete_with_children(self, category_id: int) -> None:
+        """软删除分类及其所有子分类"""
+        result = await self.session.execute(
+            select(TagCategory).where(TagCategory.id == category_id)
+        )
+        category = result.scalar_one_or_none()
+        
+        if not category:
+            return
+        
+        path_prefix = f"{category.path}{category_id}/"
+        
+        query = update(TagCategory).where(
+            or_(
+                TagCategory.id == category_id,
+                TagCategory.path.like(f"{path_prefix}%")
+            )
+        ).values(
+            is_deleted=1,
+            updated_at=datetime.now(timezone.utc)
+        )
+        
+        await self.session.execute(query)
+    
+    async def batch_delete_tag_categories(
+        self,
+        category_ids: List[int],
+        soft_delete: bool = True
+    ) -> Tuple[int, str]:
+        """批量删除标签分类"""
+        try:
+            if not category_ids:
+                return 0, "分类ID列表为空"
+            
+            success_count = 0
+            
+            for cat_id in category_ids:
+                try:
+                    result = await self.session.execute(
+                        select(TagCategory).where(TagCategory.id == cat_id)
+                    )
+                    tag_category = result.scalar_one_or_none()
+                    
+                    if not tag_category:
+                        continue
+                    
+                    if soft_delete:
+                        await self._soft_delete_with_children(cat_id)
+                    else:
+                        await self.session.delete(tag_category)
+                    
+                    success_count += 1
+                except Exception as e:
+                    logger.warning(f"删除分类 {cat_id} 失败: {str(e)}")
+            
+            message = f"成功删除 {success_count}/{len(category_ids)} 条分类"
+            return success_count, message
+        
+        except Exception as e:
+            logger.error(f"批量删除分类失败: {str(e)}")
+            raise
+    
+    async def move_tag_category(
+        self,
+        category_id: int,
+        target_parent_id: int,
+        updated_by: Optional[str] = None
+    ) -> TagCategoryResponse:
+        """移动分类到新的父分类"""
+        update_data = TagCategoryUpdate(parent_id=target_parent_id)
+        return await self.update_tag_category(category_id, update_data, updated_by)
+    
+    async def reorder_siblings(
+        self,
+        category_id: int,
+        new_sort_no: int,
+        updated_by: Optional[str] = None
+    ) -> TagCategoryResponse:
+        """重新排序分类"""
+        update_data = TagCategoryUpdate(sort_no=new_sort_no)
+        return await self.update_tag_category(category_id, update_data, updated_by)
+    
+    async def get_children_count(
+        self,
+        category_id: int,
+        recursive: bool = False
+    ) -> int:
+        """获取分类的子分类数量"""
+        try:
+            if recursive:
+                result = await self.session.execute(
+                    select(TagCategory).where(
+                        and_(
+                            TagCategory.path.like(f"/%{category_id}/%"),
+                            TagCategory.is_deleted == 0
+                        )
+                    )
+                )
+            else:
+                result = await self.session.execute(
+                    select(TagCategory).where(
+                        and_(
+                            TagCategory.parent_id == category_id,
+                            TagCategory.is_deleted == 0
+                        )
+                    )
+                )
+            
+            categories = result.scalars().all()
+            return len(categories)
+        
+        except Exception as e:
+            logger.error(f"获取子分类数量失败: {str(e)}")
+            raise
+    
+    async def _enrich_with_user_names(self, category: TagCategory) -> TagCategoryResponse:
+        """为单个分类对象填充用户名"""
+        response = TagCategoryResponse.model_validate(category)
+        
+        if category.created_by:
+            user = await self.user_service.get_user_by_id(category.created_by)
+            response.created_by_name = user.username if user else None
+        
+        if category.updated_by:
+            user = await self.user_service.get_user_by_id(category.updated_by)
+            response.updated_by_name = user.username if user else None
+        
+        return response
+    
+    async def _enrich_list_with_user_names(self, categories: List[TagCategory]) -> List[TagCategoryResponse]:
+        """为分类列表批量填充用户名"""
+        if not categories:
+            return []
+        
+        # 收集所有需要查询的用户ID
+        user_ids = set()
+        for cat in categories:
+            if cat.created_by:
+                user_ids.add(cat.created_by)
+            if cat.updated_by:
+                user_ids.add(cat.updated_by)
+        
+        # 批量查询用户名
+        user_map = {}
+        if user_ids:
+            for user_id in user_ids:
+                user = await self.user_service.get_user_by_id(user_id)
+                if user:
+                    user_map[user_id] = user.username
+        
+        # 构建响应列表
+        result = []
+        for cat in categories:
+            response = TagCategoryResponse.model_validate(cat)
+            response.created_by_name = user_map.get(cat.created_by) if cat.created_by else None
+            response.updated_by_name = user_map.get(cat.updated_by) if cat.updated_by else None
+            result.append(response)
+        
+        return result

+ 32 - 0
src/app/services/user_service.py

@@ -0,0 +1,32 @@
+"""
+用户服务层
+处理用户相关的业务逻辑
+"""
+import logging
+from typing import Optional, Dict, List
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from app.models.user import User
+
+logger = logging.getLogger(__name__)
+
+
+class UserService:
+    """用户服务类"""
+    
+    def __init__(self, session: AsyncSession):
+        """初始化服务,接收数据库session"""
+        self.session = session
+    
+    async def get_user_by_id(self, user_id: str) -> Optional[User]:
+        """根据用户ID获取用户对象"""
+        try:
+            result = await self.session.execute(
+                select(User).where(User.id == user_id)
+            )
+            user = result.scalar_one_or_none()
+            return user
+        except Exception as e:
+            logger.error(f"根据ID获取用户失败: user_id={user_id}, error={str(e)}")
+            return None
+    

+ 236 - 0
src/views/tag_view.py

@@ -0,0 +1,236 @@
+"""
+标签分类视图路由
+处理标签分类的 API 接口
+"""
+from fastapi import APIRouter, Query, Path, Depends
+from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.base.async_mysql_connection import get_db
+from app.services.tag_category_service import TagCategoryService
+from app.services.auth_service import AuthService
+from app.sample.schemas.tag_category import (
+    TagCategoryCreate,
+    TagCategoryUpdate,
+    TagCategoryBatchDeleteRequest,
+    TagCategoryMoveRequest
+)
+from app.schemas.base import ResponseSchema, PaginatedResponseSchema
+
+router = APIRouter(prefix="/sample/tag", tags=["样本中心-标签分类"])
+security = HTTPBearer()
+
+
+async def get_current_user_id(
+    credentials: HTTPAuthorizationCredentials = Depends(security),
+    session: AsyncSession = Depends(get_db)
+) -> str:
+    """获取当前登录用户UUID"""
+    auth_service = AuthService(session)
+    user = await auth_service.get_current_user(credentials.credentials)
+    return user.id
+
+
+@router.post("/create", response_model=ResponseSchema)
+async def create_tag_category(
+    tag_data: TagCategoryCreate,
+    session: AsyncSession = Depends(get_db),
+    current_user_id: str = Depends(get_current_user_id)
+):
+    """创建标签分类"""
+    service = TagCategoryService(session)
+    result = await service.create_tag_category(tag_data, created_by=current_user_id)
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message="创建标签分类成功",
+        data=result.model_dump()
+    )
+
+
+@router.get("/detail/{category_id}", response_model=ResponseSchema)
+async def get_tag_category(
+    category_id: int = Path(..., description="分类ID"),
+    session: AsyncSession = Depends(get_db)
+):
+    """获取标签分类详情"""
+    service = TagCategoryService(session)
+    result = await service.get_tag_category_by_id(category_id)
+    
+    return ResponseSchema(
+        code=200,
+        message="获取成功",
+        data=result.model_dump() if result else None
+    )
+
+
+@router.get("/list", response_model=PaginatedResponseSchema)
+async def list_tag_categories(
+    parent_id: int = Query(None, description="父级分类ID"),
+    status: int = Query(None, description="状态筛选"),
+    page: int = Query(1, ge=1, description="页码"),
+    page_size: int = Query(10, ge=1, le=100, description="每页数量"),
+    session: AsyncSession = Depends(get_db)
+):
+    """列表查询标签分类"""
+    skip = (page - 1) * page_size
+    
+    service = TagCategoryService(session)
+    categories = await service.list_tag_categories(
+        parent_id=parent_id,
+        status=status,
+        skip=skip,
+        limit=page_size
+    )
+    
+    return PaginatedResponseSchema(
+        code=200,
+        message="获取标签分类列表成功",
+        data=[cat.model_dump() for cat in categories],
+        meta={
+            "page": page,
+            "page_size": page_size,
+            "total": len(categories)
+        }
+    )
+
+
+@router.post("/update/{category_id}", response_model=ResponseSchema)
+async def update_tag_category(
+    category_id: int = Path(..., description="分类ID"),
+    tag_data: TagCategoryUpdate = None,
+    session: AsyncSession = Depends(get_db)
+):
+    """更新标签分类"""
+    service = TagCategoryService(session)
+    result = await service.update_tag_category(category_id, tag_data)
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message="更新标签分类成功",
+        data=result.model_dump()
+    )
+
+
+@router.post("/delete/{category_id}", response_model=ResponseSchema)
+async def delete_tag_category(
+    category_id: int = Path(..., description="分类ID"),
+    soft_delete: bool = Query(True, description="是否软删除"),
+    session: AsyncSession = Depends(get_db)
+):
+    """删除标签分类"""
+    service = TagCategoryService(session)
+    await service.delete_tag_category(category_id, soft_delete=soft_delete)
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message="删除标签分类成功"
+    )
+
+
+@router.get("/tree", response_model=ResponseSchema)
+async def get_category_tree(
+    include_disabled: bool = Query(False, description="是否包含禁用分类"),
+    session: AsyncSession = Depends(get_db)
+):
+    """获取标签分类树(层级结构)"""
+    service = TagCategoryService(session)
+    tree = await service.get_category_tree(root_id=0, include_disabled=include_disabled)
+    
+    return ResponseSchema(
+        code=200,
+        message="获取分类树成功",
+        data=[node.model_dump() for node in tree]
+    )
+
+
+@router.get("/breadcrumb/{category_id}", response_model=ResponseSchema)
+async def get_category_breadcrumb(
+    category_id: int = Path(..., description="分类ID"),
+    session: AsyncSession = Depends(get_db)
+):
+    """获取分类的面包屑路径"""
+    service = TagCategoryService(session)
+    breadcrumb = await service.get_breadcrumb(category_id)
+    
+    return ResponseSchema(
+        code=200,
+        message="获取面包屑路径成功",
+        data=breadcrumb
+    )
+
+
+@router.post("/batch/delete", response_model=ResponseSchema)
+async def batch_delete_categories(
+    request: TagCategoryBatchDeleteRequest,
+    soft_delete: bool = Query(True, description="是否软删除"),
+    session: AsyncSession = Depends(get_db)
+):
+    """批量删除标签分类"""
+    service = TagCategoryService(session)
+    success_count, message = await service.batch_delete_tag_categories(
+        request.ids, soft_delete=soft_delete
+    )
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message=message,
+        data={"success_count": success_count, "total": len(request.ids)}
+    )
+
+
+@router.post("/move/{category_id}", response_model=ResponseSchema)
+async def move_tag_category(
+    category_id: int = Path(..., description="分类ID"),
+    request: TagCategoryMoveRequest = None,
+    session: AsyncSession = Depends(get_db)
+):
+    """移动分类到新的父分类"""
+    service = TagCategoryService(session)
+    result = await service.move_tag_category(category_id, request.parent_id)
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message="移动分类成功",
+        data=result.model_dump()
+    )
+
+
+@router.post("/reorder/{category_id}", response_model=ResponseSchema)
+async def reorder_tag_category(
+    category_id: int = Path(..., description="分类ID"),
+    sort_no: int = Query(..., description="新的排序号"),
+    session: AsyncSession = Depends(get_db)
+):
+    """重新排序分类"""
+    service = TagCategoryService(session)
+    result = await service.reorder_siblings(category_id, sort_no)
+    await session.commit()
+    
+    return ResponseSchema(
+        code=200,
+        message="排序成功",
+        data=result.model_dump()
+    )
+
+
+@router.get("/children-count/{category_id}", response_model=ResponseSchema)
+async def get_children_count(
+    category_id: int = Path(..., description="分类ID"),
+    recursive: bool = Query(False, description="是否递归计算所有后代"),
+    session: AsyncSession = Depends(get_db)
+):
+    """获取分类的子分类数量"""
+    service = TagCategoryService(session)
+    count = await service.get_children_count(category_id, recursive=recursive)
+    
+    return ResponseSchema(
+        code=200,
+        message="获取子分类数量成功",
+        data={"count": count}
+    )