Quellcode durchsuchen

dev:标签管理基本完成

ZengChao vor 1 Monat
Ursprung
Commit
d34dcbd582

+ 1 - 0
src/app/sample/models/tag_category.py

@@ -16,6 +16,7 @@ class TagCategory(Base):
     name = Column(String(100), nullable=False, comment="分类名称")
     path = Column(String(500), nullable=False, default="/", comment="分类路径(物化路径),例如 /1/3/9/")
     level = Column(Integer, nullable=False, default=1, comment="分类层级(1=一级,2=二级,依次递增)")
+    type = Column(CHAR(50), nullable=False, default="label", comment="类型")
     sort_no = Column(Integer, nullable=False, default=0, comment="同级排序号(越小越靠前)")
     status = Column(SmallInteger, nullable=False, default=1, comment="状态(1=启用,0=禁用)")
     is_deleted = Column(SmallInteger, nullable=False, default=0, comment="是否删除(0=未删除,1=已删除)")

+ 19 - 3
src/app/sample/schemas/tag_category.py

@@ -11,6 +11,7 @@ class TagCategoryBase(BaseModel):
     """标签分类基础模型"""
     parent_id: int = Field(0, description="父级分类ID(0表示根节点)")
     name: str = Field(..., min_length=1, max_length=100, description="分类名称")
+    type: Optional[str] = Field("label", max_length=50, description="类型")
     sort_no: Optional[int] = Field(0, description="同级排序号(越小越靠前)")
     status: Optional[int] = Field(1, description="状态(1=启用,0=禁用)")
 
@@ -24,6 +25,7 @@ class TagCategoryUpdate(BaseModel):
     """更新标签分类请求"""
     parent_id: Optional[int] = None
     name: Optional[str] = Field(None, min_length=1, max_length=100)
+    type: Optional[str] = Field(None, max_length=50)
     sort_no: Optional[int] = None
     status: Optional[int] = None
 
@@ -33,8 +35,10 @@ class TagCategoryResponse(BaseModelSchema):
     id: int
     parent_id: int
     name: str
+    parent_name: Optional[str] = None
     path: str
     level: int
+    type: Optional[str] = None
     sort_no: int
     status: int
     is_deleted: int
@@ -52,19 +56,31 @@ class TagCategoryResponse(BaseModelSchema):
         }
 
 
-class TagCategoryTreeNode(BaseModel):
-    """标签分类树节点(用于树形结构返回)"""
+class TagCategoryTreeResponse(BaseModelSchema):
+    """标签分类树响应模型(与 TagCategoryResponse 结构一致,包含子节点)"""
     id: int
     parent_id: int
     name: str
+    parent_name: Optional[str] = None
     path: str
     level: int
+    type: Optional[str] = None
     sort_no: int
     status: int
-    children: Optional[List['TagCategoryTreeNode']] = None
+    is_deleted: int
+    created_by: Optional[str] = None
+    created_by_name: Optional[str] = None
+    created_at: Optional[datetime] = None
+    updated_by: Optional[str] = None
+    updated_by_name: Optional[str] = None
+    updated_at: Optional[datetime] = None
+    children: Optional[List['TagCategoryTreeResponse']] = None
 
     class Config:
         from_attributes = True
+        json_encoders = {
+            datetime: lambda v: v.strftime("%Y-%m-%d %H:%M:%S") if v else None
+        }
 
 
 class TagCategoryBatchDeleteRequest(BaseModel):

+ 80 - 20
src/app/services/tag_category_service.py

@@ -10,8 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy import select, update, and_, or_
 from app.sample.models.tag_category import TagCategory
 from app.sample.schemas.tag_category import (
-    TagCategoryCreate, TagCategoryUpdate, TagCategoryResponse,
-    TagCategoryTreeNode
+    TagCategoryCreate, TagCategoryUpdate, TagCategoryResponse
 )
 from app.services.user_service import UserService
 
