| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from typing import Optional, Any, List
- from datetime import datetime
- from app.models.admin import OperationLog
- from app.models.log import LoginLog, APILog
- from app.models.api_call_log import ApiCallLog
- from app.models.user import User
- import openpyxl
- from io import BytesIO
- import os
- # API路径到模块的映射
- MODULE_PATH_MAPPING = {
- '系统': ['/api/admin/logs', '/api/admin/auth', '/api/auth'],
- '用户管理': ['/api/admin/users'],
- '模型管理': ['/api/admin/models'],
- '系统配置': ['/api/admin/config'],
- '视频生成': ['/api/admin/review/videos', '/api/video'],
- '图片生成': ['/api/admin/review/pictures', '/api/image'],
- '数据统计': ['/api/admin/stats'],
- 'AI对话': ['/api/llm', '/api/conversation'],
- '语音合成': ['/api/audio'],
- '模型广场': ['/api/models'],
- '用户': ['/api/user'],
- }
- class LogService:
- def __init__(self, db: Session):
- self.db = db
-
- def log_operation(
- self,
- admin_id: int,
- operation_type: str,
- module: str,
- target_id: str,
- detail: dict,
- ip_address: str
- ):
- """记录操作日志"""
- log = OperationLog(
- admin_id=admin_id,
- operation_type=operation_type,
- module=module,
- target_id=target_id,
- detail=detail,
- ip_address=ip_address
- )
- self.db.add(log)
- self.db.commit()
-
- def log_login(
- self,
- user_id: str,
- user_type: str,
- login_result: str,
- fail_reason: Optional[str] = None,
- ip_address: Optional[str] = None,
- user_agent: Optional[str] = None
- ):
- """记录登录日志"""
- log = LoginLog(
- user_id=user_id,
- user_type=user_type,
- login_result=login_result,
- fail_reason=fail_reason,
- ip_address=ip_address,
- user_agent=user_agent
- )
- self.db.add(log)
- self.db.commit()
-
- def log_api_call(
- self,
- user_id: Optional[str],
- api_path: str,
- request_method: str,
- request_params: dict,
- response_status: int,
- response_time: int
- ):
- """记录API调用日志"""
- masked_params = self._mask_sensitive_data(request_params)
-
- log = APILog(
- user_id=user_id,
- api_path=api_path,
- request_method=request_method,
- request_params=masked_params,
- response_status=response_status,
- response_time=response_time
- )
- self.db.add(log)
- self.db.commit()
-
- def get_operation_logs(
- self,
- start_date: Optional[str] = None,
- end_date: Optional[str] = None,
- operation_type: Optional[str] = None,
- admin_id: Optional[int] = None,
- keyword: Optional[str] = None,
- page: int = 1,
- size: int = 20
- ):
- """查询操作日志"""
- query = self.db.query(OperationLog)
-
- if start_date:
- query = query.filter(OperationLog.created_at >= start_date)
- if end_date:
- query = query.filter(OperationLog.created_at <= end_date)
- if operation_type:
- query = query.filter(OperationLog.operation_type == operation_type)
- if admin_id:
- query = query.filter(OperationLog.admin_id == admin_id)
-
- query = query.order_by(OperationLog.created_at.desc())
-
- total = query.count()
- offset = (page - 1) * size
- items = query.offset(offset).limit(size).all()
-
- return {
- "items": items,
- "total": total,
- "page": page,
- "size": size
- }
-
- def get_login_logs(
- self,
- start_date: Optional[str] = None,
- end_date: Optional[str] = None,
- user_type: Optional[str] = None,
- login_result: Optional[str] = None,
- page: int = 1,
- size: int = 20
- ):
- """查询登录日志"""
- query = self.db.query(LoginLog)
-
- if start_date:
- query = query.filter(LoginLog.created_at >= start_date)
- if end_date:
- query = query.filter(LoginLog.created_at <= end_date)
- if user_type:
- query = query.filter(LoginLog.user_type == user_type)
- if login_result:
- query = query.filter(LoginLog.login_result == login_result)
-
- query = query.order_by(LoginLog.created_at.desc())
-
- total = query.count()
- offset = (page - 1) * size
- logs = query.offset(offset).limit(size).all()
-
- items = [{
- "id": log.id,
- "user_id": log.user_id,
- "username": log.username,
- "user_type": log.user_type,
- "login_result": log.login_result,
- "fail_reason": log.fail_reason,
- "ip_address": log.ip_address,
- "user_agent": log.user_agent,
- "created_at": log.created_at.isoformat() if log.created_at else None
- } for log in logs]
-
- return {
- "items": items,
- "total": total,
- "page": page,
- "size": size
- }
-
- def _get_module_from_path(self, api_path: str) -> str:
- """根据API路径获取模块名称"""
- for module, paths in MODULE_PATH_MAPPING.items():
- for path in paths:
- if path in api_path:
- return module
- return '其他'
-
- def _get_paths_for_module(self, module: str) -> List[str]:
- """根据模块名称获取对应的API路径列表"""
- return MODULE_PATH_MAPPING.get(module, [])
- def get_api_logs(
- self,
- start_date: Optional[str] = None,
- end_date: Optional[str] = None,
- user_id: Optional[str] = None,
- username: Optional[str] = None,
- phone: Optional[str] = None,
- module: Optional[str] = None,
- api_path: Optional[str] = None,
- page: int = 1,
- size: int = 20
- ):
- """查询API日志,支持按用户名、电话、模块筛选"""
- # 如果选择开放平台模块,查询api_call_log表
- if module == '开放平台':
- return self._get_platform_api_logs(
- start_date=start_date,
- end_date=end_date,
- user_id=user_id,
- username=username,
- phone=phone,
- page=page,
- size=size
- )
-
- query = self.db.query(APILog)
-
- if start_date:
- query = query.filter(APILog.created_at >= start_date)
- if end_date:
- query = query.filter(APILog.created_at <= end_date)
- if user_id:
- query = query.filter(APILog.user_id == user_id)
- if api_path:
- query = query.filter(APILog.api_path.like(f"%{api_path}%"))
-
- # 按用户名筛选 - 需要关联用户表
- if username:
- user_ids = self.db.query(User.id).filter(
- User.username.ilike(f"%{username}%")
- ).all()
- user_id_list = [u[0] for u in user_ids]
- if user_id_list:
- query = query.filter(APILog.user_id.in_(user_id_list))
- else:
- # 没有匹配的用户,返回空结果
- return {"items": [], "total": 0, "page": page, "size": size}
-
- # 按电话号码筛选
- if phone:
- user_ids = self.db.query(User.id).filter(
- User.phone.ilike(f"%{phone}%")
- ).all()
- user_id_list = [u[0] for u in user_ids]
- if user_id_list:
- query = query.filter(APILog.user_id.in_(user_id_list))
- else:
- return {"items": [], "total": 0, "page": page, "size": size}
-
- # 按模块筛选
- if module:
- module_paths = self._get_paths_for_module(module)
- if module_paths:
- path_filters = [APILog.api_path.like(f"%{p}%") for p in module_paths]
- query = query.filter(or_(*path_filters))
-
- query = query.order_by(APILog.created_at.desc())
-
- total = query.count()
- offset = (page - 1) * size
- logs = query.offset(offset).limit(size).all()
-
- # 批量获取用户信息
- user_ids = list(set([log.user_id for log in logs if log.user_id]))
- user_map = {}
- if user_ids:
- users = self.db.query(User).filter(User.id.in_(user_ids)).all()
- user_map = {u.id: u for u in users}
-
- items = []
- for log in logs:
- user = user_map.get(log.user_id) if log.user_id else None
- items.append({
- "id": log.id,
- "user_id": log.user_id,
- "username": user.username if user else log.username,
- "phone": user.phone if user else None,
- "api_path": log.api_path,
- "module": self._get_module_from_path(log.api_path),
- "request_method": log.request_method,
- "request_params": log.request_params,
- "response_status": log.response_status,
- "response_time": log.response_time,
- "created_at": log.created_at.isoformat() if log.created_at else None
- })
-
- return {
- "items": items,
- "total": total,
- "page": page,
- "size": size
- }
-
- def _get_platform_api_logs(
- self,
- start_date: Optional[str] = None,
- end_date: Optional[str] = None,
- user_id: Optional[str] = None,
- username: Optional[str] = None,
- phone: Optional[str] = None,
- page: int = 1,
- size: int = 20
- ):
- """查询开放平台API调用日志"""
- query = self.db.query(ApiCallLog)
-
- if start_date:
- query = query.filter(ApiCallLog.created_at >= start_date)
- if end_date:
- query = query.filter(ApiCallLog.created_at <= end_date)
- if user_id:
- query = query.filter(ApiCallLog.user_id == user_id)
-
- # 按用户名筛选
- if username:
- user_ids = self.db.query(User.id).filter(
- User.username.ilike(f"%{username}%")
- ).all()
- user_id_list = [u[0] for u in user_ids]
- if user_id_list:
- query = query.filter(ApiCallLog.user_id.in_(user_id_list))
- else:
- return {"items": [], "total": 0, "page": page, "size": size}
-
- # 按电话号码筛选
- if phone:
- user_ids = self.db.query(User.id).filter(
- User.phone.ilike(f"%{phone}%")
- ).all()
- user_id_list = [u[0] for u in user_ids]
- if user_id_list:
- query = query.filter(ApiCallLog.user_id.in_(user_id_list))
- else:
- return {"items": [], "total": 0, "page": page, "size": size}
-
- query = query.order_by(ApiCallLog.created_at.desc())
-
- total = query.count()
- offset = (page - 1) * size
- logs = query.offset(offset).limit(size).all()
-
- # 批量获取用户信息
- user_ids_list = list(set([log.user_id for log in logs if log.user_id]))
- user_map = {}
- if user_ids_list:
- users = self.db.query(User).filter(User.id.in_(user_ids_list)).all()
- user_map = {u.id: u for u in users}
-
- items = []
- for log in logs:
- user = user_map.get(log.user_id)
- items.append({
- "id": log.id,
- "user_id": log.user_id,
- "username": user.username if user else None,
- "phone": user.phone if user else None,
- "api_path": f"/v1/chat/completions ({log.model_name})",
- "module": "开放平台",
- "request_method": "POST",
- "request_params": None,
- "response_status": 200 if log.status == "success" else 500,
- "response_time": None,
- "model_name": log.model_name,
- "input_tokens": log.input_tokens,
- "output_tokens": log.output_tokens,
- "bill": float(log.bill) if log.bill else 0,
- "status": log.status,
- "request_ip": log.request_ip,
- "created_at": log.created_at.isoformat() if log.created_at else None
- })
-
- return {
- "items": items,
- "total": total,
- "page": page,
- "size": size
- }
-
- def export_logs(self, log_type: str, filters: dict) -> str:
- """导出日志为Excel"""
- if log_type == "operation":
- data = self.get_operation_logs(**filters, page=1, size=10000)
- elif log_type == "login":
- data = self.get_login_logs(**filters, page=1, size=10000)
- elif log_type == "api":
- data = self.get_api_logs(**filters, page=1, size=10000)
- else:
- raise ValueError("不支持的日志类型")
-
- wb = openpyxl.Workbook()
- ws = wb.active
-
- if log_type == "operation":
- ws.append(["ID", "管理员ID", "操作类型", "模块", "目标ID", "IP地址", "创建时间"])
- for item in data["items"]:
- ws.append([
- item.id if hasattr(item, 'id') else item.get('id'),
- item.admin_id if hasattr(item, 'admin_id') else item.get('admin_id'),
- item.operation_type if hasattr(item, 'operation_type') else item.get('operation_type'),
- item.module if hasattr(item, 'module') else item.get('module'),
- item.target_id if hasattr(item, 'target_id') else item.get('target_id'),
- item.ip_address if hasattr(item, 'ip_address') else item.get('ip_address'),
- str(item.created_at if hasattr(item, 'created_at') else item.get('created_at'))
- ])
- elif log_type == "login":
- ws.append(["ID", "用户ID", "用户类型", "登录结果", "失败原因", "IP地址", "创建时间"])
- for item in data["items"]:
- ws.append([
- item.get('id'),
- item.get('user_id'),
- item.get('user_type'),
- item.get('login_result'),
- item.get('fail_reason') or "",
- item.get('ip_address'),
- item.get('created_at')
- ])
- elif log_type == "api":
- ws.append(["ID", "用户ID", "API路径", "请求方法", "响应状态", "响应时间", "创建时间"])
- for item in data["items"]:
- ws.append([
- item.get('id'),
- item.get('user_id') or "",
- item.get('api_path'),
- item.get('request_method'),
- item.get('response_status'),
- item.get('response_time'),
- item.get('created_at')
- ])
-
- filename = f"{log_type}_logs_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
- export_dir = os.path.join(os.path.expanduser("~"), "Downloads")
- os.makedirs(export_dir, exist_ok=True)
- filepath = os.path.join(export_dir, filename)
- wb.save(filepath)
-
- return filepath
-
- def _mask_sensitive_data(self, data: dict) -> dict:
- """脱敏处理敏感信息"""
- if not data:
- return data
-
- sensitive_keys = ['password', 'token', 'secret', 'key', 'api_key']
- masked_data = data.copy()
-
- for key in masked_data:
- if any(s in key.lower() for s in sensitive_keys):
- masked_data[key] = '***'
-
- return masked_data
|