| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- """
- 操作日志服务
- 提供操作日志的创建、查询和导出功能
- Requirements: 12.1, 12.2, 12.3, 12.4
- """
- import json
- from datetime import datetime, date
- from typing import Optional, List, Any
- from functools import wraps
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, desc
- from app.models.admin import OperationLog
class OperationLogService:
    """Service that writes and queries ``OperationLog`` rows.

    Every method runs against the SQLAlchemy session supplied at
    construction time.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session.

        Args:
            db: SQLAlchemy session used for every read and write.
        """
        self.db = db

    def create_log(
        self,
        admin_id: int,
        operation_type: str,
        module: str,
        target_id: Optional[str] = None,
        detail: Optional[dict] = None,
        ip_address: Optional[str] = None
    ) -> OperationLog:
        """Create one operation log row and commit it.

        Note that this commits the session held by the service, so any
        other pending changes on that same session are committed as well.

        Args:
            admin_id: ID of the admin who performed the operation.
            operation_type: Kind of operation that was performed.
            module: Module the operation belongs to.
            target_id: Optional identifier of the object operated on.
            detail: Optional dict with extra context. It is stored as a JSON
                string (non-ASCII characters kept as-is); a missing or empty
                dict is stored as ``None``.
            ip_address: Optional client IP address.

        Returns:
            The persisted ``OperationLog``, refreshed from the database.

        Raises:
            Exception: Whatever the session raises while committing. The
                session is rolled back before the error is re-raised.
        """
        log = OperationLog(
            admin_id=admin_id,
            operation_type=operation_type,
            module=module,
            target_id=target_id,
            detail=json.dumps(detail, ensure_ascii=False) if detail else None,
            ip_address=ip_address
        )
        self.db.add(log)
        try:
            self.db.commit()
        except Exception:
            # The session is shared with the caller; roll back so a failed
            # commit does not leave it unusable, then surface the error.
            self.db.rollback()
            raise
        self.db.refresh(log)
        return log

    def list_logs(
        self,
        page: int = 1,
        size: int = 20,
        admin_id: Optional[int] = None,
        operation_type: Optional[str] = None,
        module: Optional[str] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None
    ) -> tuple[List[OperationLog], int]:
        """Return one page of operation logs, newest first.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size; negative values are treated as 0.
            admin_id: Only include logs of this admin when not ``None``.
            operation_type: Only include logs of this operation type when
                non-empty.
            module: Only include logs of this module when non-empty.
            start_date: Only include logs created at or after the start of
                this day.
            end_date: Only include logs created at or before the last
                representable moment of this day (inclusive).

        Returns:
            A ``(logs, total)`` tuple: the rows of the requested page and
            the number of rows matching the filters before pagination.
        """
        # Guard against a negative OFFSET / LIMIT reaching the database.
        page = max(page, 1)
        size = max(size, 0)

        conditions = []
        # ``is not None`` so that an admin ID of 0 is still a valid filter.
        if admin_id is not None:
            conditions.append(OperationLog.admin_id == admin_id)
        if operation_type:
            conditions.append(OperationLog.operation_type == operation_type)
        if module:
            conditions.append(OperationLog.module == module)
        if start_date is not None:
            day_start = datetime.combine(start_date, datetime.min.time())
            conditions.append(OperationLog.created_at >= day_start)
        if end_date is not None:
            day_end = datetime.combine(end_date, datetime.max.time())
            conditions.append(OperationLog.created_at <= day_end)

        query = self.db.query(OperationLog)
        if conditions:
            query = query.filter(and_(*conditions))

        # Count before pagination so the total reflects every matching row.
        total = query.count()

        logs = (
            query.order_by(desc(OperationLog.created_at))
            .offset((page - 1) * size)
            .limit(size)
            .all()
        )
        return logs, total
def log_operation(operation_type: str, module: str, get_target_id=None, get_detail=None):
    """Build a decorator that records an operation log after the wrapped call.

    The wrapped callable runs first. If it raises, nothing is logged. After
    it returns, a log entry is written through ``OperationLogService`` when
    the call received truthy ``db`` and ``current_admin`` keyword arguments;
    otherwise the call passes through untouched. Only keyword arguments are
    inspected, so values passed positionally are not seen.

    Both plain functions and coroutine functions are supported. For a
    coroutine function the log write is a synchronous call made inside the
    coroutine.
    NOTE(review): with a blocking session this would stall the event loop
    for the duration of the write - confirm against the session in use.

    Args:
        operation_type: Operation type stored on every log entry.
        module: Module name stored on every log entry.
        get_target_id: Optional callable ``(kwargs, result) -> target id``.
            A non-``None`` return value is stored as ``str(value)``.
        get_detail: Optional callable ``(kwargs, result) -> detail`` whose
            return value is passed to ``create_log`` as ``detail``.

    Returns:
        A decorator that preserves the wrapped callable's metadata and
        return value.
    """
    # Function-scope import; ``asyncio.iscoroutinefunction`` is deprecated
    # in favour of the ``inspect`` equivalent.
    import inspect

    def _record(kwargs, result):
        """Write the log entry for one finished call, if context is present."""
        db = kwargs.get('db')
        admin = kwargs.get('current_admin')
        if not (db and admin):
            return

        request = kwargs.get('request')
        target_id = get_target_id(kwargs, result) if get_target_id else None
        detail = get_detail(kwargs, result) if get_detail else None
        ip = request.client.host if request and request.client else None

        OperationLogService(db).create_log(
            admin_id=admin.id,
            operation_type=operation_type,
            module=module,
            # ``is not None`` so that a legitimate target ID of 0 is kept.
            target_id=str(target_id) if target_id is not None else None,
            detail=detail,
            ip_address=ip
        )

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                result = await func(*args, **kwargs)
                _record(kwargs, result)
                return result

            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            _record(kwargs, result)
            return result

        return sync_wrapper

    return decorator
|