@@ -56,6 +55,7 @@ class TagCategoryService:
             tag_category = TagCategory(
                 parent_id=tag_data.parent_id,
                 name=tag_data.name,
+                type=tag_data.type or "label",
                 path=path,
                 level=level,
                 sort_no=tag_data.sort_no or 0,
@@ -106,7 +106,10 @@ class TagCategoryService:
     ) -> List[TagCategoryResponse]:
         """列表查询标签分类"""
         try:
-            conditions = [TagCategory.is_deleted == 0]
+            conditions = [
+                TagCategory.is_deleted == 0,
+                TagCategory.type == 'label'
+            ]
             
             if parent_id is not None:
                 conditions.append(TagCategory.parent_id == parent_id)
@@ -121,7 +124,7 @@ class TagCategoryService:
             result = await self.session.execute(query)
             categories = result.scalars().all()
             
-            return await self._enrich_list_with_user_names(categories)
+            return await self._enrich_list_with_user_names(categories, parent_id)
         
         except Exception as e:
             logger.error(f"查询标签分类列表失败: {str(e)}")
@@ -180,6 +183,8 @@ class TagCategoryService:
             
             if tag_data.name is not None:
                 tag_category.name = tag_data.name
+            if tag_data.type is not None:
+                tag_category.type = tag_data.type
             if tag_data.sort_no is not None:
                 tag_category.sort_no = tag_data.sort_no
             if tag_data.status is not None:
@@ -229,7 +234,7 @@ class TagCategoryService:
         self,
         root_id: int = 0,
         include_disabled: bool = False
-    ) -> List[TagCategoryTreeNode]:
+    ) -> List[TagCategory]:
         """获取标签分类树"""
         try:
             conditions = [TagCategory.is_deleted == 0]
@@ -242,32 +247,75 @@ class TagCategoryService:
             )
             all_categories = result.scalars().all()
             
-            return self._build_tree(all_categories, root_id)
+            # 构建树
+            tree = self._build_tree(all_categories, root_id)
+            
+            # 为树节点添加 parent_name 和用户名称
+            await self._enrich_tree_with_names(tree, all_categories)
+            
+            return tree
         
         except Exception as e:
             logger.error(f"获取分类树失败: {str(e)}")
             raise
     
     @staticmethod
-    def _build_tree(categories: List[TagCategory], parent_id: int = 0) -> List[TagCategoryTreeNode]:
+    def _build_tree(categories: List[TagCategory], parent_id: int = 0) -> List[TagCategory]:
         """递归构建树形结构"""
         tree = []
         for cat in categories:
             if cat.parent_id == parent_id:
                 children = TagCategoryService._build_tree(categories, cat.id)
-                node = TagCategoryTreeNode(
-                    id=cat.id,
-                    parent_id=cat.parent_id,
-                    name=cat.name,
-                    path=cat.path,
-                    level=cat.level,
-                    sort_no=cat.sort_no,
-                    status=cat.status,
-                    children=children if children else None
-                )
-                tree.append(node)
+                # 直接设置 cat 的 children 属性
+                cat.children = children if children else None
+                tree.append(cat)
         return tree
     
+    async def _enrich_tree_with_names(self, tree: List[TagCategory], all_categories: List[TagCategory]) -> None:
+        """为树节点批量填充 parent_name 和用户名称"""
+        # 构建 category id 到 name 的映射
+        category_map = {cat.id: cat.name for cat in all_categories}
+        
+        # 收集所有用户ID
+        user_ids = set()
+        for cat in all_categories:
+            if cat.created_by:
+                user_ids.add(cat.created_by)
+            if cat.updated_by:
+                user_ids.add(cat.updated_by)
+        
+        # 批量查询用户信息
+        user_map = {}
+        for user_id in user_ids:
+            try:
+                user = await self.user_service.get_user_by_id(user_id)
+                if user:
+                    user_map[user_id] = user.username
+            except Exception as e:
+                logger.warning(f"查询用户 {user_id} 失败: {str(e)}")
+        
+        # 递归填充每个节点
+        self._enrich_node_with_names(tree, category_map, user_map)
+    
+    def _enrich_node_with_names(self, nodes: List[TagCategory], category_map: dict, user_map: dict) -> None:
+        """递归为节点填充名称"""
+        for node in nodes:
+            # 填充 parent_name
+            if node.parent_id != 0:
+                node.parent_name = category_map.get(node.parent_id)
+            else:
+                node.parent_name = None
+            
+            # 填充用户名称
+            if node.created_by:
+                node.created_by_name = user_map.get(node.created_by)
+            if node.updated_by:
+                node.updated_by_name = user_map.get(node.updated_by)
+            
+            # 递归处理子节点
+            if hasattr(node, 'children') and node.children:
+                self._enrich_node_with_names(node.children, category_map, user_map)
+    
     async def get_breadcrumb(self, category_id: int) -> List[Dict[str, Any]]:
         """获取分类的面包屑路径"""
         try:
