| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- """
- 字典项管理API路由
- """
- import sys
- import os
- import logging
- from datetime import datetime, timezone
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
- from fastapi import APIRouter, Depends, HTTPException
- from app.schemas.base import ApiResponse
- from app.system.schemas.dict_item_schema import (
- DictItemCreate, DictItemUpdate, DictItemListRequest, DictItemBatchDeleteRequest
- )
- from app.services.dict_item_service import DictItemService
- from app.utils.auth_dependency import get_current_user_with_refresh
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/dict/item", tags=["字典项管理"])
- @router.post("/list")
- async def get_item_list(
- request: DictItemListRequest,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 获取字典项列表(分页)
- """
- try:
- service = DictItemService()
- items, total = await service.get_item_list(
- page=request.page,
- page_size=request.page_size,
- category_id=request.category_id,
- keyword=request.keyword,
- enable_flag=request.enable_flag
- )
-
- return ApiResponse(
- code="000000",
- message="成功",
- data={
- "list": items,
- "total": total,
- "page": request.page,
- "page_size": request.page_size
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception("获取字典项列表错误")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.get("/{dict_id}")
- async def get_item_detail(
- dict_id: int,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 获取字典项详情
- """
- try:
- service = DictItemService()
- item = await service.get_item_by_id(dict_id)
-
- if not item:
- return ApiResponse(
- code="300001",
- message="字典项不存在",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- return ApiResponse(
- code="000000",
- message="成功",
- data=item,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception(f"获取字典项详情错误: dict_id={dict_id}")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.post("")
- async def create_item(
- item: DictItemCreate,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 创建字典项
- """
- try:
- user_id = current_user.get("sub")
- service = DictItemService()
-
- success, message, dict_id = await service.create_item(
- item.model_dump(),
- user_id
- )
-
- if success:
- return ApiResponse(
- code="000000",
- message=message,
- data={"dict_id": dict_id},
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code="400001",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception("创建字典项错误")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.put("/{dict_id}")
- async def update_item(
- dict_id: int,
- item: DictItemUpdate,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 更新字典项
- """
- try:
- user_id = current_user.get("sub")
- service = DictItemService()
-
- success, message = await service.update_item(
- dict_id,
- item.model_dump(exclude_none=True),
- user_id
- )
-
- if success:
- return ApiResponse(
- code="000000",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code="400001",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception(f"更新字典项错误: dict_id={dict_id}")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.delete("/{dict_id}")
- async def delete_item(
- dict_id: int,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 删除字典项
- """
- try:
- service = DictItemService()
- success, message = await service.delete_item(dict_id)
-
- if success:
- return ApiResponse(
- code="000000",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code="400001",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception(f"删除字典项错误: dict_id={dict_id}")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.post("/batch-delete")
- async def batch_delete_items(
- request: DictItemBatchDeleteRequest,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 批量删除字典项
- """
- try:
- service = DictItemService()
- success, message = await service.batch_delete_items(request.dict_ids)
-
- if success:
- return ApiResponse(
- code="000000",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code="400001",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception("批量删除字典项错误")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @router.put("/{dict_id}/toggle-status")
- async def toggle_item_status(
- dict_id: int,
- enable_flag: str,
- current_user: dict = Depends(get_current_user_with_refresh)
- ):
- """
- 切换字典项启用状态
- """
- try:
- user_id = current_user.get("sub")
- service = DictItemService()
-
- success, message = await service.toggle_item_status(dict_id, enable_flag, user_id)
-
- if success:
- return ApiResponse(
- code="000000",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code="400001",
- message=message,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- except Exception as e:
- logger.exception(f"切换字典项状态错误: dict_id={dict_id}")
- return ApiResponse(
- code="500001",
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
|