# log_service.py (16 KB)
  1. from sqlalchemy.orm import Session
  2. from sqlalchemy import and_, or_
  3. from typing import Optional, Any, List
  4. from datetime import datetime
  5. from app.models.admin import OperationLog
  6. from app.models.log import LoginLog, APILog
  7. from app.models.api_call_log import ApiCallLog
  8. from app.models.user import User
  9. import openpyxl
  10. from io import BytesIO
  11. import os
  # Maps each module display name to the API path fragments that belong to it.
  # Insertion order matters: lookups walk the modules in this order and stop
  # at the first fragment found in a path.
  MODULE_PATH_MAPPING = {
      "系统": [
          "/api/admin/logs",
          "/api/admin/auth",
          "/api/auth",
      ],
      "用户管理": ["/api/admin/users"],
      "模型管理": ["/api/admin/models"],
      "系统配置": ["/api/admin/config"],
      "视频生成": [
          "/api/admin/review/videos",
          "/api/video",
      ],
      "图片生成": [
          "/api/admin/review/pictures",
          "/api/image",
      ],
      "数据统计": ["/api/admin/stats"],
      "AI对话": [
          "/api/llm",
          "/api/conversation",
      ],
      "语音合成": ["/api/audio"],
      "模型广场": ["/api/models"],
      "用户": ["/api/user"],
  }
  class LogService:
      """Record and query operation, login and API-call logs.

      All database access goes through the SQLAlchemy session given to the
      constructor. The ``log_*`` methods commit immediately; the ``get_*``
      methods return one page of results as a dict with the keys
      ``items``, ``total``, ``page`` and ``size``.

      NOTE(review): ``start_date`` / ``end_date`` are compared against
      ``created_at`` exactly as given. A date-only ``end_date`` presumably
      compares as midnight and so excludes later rows of that day — confirm
      with callers.
      """

      # Upper bound on the number of rows a single export reads.
      EXPORT_MAX_ROWS = 10000

      # Substrings (matched case-insensitively) that mark a parameter key as
      # sensitive.
      _SENSITIVE_KEY_MARKERS = ('password', 'token', 'secret', 'key', 'api_key')

      def __init__(self, db: "Session"):
          """Keep the session used by every read and write of this service."""
          self.db = db

      # ------------------------------------------------------------------
      # Shared private helpers
      # ------------------------------------------------------------------

      def _persist(self, log) -> None:
          """Add one log row and commit it, rolling back if that fails.

          Raises:
              Exception: whatever the session raised, after the rollback.
          """
          try:
              self.db.add(log)
              self.db.commit()
          except Exception:
              # Without a rollback the session stays in a failed state and
              # every later query on it would raise as well.
              self.db.rollback()
              raise

      @staticmethod
      def _page_window(page: int, size: int):
          """Return ``(page, size, offset)`` clamped to usable values.

          ``page`` is at least 1 and ``size`` at least 0, so the computed
          offset and limit are never negative. ``size == 0`` is kept as-is
          and yields no rows.
          """
          page = max(page, 1)
          size = max(size, 0)
          return page, size, (page - 1) * size

      @staticmethod
      def _page_result(items, total, page, size) -> dict:
          """Build the common result envelope returned by the query methods."""
          return {
              "items": items,
              "total": total,
              "page": page,
              "size": size,
          }

      @staticmethod
      def _apply_date_range(query, column, start_date, end_date):
          """Restrict ``query`` to rows whose ``column`` lies in the range.

          Either bound may be empty/None, in which case it is not applied.
          """
          if start_date:
              query = query.filter(column >= start_date)
          if end_date:
              query = query.filter(column <= end_date)
          return query

      @staticmethod
      def _fetch_page(query, page: int, size: int):
          """Count ``query`` and load one page; return ``(rows, total)``.

          ``page`` and ``size`` are expected to be already clamped.
          """
          total = query.count()
          rows = query.offset((page - 1) * size).limit(size).all()
          return rows, total

      def _filter_by_user_fields(self, query, user_id_column, username, phone):
          """Restrict ``query`` to users matching ``username`` and/or ``phone``.

          Each given value is matched as a case-insensitive substring against
          the user table, and ``user_id_column`` is then limited to the ids
          found. When both values are given, both restrictions apply.

          Returns:
              The narrowed query, or ``None`` as soon as one of the given
              values matches no user (the caller should return an empty page).
          """
          for user_column, needle in ((User.username, username), (User.phone, phone)):
              if not needle:
                  continue
              rows = self.db.query(User.id).filter(
                  user_column.ilike(f"%{needle}%")
              ).all()
              matched_ids = [row[0] for row in rows]
              if not matched_ids:
                  return None
              query = query.filter(user_id_column.in_(matched_ids))
          return query

      def _users_by_id(self, logs) -> dict:
          """Load, in one query, the users referenced by ``logs``.

          Returns a dict mapping user id to the user row; logs without a
          user id are ignored.
          """
          wanted = list({log.user_id for log in logs if log.user_id})
          if not wanted:
              return {}
          users = self.db.query(User).filter(User.id.in_(wanted)).all()
          return {user.id: user for user in users}

      @staticmethod
      def _field(item, name: str):
          """Read ``name`` from ``item`` as an attribute, else via ``.get``."""
          if hasattr(item, name):
              return getattr(item, name)
          return item.get(name)

      @staticmethod
      def _export_criteria(filters) -> dict:
          """Return ``filters`` without paging keys (``None`` becomes ``{}``).

          The export always passes its own ``page`` and ``size``; leaving the
          caller's copies in place would raise ``TypeError`` for duplicate
          keyword arguments.
          """
          return {
              key: value
              for key, value in (filters or {}).items()
              if key not in ("page", "size")
          }

      # ------------------------------------------------------------------
      # Writers
      # ------------------------------------------------------------------

      def log_operation(
          self,
          admin_id: int,
          operation_type: str,
          module: str,
          target_id: str,
          detail: dict,
          ip_address: str
      ):
          """Store one operation log row and commit it."""
          self._persist(OperationLog(
              admin_id=admin_id,
              operation_type=operation_type,
              module=module,
              target_id=target_id,
              detail=detail,
              ip_address=ip_address,
          ))

      def log_login(
          self,
          user_id: str,
          user_type: str,
          login_result: str,
          fail_reason: Optional[str] = None,
          ip_address: Optional[str] = None,
          user_agent: Optional[str] = None
      ):
          """Store one login log row and commit it."""
          self._persist(LoginLog(
              user_id=user_id,
              user_type=user_type,
              login_result=login_result,
              fail_reason=fail_reason,
              ip_address=ip_address,
              user_agent=user_agent,
          ))

      def log_api_call(
          self,
          user_id: Optional[str],
          api_path: str,
          request_method: str,
          request_params: dict,
          response_status: int,
          response_time: int
      ):
          """Store one API-call log row and commit it.

          ``request_params`` is passed through ``_mask_sensitive_data``
          first, so sensitive values are replaced before they are stored.
          """
          self._persist(APILog(
              user_id=user_id,
              api_path=api_path,
              request_method=request_method,
              request_params=self._mask_sensitive_data(request_params),
              response_status=response_status,
              response_time=response_time,
          ))

      # ------------------------------------------------------------------
      # Readers
      # ------------------------------------------------------------------

      def get_operation_logs(
          self,
          start_date: Optional[str] = None,
          end_date: Optional[str] = None,
          operation_type: Optional[str] = None,
          admin_id: Optional[int] = None,
          keyword: Optional[str] = None,
          page: int = 1,
          size: int = 20
      ):
          """Return one page of operation logs, newest first.

          Args:
              start_date / end_date: optional bounds on ``created_at``.
              operation_type: optional exact match on the operation type.
              admin_id: optional exact match on the admin id.
              keyword: optional case-insensitive substring matched against
                  the log's ``module`` or ``target_id``.
              page: 1-based page number (values below 1 are treated as 1).
              size: page size (negative values are treated as 0).

          Returns:
              dict with ``items`` (``OperationLog`` rows), ``total``,
              ``page`` and ``size``.
          """
          page, size, _ = self._page_window(page, size)

          query = self._apply_date_range(
              self.db.query(OperationLog), OperationLog.created_at, start_date, end_date
          )
          if operation_type:
              query = query.filter(OperationLog.operation_type == operation_type)
          if admin_id:
              query = query.filter(OperationLog.admin_id == admin_id)
          if keyword:
              # The parameter used to be accepted but silently ignored.
              pattern = f"%{keyword}%"
              query = query.filter(or_(
                  OperationLog.module.ilike(pattern),
                  OperationLog.target_id.ilike(pattern),
              ))

          query = query.order_by(OperationLog.created_at.desc())
          rows, total = self._fetch_page(query, page, size)
          return self._page_result(rows, total, page, size)

      def get_login_logs(
          self,
          start_date: Optional[str] = None,
          end_date: Optional[str] = None,
          user_type: Optional[str] = None,
          login_result: Optional[str] = None,
          page: int = 1,
          size: int = 20
      ):
          """Return one page of login logs, newest first, as plain dicts.

          ``user_type`` and ``login_result`` are optional exact-match
          filters; paging is clamped as in ``get_operation_logs``.
          """
          page, size, _ = self._page_window(page, size)

          query = self._apply_date_range(
              self.db.query(LoginLog), LoginLog.created_at, start_date, end_date
          )
          if user_type:
              query = query.filter(LoginLog.user_type == user_type)
          if login_result:
              query = query.filter(LoginLog.login_result == login_result)

          query = query.order_by(LoginLog.created_at.desc())
          rows, total = self._fetch_page(query, page, size)

          items = [
              {
                  "id": row.id,
                  "user_id": row.user_id,
                  "username": row.username,
                  "user_type": row.user_type,
                  "login_result": row.login_result,
                  "fail_reason": row.fail_reason,
                  "ip_address": row.ip_address,
                  "user_agent": row.user_agent,
                  "created_at": row.created_at.isoformat() if row.created_at else None,
              }
              for row in rows
          ]
          return self._page_result(items, total, page, size)

      def _get_module_from_path(self, api_path: str) -> str:
          """Return the module name whose path fragment occurs in ``api_path``.

          Modules are checked in the order of ``MODULE_PATH_MAPPING``; the
          first fragment contained in the path wins. An empty/None path or a
          path matching nothing yields '其他'.
          """
          if not api_path:
              return '其他'
          for module, fragments in MODULE_PATH_MAPPING.items():
              if any(fragment in api_path for fragment in fragments):
                  return module
          return '其他'

      def _get_paths_for_module(self, module: str) -> List[str]:
          """Return the path fragments registered for ``module`` (or ``[]``)."""
          return MODULE_PATH_MAPPING.get(module, [])

      def get_api_logs(
          self,
          start_date: Optional[str] = None,
          end_date: Optional[str] = None,
          user_id: Optional[str] = None,
          username: Optional[str] = None,
          phone: Optional[str] = None,
          module: Optional[str] = None,
          api_path: Optional[str] = None,
          page: int = 1,
          size: int = 20
      ):
          """Return one page of API logs, newest first, as plain dicts.

          Supports filtering by date range, user id, username, phone, module
          and path substring. When ``module`` is '开放平台' the request is
          delegated to ``_get_platform_api_logs`` (``api_path`` is not used
          there). A ``module`` with no registered paths adds no path filter.
          """
          # The open-platform module lives in a different table.
          if module == '开放平台':
              return self._get_platform_api_logs(
                  start_date=start_date,
                  end_date=end_date,
                  user_id=user_id,
                  username=username,
                  phone=phone,
                  page=page,
                  size=size
              )

          page, size, _ = self._page_window(page, size)

          query = self._apply_date_range(
              self.db.query(APILog), APILog.created_at, start_date, end_date
          )
          if user_id:
              query = query.filter(APILog.user_id == user_id)
          if api_path:
              query = query.filter(APILog.api_path.like(f"%{api_path}%"))

          query = self._filter_by_user_fields(query, APILog.user_id, username, phone)
          if query is None:
              # No user matches the username/phone filter: empty page.
              return self._page_result([], 0, page, size)

          if module:
              fragments = self._get_paths_for_module(module)
              if fragments:
                  query = query.filter(
                      or_(*[APILog.api_path.like(f"%{fragment}%") for fragment in fragments])
                  )

          query = query.order_by(APILog.created_at.desc())
          rows, total = self._fetch_page(query, page, size)

          # Resolve all referenced users with a single query.
          users = self._users_by_id(rows)

          items = []
          for row in rows:
              user = users.get(row.user_id) if row.user_id else None
              items.append({
                  "id": row.id,
                  "user_id": row.user_id,
                  "username": user.username if user else row.username,
                  "phone": user.phone if user else None,
                  "api_path": row.api_path,
                  "module": self._get_module_from_path(row.api_path),
                  "request_method": row.request_method,
                  "request_params": row.request_params,
                  "response_status": row.response_status,
                  "response_time": row.response_time,
                  "created_at": row.created_at.isoformat() if row.created_at else None,
              })
          return self._page_result(items, total, page, size)

      def _get_platform_api_logs(
          self,
          start_date: Optional[str] = None,
          end_date: Optional[str] = None,
          user_id: Optional[str] = None,
          username: Optional[str] = None,
          phone: Optional[str] = None,
          page: int = 1,
          size: int = 20
      ):
          """Return one page of open-platform API call logs, newest first.

          Rows come from ``ApiCallLog`` and are shaped like the items of
          ``get_api_logs``, plus the model, token, bill, status and
          request-ip fields. ``response_status`` is 200 when the row's
          status is "success" and 500 otherwise.
          """
          page, size, _ = self._page_window(page, size)

          query = self._apply_date_range(
              self.db.query(ApiCallLog), ApiCallLog.created_at, start_date, end_date
          )
          if user_id:
              query = query.filter(ApiCallLog.user_id == user_id)

          query = self._filter_by_user_fields(query, ApiCallLog.user_id, username, phone)
          if query is None:
              return self._page_result([], 0, page, size)

          query = query.order_by(ApiCallLog.created_at.desc())
          rows, total = self._fetch_page(query, page, size)

          # Resolve all referenced users with a single query.
          users = self._users_by_id(rows)

          items = []
          for row in rows:
              user = users.get(row.user_id)
              items.append({
                  "id": row.id,
                  "user_id": row.user_id,
                  "username": user.username if user else None,
                  "phone": user.phone if user else None,
                  "api_path": f"/v1/chat/completions ({row.model_name})",
                  "module": "开放平台",
                  "request_method": "POST",
                  "request_params": None,
                  "response_status": 200 if row.status == "success" else 500,
                  "response_time": None,
                  "model_name": row.model_name,
                  "input_tokens": row.input_tokens,
                  "output_tokens": row.output_tokens,
                  "bill": float(row.bill) if row.bill else 0,
                  "status": row.status,
                  "request_ip": row.request_ip,
                  "created_at": row.created_at.isoformat() if row.created_at else None,
              })
          return self._page_result(items, total, page, size)

      # ------------------------------------------------------------------
      # Export
      # ------------------------------------------------------------------

      def export_logs(self, log_type: str, filters: dict) -> str:
          """Export logs of ``log_type`` to an Excel file and return its path.

          Args:
              log_type: "operation", "login" or "api".
              filters: keyword arguments for the matching ``get_*`` method.
                  ``None`` means no filters; any ``page`` / ``size`` entries
                  are ignored because the export reads a single page of at
                  most ``EXPORT_MAX_ROWS`` rows.

          Returns:
              Path of the written ``.xlsx`` file inside the current user's
              ``Downloads`` directory (created if missing).

          Raises:
              ValueError: if ``log_type`` is not one of the supported kinds.
          """
          fetchers = {
              "operation": self.get_operation_logs,
              "login": self.get_login_logs,
              "api": self.get_api_logs,
          }
          fetch = fetchers.get(log_type)
          if fetch is None:
              raise ValueError("不支持的日志类型")

          data = fetch(**self._export_criteria(filters), page=1, size=self.EXPORT_MAX_ROWS)

          wb = openpyxl.Workbook()
          ws = wb.active
          field = self._field

          if log_type == "operation":
              ws.append(["ID", "管理员ID", "操作类型", "模块", "目标ID", "IP地址", "创建时间"])
              for item in data["items"]:
                  ws.append([
                      field(item, 'id'),
                      field(item, 'admin_id'),
                      field(item, 'operation_type'),
                      field(item, 'module'),
                      field(item, 'target_id'),
                      field(item, 'ip_address'),
                      str(field(item, 'created_at')),
                  ])
          elif log_type == "login":
              ws.append(["ID", "用户ID", "用户类型", "登录结果", "失败原因", "IP地址", "创建时间"])
              for item in data["items"]:
                  ws.append([
                      item.get('id'),
                      item.get('user_id'),
                      item.get('user_type'),
                      item.get('login_result'),
                      item.get('fail_reason') or "",
                      item.get('ip_address'),
                      item.get('created_at'),
                  ])
          else:  # log_type == "api"
              ws.append(["ID", "用户ID", "API路径", "请求方法", "响应状态", "响应时间", "创建时间"])
              for item in data["items"]:
                  ws.append([
                      item.get('id'),
                      item.get('user_id') or "",
                      item.get('api_path'),
                      item.get('request_method'),
                      item.get('response_status'),
                      item.get('response_time'),
                      item.get('created_at'),
                  ])

          filename = f"{log_type}_logs_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
          export_dir = os.path.join(os.path.expanduser("~"), "Downloads")
          os.makedirs(export_dir, exist_ok=True)
          filepath = os.path.join(export_dir, filename)
          wb.save(filepath)
          return filepath

      # ------------------------------------------------------------------
      # Masking
      # ------------------------------------------------------------------

      @classmethod
      def _is_sensitive_key(cls, key) -> bool:
          """Return True when ``key`` is a string containing a sensitive marker."""
          if not isinstance(key, str):
              return False
          lowered = key.lower()
          return any(marker in lowered for marker in cls._SENSITIVE_KEY_MARKERS)

      @classmethod
      def _mask_value(cls, value: Any) -> Any:
          """Return a copy of ``value`` with sensitive dict entries masked.

          Dicts, lists and tuples are walked recursively; anything else is
          returned unchanged.
          """
          if isinstance(value, dict):
              return {
                  key: '***' if cls._is_sensitive_key(key) else cls._mask_value(inner)
                  for key, inner in value.items()
              }
          if isinstance(value, list):
              return [cls._mask_value(inner) for inner in value]
          if isinstance(value, tuple):
              return tuple(cls._mask_value(inner) for inner in value)
          return value

      def _mask_sensitive_data(self, data: dict) -> dict:
          """Return a copy of ``data`` with sensitive values replaced by '***'.

          A key is sensitive when it contains, case-insensitively, one of
          'password', 'token', 'secret', 'key' or 'api_key'. Nested dicts and
          lists are masked too, and non-string keys are left alone. The input
          is never modified; empty or None input is returned as-is.
          """
          if not data:
              return data
          return self._mask_value(data)