@@ -451,8 +499,8 @@ class TagCategoryService:
         
         return response
     
-    async def _enrich_list_with_user_names(self, categories: List[TagCategory]) -> List[TagCategoryResponse]:
-        """为分类列表批量填充用户名"""
+    async def _enrich_list_with_user_names(self, categories: List[TagCategory], parent_id: Optional[int] = None) -> List[TagCategoryResponse]:
+        """为分类列表批量填充用户名和父分类名"""
         if not categories:
             return []
         
@@ -472,12 +520,24 @@ class TagCategoryService:
                 if user:
                     user_map[user_id] = user.username
         
+        # 如果指定了 parent_id,所有分类都有相同的父分类,只需查询一次
+        parent_name = None
+        if parent_id is not None and parent_id != 0:
+            result = await self.session.execute(
+                select(TagCategory).where(TagCategory.id == parent_id)
+            )
+            parent = result.scalar_one_or_none()
+            if parent:
+                parent_name = parent.name
+        
         # 构建响应列表
         result = []
         for cat in categories:
             response = TagCategoryResponse.model_validate(cat)
             response.created_by_name = user_map.get(cat.created_by) if cat.created_by else None
             response.updated_by_name = user_map.get(cat.updated_by) if cat.updated_by else None
+            # 如果指定了 parent_id,使用查询出的 parent_name;否则为 None
+            response.parent_name = parent_name if parent_id is not None and parent_id != 0 else None
             result.append(response)
         
         return result

+ 27 - 2
src/views/tag_view.py

@@ -9,11 +9,13 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.base.async_mysql_connection import get_db
 from app.services.tag_category_service import TagCategoryService
 from app.services.auth_service import AuthService
+from app.sample.models.tag_category import TagCategory
 from app.sample.schemas.tag_category import (
     TagCategoryCreate,
     TagCategoryUpdate,
     TagCategoryBatchDeleteRequest,
-    TagCategoryMoveRequest
+    TagCategoryMoveRequest,
+    TagCategoryTreeResponse
 )
 from app.schemas.base import ResponseSchema, PaginatedResponseSchema
 
@@ -143,10 +145,33 @@ async def get_category_tree(
     return ResponseSchema(
         code=200,
         message="获取分类树成功",
-        data=[node.model_dump() for node in tree]
+        data=[_build_tree_response(node) for node in tree]
     )
 
 
+def _build_tree_response(category: TagCategory) -> dict:
+    """将 TagCategory 转换为完整的树响应结构"""
+    return {
+        'id': category.id,
+        'parent_id': category.parent_id,
+        'parent_name': getattr(category, 'parent_name', None),
+        'name': category.name,
+        'path': category.path,
+        'level': category.level,
+        'type': category.type,
+        'sort_no': category.sort_no,
+        'status': category.status,
+        'is_deleted': category.is_deleted,
+        'created_by': category.created_by,
+        'created_by_name': getattr(category, 'created_by_name', None),
+        'created_at': category.created_at,
+        'updated_by': category.updated_by,
+        'updated_by_name': getattr(category, 'updated_by_name', None),
+        'updated_at': category.updated_at,
+        'children': [_build_tree_response(child) for child in (getattr(category, 'children', None) or [])] if hasattr(category, 'children') and category.children else None
+    }
+
+
 @router.get("/breadcrumb/{category_id}", response_model=ResponseSchema)
 async def get_category_breadcrumb(
     category_id: int = Path(..., description="分类ID"),