operation_log_service.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. """
  2. 操作日志服务
  3. 提供操作日志的创建、查询和导出功能
  4. Requirements: 12.1, 12.2, 12.3, 12.4
  5. """
  6. import json
  7. from datetime import datetime, date
  8. from typing import Optional, List, Any
  9. from functools import wraps
  10. from sqlalchemy.orm import Session
  11. from sqlalchemy import and_, desc
  12. from app.models.admin import OperationLog
  13. class OperationLogService:
  14. """操作日志服务类"""
  15. def __init__(self, db: Session):
  16. self.db = db
  17. def create_log(
  18. self,
  19. admin_id: int,
  20. operation_type: str,
  21. module: str,
  22. target_id: Optional[str] = None,
  23. detail: Optional[dict] = None,
  24. ip_address: Optional[str] = None
  25. ) -> OperationLog:
  26. """创建操作日志"""
  27. log = OperationLog(
  28. admin_id=admin_id,
  29. operation_type=operation_type,
  30. module=module,
  31. target_id=target_id,
  32. detail=json.dumps(detail, ensure_ascii=False) if detail else None,
  33. ip_address=ip_address
  34. )
  35. self.db.add(log)
  36. self.db.commit()
  37. self.db.refresh(log)
  38. return log
  39. def list_logs(
  40. self,
  41. page: int = 1,
  42. size: int = 20,
  43. admin_id: Optional[int] = None,
  44. operation_type: Optional[str] = None,
  45. module: Optional[str] = None,
  46. start_date: Optional[date] = None,
  47. end_date: Optional[date] = None
  48. ) -> tuple[List[OperationLog], int]:
  49. """查询操作日志列表"""
  50. query = self.db.query(OperationLog)
  51. # 筛选条件
  52. conditions = []
  53. if admin_id:
  54. conditions.append(OperationLog.admin_id == admin_id)
  55. if operation_type:
  56. conditions.append(OperationLog.operation_type == operation_type)
  57. if module:
  58. conditions.append(OperationLog.module == module)
  59. if start_date:
  60. conditions.append(OperationLog.created_at >= datetime.combine(start_date, datetime.min.time()))
  61. if end_date:
  62. conditions.append(OperationLog.created_at <= datetime.combine(end_date, datetime.max.time()))
  63. if conditions:
  64. query = query.filter(and_(*conditions))
  65. # 总数
  66. total = query.count()
  67. # 分页和排序
  68. logs = query.order_by(desc(OperationLog.created_at)).offset((page - 1) * size).limit(size).all()
  69. return logs, total
  70. def log_operation(operation_type: str, module: str, get_target_id=None, get_detail=None):
  71. """
  72. 操作日志装饰器
  73. 用于自动记录写操作的日志
  74. Requirements: 12.1, 12.2
  75. """
  76. def decorator(func):
  77. @wraps(func)
  78. async def async_wrapper(*args, **kwargs):
  79. result = await func(*args, **kwargs)
  80. # 从 kwargs 中获取必要参数
  81. db = kwargs.get('db')
  82. admin = kwargs.get('current_admin')
  83. request = kwargs.get('request')
  84. if db and admin:
  85. target_id = get_target_id(kwargs, result) if get_target_id else None
  86. detail = get_detail(kwargs, result) if get_detail else None
  87. ip = request.client.host if request and request.client else None
  88. log_service = OperationLogService(db)
  89. log_service.create_log(
  90. admin_id=admin.id,
  91. operation_type=operation_type,
  92. module=module,
  93. target_id=str(target_id) if target_id else None,
  94. detail=detail,
  95. ip_address=ip
  96. )
  97. return result
  98. @wraps(func)
  99. def sync_wrapper(*args, **kwargs):
  100. result = func(*args, **kwargs)
  101. db = kwargs.get('db')
  102. admin = kwargs.get('current_admin')
  103. request = kwargs.get('request')
  104. if db and admin:
  105. target_id = get_target_id(kwargs, result) if get_target_id else None
  106. detail = get_detail(kwargs, result) if get_detail else None
  107. ip = request.client.host if request and request.client else None
  108. log_service = OperationLogService(db)
  109. log_service.create_log(
  110. admin_id=admin.id,
  111. operation_type=operation_type,
  112. module=module,
  113. target_id=str(target_id) if target_id else None,
  114. detail=detail,
  115. ip_address=ip
  116. )
  117. return result
  118. import asyncio
  119. if asyncio.iscoroutinefunction(func):
  120. return async_wrapper
  121. return sync_wrapper
  122. return decorator