from datetime import datetime
from io import BytesIO
import os
from typing import Optional, Any, List

import openpyxl
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session

from app.models.admin import OperationLog
from app.models.log import LoginLog, APILog
from app.models.api_call_log import ApiCallLog
from app.models.user import User

# Mapping from module display name to the API path fragments that belong to it.
# Order matters: _get_module_from_path returns the first module whose fragment
# occurs in the request path.
MODULE_PATH_MAPPING = {
    '系统': ['/api/admin/logs', '/api/admin/auth', '/api/auth'],
    '用户管理': ['/api/admin/users'],
    '模型管理': ['/api/admin/models'],
    '系统配置': ['/api/admin/config'],
    '视频生成': ['/api/admin/review/videos', '/api/video'],
    '图片生成': ['/api/admin/review/pictures', '/api/image'],
    '数据统计': ['/api/admin/stats'],
    'AI对话': ['/api/llm', '/api/conversation'],
    '语音合成': ['/api/audio'],
    '模型广场': ['/api/models'],
    '用户': ['/api/user'],
}


class LogService:
    """Write and query operation, login and API logs through a SQLAlchemy session."""

    # Escape character used for user-supplied LIKE patterns. "!" is used
    # because it does not occur in API paths and needs no quoting in SQL.
    _LIKE_ESCAPE = "!"
    # Upper bound on rows written by export_logs; larger result sets are truncated.
    _EXPORT_MAX_ROWS = 10000
    _MASK = '***'
    # A key is sensitive when it contains any of these fragments
    # (case-insensitive); 'key' already subsumes 'api_key'.
    _SENSITIVE_KEYS = ('password', 'token', 'secret', 'key', 'api_key')

    def __init__(self, db: Session):
        self.db = db

    # ------------------------------------------------------------------
    # Writing logs
    # ------------------------------------------------------------------

    def _persist(self, record) -> None:
        """Add *record* and commit; roll back and re-raise if the commit fails.

        Without the rollback a failed commit leaves the session in an
        unusable state for every later statement issued through it.
        """
        self.db.add(record)
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def log_operation(
        self,
        admin_id: int,
        operation_type: str,
        module: str,
        target_id: str,
        detail: dict,
        ip_address: str
    ):
        """Record an admin operation log entry."""
        self._persist(OperationLog(
            admin_id=admin_id,
            operation_type=operation_type,
            module=module,
            target_id=target_id,
            detail=detail,
            ip_address=ip_address
        ))

    def log_login(
        self,
        user_id: str,
        user_type: str,
        login_result: str,
        fail_reason: Optional[str] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ):
        """Record a login attempt."""
        self._persist(LoginLog(
            user_id=user_id,
            user_type=user_type,
            login_result=login_result,
            fail_reason=fail_reason,
            ip_address=ip_address,
            user_agent=user_agent
        ))

    def log_api_call(
        self,
        user_id: Optional[str],
        api_path: str,
        request_method: str,
        request_params: dict,
        response_status: int,
        response_time: int
    ):
        """Record an API call; sensitive request parameters are masked first."""
        self._persist(APILog(
            user_id=user_id,
            api_path=api_path,
            request_method=request_method,
            request_params=self._mask_sensitive_data(request_params),
            response_status=response_status,
            response_time=response_time
        ))

    # ------------------------------------------------------------------
    # Shared query helpers
    # ------------------------------------------------------------------

    @classmethod
    def _contains_pattern(cls, text: str) -> str:
        """Return a ``%text%`` LIKE pattern with wildcards in *text* escaped."""
        esc = cls._LIKE_ESCAPE
        escaped = (
            str(text)
            .replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )
        return f"%{escaped}%"

    @staticmethod
    def _apply_date_range(query, column, start_date, end_date):
        """Restrict *query* to rows whose *column* lies in [start_date, end_date]."""
        if start_date:
            query = query.filter(column >= start_date)
        if end_date:
            query = query.filter(column <= end_date)
        return query

    @staticmethod
    def _paginate(query, page: int, size: int):
        """Count *query* and fetch one page.

        ``page`` is clamped to >= 1 and ``size`` to >= 0 so that bad input
        cannot produce a negative OFFSET/LIMIT.

        Returns:
            Tuple ``(rows, total, page, size)`` with the clamped paging values.
        """
        page = max(page, 1)
        size = max(size, 0)
        total = query.count()
        rows = query.offset((page - 1) * size).limit(size).all()
        return rows, total, page, size

    @staticmethod
    def _page_result(items, total: int, page: int, size: int) -> dict:
        """Build the paging envelope returned by all query methods."""
        return {
            "items": items,
            "total": total,
            "page": page,
            "size": size
        }

    def _matching_user_ids(self, column, text: str) -> list:
        """Return ids of users whose *column* contains *text* (case-insensitive)."""
        rows = self.db.query(User.id).filter(
            column.ilike(self._contains_pattern(text), escape=self._LIKE_ESCAPE)
        ).all()
        return [row[0] for row in rows]

    def _filter_by_user_fields(self, query, user_id_column, username, phone):
        """Restrict *query* to logs of users matching *username* and/or *phone*.

        Returns:
            The filtered query, or ``None`` when a given criterion matches no
            user at all (the caller should then return an empty page).
        """
        for column, text in ((User.username, username), (User.phone, phone)):
            if not text:
                continue
            matched_ids = self._matching_user_ids(column, text)
            if not matched_ids:
                return None
            query = query.filter(user_id_column.in_(matched_ids))
        return query

    def _load_user_map(self, logs) -> dict:
        """Fetch the users referenced by *logs* in one query, keyed by ``User.id``."""
        wanted_ids = list({log.user_id for log in logs if log.user_id})
        if not wanted_ids:
            return {}
        users = self.db.query(User).filter(User.id.in_(wanted_ids)).all()
        return {user.id: user for user in users}

    # ------------------------------------------------------------------
    # Querying logs
    # ------------------------------------------------------------------

    def get_operation_logs(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        operation_type: Optional[str] = None,
        admin_id: Optional[int] = None,
        keyword: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ):
        """Query operation logs, newest first.

        Args:
            start_date: Inclusive lower bound on ``created_at``.
            end_date: Inclusive upper bound on ``created_at``.
            operation_type: Exact operation type to match.
            admin_id: Exact admin id to match.
            keyword: Case-insensitive substring searched in the operation
                type, module and target id.
            page: 1-based page number.
            size: Page size.

        Returns:
            Dict with ``items`` (``OperationLog`` rows), ``total``, ``page``
            and ``size``.
        """
        query = self.db.query(OperationLog)
        query = self._apply_date_range(
            query, OperationLog.created_at, start_date, end_date
        )

        if operation_type:
            query = query.filter(OperationLog.operation_type == operation_type)
        # "is not None" so that an id of 0 is still honoured as a filter.
        if admin_id is not None:
            query = query.filter(OperationLog.admin_id == admin_id)
        if keyword:
            pattern = self._contains_pattern(keyword)
            query = query.filter(or_(
                OperationLog.operation_type.ilike(pattern, escape=self._LIKE_ESCAPE),
                OperationLog.module.ilike(pattern, escape=self._LIKE_ESCAPE),
                OperationLog.target_id.ilike(pattern, escape=self._LIKE_ESCAPE),
            ))

        query = query.order_by(OperationLog.created_at.desc())
        rows, total, page, size = self._paginate(query, page, size)
        return self._page_result(rows, total, page, size)

    def get_login_logs(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        user_type: Optional[str] = None,
        login_result: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ):
        """Query login logs, newest first.

        Returns:
            Dict with ``items`` (plain dicts, ``created_at`` as ISO-8601
            string or ``None``), ``total``, ``page`` and ``size``.
        """
        query = self.db.query(LoginLog)
        query = self._apply_date_range(
            query, LoginLog.created_at, start_date, end_date
        )

        if user_type:
            query = query.filter(LoginLog.user_type == user_type)
        if login_result:
            query = query.filter(LoginLog.login_result == login_result)

        query = query.order_by(LoginLog.created_at.desc())
        logs, total, page, size = self._paginate(query, page, size)

        items = [{
            "id": log.id,
            "user_id": log.user_id,
            "username": log.username,
            "user_type": log.user_type,
            "login_result": log.login_result,
            "fail_reason": log.fail_reason,
            "ip_address": log.ip_address,
            "user_agent": log.user_agent,
            "created_at": log.created_at.isoformat() if log.created_at else None
        } for log in logs]

        return self._page_result(items, total, page, size)

    def _get_module_from_path(self, api_path: str) -> str:
        """Return the module name whose path fragment occurs in *api_path*.

        Falls back to ``'其他'`` when no fragment matches.
        """
        for module, paths in MODULE_PATH_MAPPING.items():
            if any(path in api_path for path in paths):
                return module
        return '其他'

    def _get_paths_for_module(self, module: str) -> List[str]:
        """Return the API path fragments registered for *module* (empty if unknown)."""
        return MODULE_PATH_MAPPING.get(module, [])

    def get_api_logs(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        user_id: Optional[str] = None,
        username: Optional[str] = None,
        phone: Optional[str] = None,
        module: Optional[str] = None,
        api_path: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ):
        """Query API logs, with optional username, phone and module filters.

        When *module* is ``'开放平台'`` the open-platform call log table is
        queried instead and *api_path* is not applied.

        Returns:
            Dict with ``items`` (plain dicts), ``total``, ``page`` and ``size``.
        """
        # The open-platform module is stored in the api_call_log table.
        if module == '开放平台':
            return self._get_platform_api_logs(
                start_date=start_date,
                end_date=end_date,
                user_id=user_id,
                username=username,
                phone=phone,
                page=page,
                size=size
            )

        query = self.db.query(APILog)
        query = self._apply_date_range(
            query, APILog.created_at, start_date, end_date
        )

        if user_id:
            query = query.filter(APILog.user_id == user_id)
        if api_path:
            query = query.filter(APILog.api_path.like(
                self._contains_pattern(api_path), escape=self._LIKE_ESCAPE
            ))

        # Username / phone filters are resolved through the user table.
        query = self._filter_by_user_fields(query, APILog.user_id, username, phone)
        if query is None:
            # No user matches the criterion, so no log can match either.
            return self._page_result([], 0, page, size)

        # Module filter: a log belongs to the module when its path contains
        # any of the module's fragments. Unknown modules add no filter.
        if module:
            module_paths = self._get_paths_for_module(module)
            if module_paths:
                query = query.filter(or_(*[
                    APILog.api_path.like(
                        self._contains_pattern(fragment), escape=self._LIKE_ESCAPE
                    )
                    for fragment in module_paths
                ]))

        query = query.order_by(APILog.created_at.desc())
        logs, total, page, size = self._paginate(query, page, size)

        # Load all referenced users in one query instead of one per row.
        user_map = self._load_user_map(logs)

        items = []
        for log in logs:
            user = user_map.get(log.user_id) if log.user_id else None
            items.append({
                "id": log.id,
                "user_id": log.user_id,
                "username": user.username if user else log.username,
                "phone": user.phone if user else None,
                "api_path": log.api_path,
                "module": self._get_module_from_path(log.api_path),
                "request_method": log.request_method,
                "request_params": log.request_params,
                "response_status": log.response_status,
                "response_time": log.response_time,
                "created_at": log.created_at.isoformat() if log.created_at else None
            })

        return self._page_result(items, total, page, size)

    def _get_platform_api_logs(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        user_id: Optional[str] = None,
        username: Optional[str] = None,
        phone: Optional[str] = None,
        page: int = 1,
        size: int = 20
    ):
        """Query open-platform API call logs, newest first.

        Rows are reshaped to the same keys as regular API logs, plus the
        billing fields of the call log.
        """
        query = self.db.query(ApiCallLog)
        query = self._apply_date_range(
            query, ApiCallLog.created_at, start_date, end_date
        )

        if user_id:
            query = query.filter(ApiCallLog.user_id == user_id)

        query = self._filter_by_user_fields(
            query, ApiCallLog.user_id, username, phone
        )
        if query is None:
            return self._page_result([], 0, page, size)

        query = query.order_by(ApiCallLog.created_at.desc())
        logs, total, page, size = self._paginate(query, page, size)

        # Load all referenced users in one query instead of one per row.
        user_map = self._load_user_map(logs)

        items = []
        for log in logs:
            user = user_map.get(log.user_id)
            items.append({
                "id": log.id,
                "user_id": log.user_id,
                "username": user.username if user else None,
                "phone": user.phone if user else None,
                "api_path": f"/v1/chat/completions ({log.model_name})",
                "module": "开放平台",
                "request_method": "POST",
                "request_params": None,
                # Only the call status is stored; map it onto an HTTP-like code.
                "response_status": 200 if log.status == "success" else 500,
                "response_time": None,
                "model_name": log.model_name,
                "input_tokens": log.input_tokens,
                "output_tokens": log.output_tokens,
                "bill": float(log.bill) if log.bill else 0,
                "status": log.status,
                "request_ip": log.request_ip,
                "created_at": log.created_at.isoformat() if log.created_at else None
            })

        return self._page_result(items, total, page, size)

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    @staticmethod
    def _field(item, name: str):
        """Read *name* from *item*, which may be an ORM row or a dict."""
        if hasattr(item, name):
            return getattr(item, name)
        return item.get(name)

    def export_logs(self, log_type: str, filters: dict) -> str:
        """Export logs to an Excel file and return its path.

        Args:
            log_type: ``"operation"``, ``"login"`` or ``"api"``.
            filters: Keyword arguments forwarded to the matching query
                method. ``page`` and ``size`` entries are ignored because the
                export always reads the first ``_EXPORT_MAX_ROWS`` rows.

        Returns:
            Path of the written ``.xlsx`` file inside ``~/Downloads``.

        Raises:
            ValueError: If *log_type* is not supported.
        """
        fetchers = {
            "operation": self.get_operation_logs,
            "login": self.get_login_logs,
            "api": self.get_api_logs,
        }
        fetch = fetchers.get(log_type)
        if fetch is None:
            raise ValueError("不支持的日志类型")

        # Drop paging keys so they cannot collide with the explicit
        # page/size arguments below (that would raise TypeError).
        criteria = {
            key: value
            for key, value in (filters or {}).items()
            if key not in ("page", "size")
        }
        data = fetch(**criteria, page=1, size=self._EXPORT_MAX_ROWS)

        wb = openpyxl.Workbook()
        ws = wb.active

        if log_type == "operation":
            ws.append(["ID", "管理员ID", "操作类型", "模块", "目标ID", "IP地址", "创建时间"])
            for item in data["items"]:
                ws.append([
                    self._field(item, 'id'),
                    self._field(item, 'admin_id'),
                    self._field(item, 'operation_type'),
                    self._field(item, 'module'),
                    self._field(item, 'target_id'),
                    self._field(item, 'ip_address'),
                    str(self._field(item, 'created_at'))
                ])
        elif log_type == "login":
            ws.append(["ID", "用户ID", "用户类型", "登录结果", "失败原因", "IP地址", "创建时间"])
            for item in data["items"]:
                ws.append([
                    item.get('id'),
                    item.get('user_id'),
                    item.get('user_type'),
                    item.get('login_result'),
                    item.get('fail_reason') or "",
                    item.get('ip_address'),
                    item.get('created_at')
                ])
        else:  # "api" - the only remaining supported type
            ws.append(["ID", "用户ID", "API路径", "请求方法", "响应状态", "响应时间", "创建时间"])
            for item in data["items"]:
                ws.append([
                    item.get('id'),
                    item.get('user_id') or "",
                    item.get('api_path'),
                    item.get('request_method'),
                    item.get('response_status'),
                    item.get('response_time'),
                    item.get('created_at')
                ])

        filename = f"{log_type}_logs_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
        export_dir = os.path.join(os.path.expanduser("~"), "Downloads")
        os.makedirs(export_dir, exist_ok=True)
        filepath = os.path.join(export_dir, filename)
        wb.save(filepath)

        return filepath

    # ------------------------------------------------------------------
    # Masking
    # ------------------------------------------------------------------

    @classmethod
    def _is_sensitive_key(cls, key) -> bool:
        """Return True when *key* is a string containing a sensitive fragment."""
        if not isinstance(key, str):
            return False
        lowered = key.lower()
        return any(fragment in lowered for fragment in cls._SENSITIVE_KEYS)

    @classmethod
    def _mask_value(cls, value):
        """Return a copy of *value* with sensitive dict entries masked at any depth."""
        if isinstance(value, dict):
            return {
                key: cls._MASK if cls._is_sensitive_key(key) else cls._mask_value(inner)
                for key, inner in value.items()
            }
        if isinstance(value, list):
            return [cls._mask_value(inner) for inner in value]
        return value

    def _mask_sensitive_data(self, data: dict) -> dict:
        """Mask sensitive values in request parameters.

        Any entry whose key contains ``password``, ``token``, ``secret`` or
        ``key`` (case-insensitive) is replaced by ``'***'``, including
        entries inside nested dicts and lists. The input is not modified.
        Empty or ``None`` input is returned unchanged.
        """
        if not data:
            return data
        return self._mask_value(data)