# admin_user_service.py (5.0 KB)
"""
Admin user management service.

Provides user list queries, user detail lookup, status changes and related
operations.
Requirements: 2.1-2.5, 3.1-3.5, 4.1-4.5, 5.1-5.5, 6.1-6.4
"""
  6. import secrets
  7. import string
  8. from datetime import datetime, date
  9. from typing import Optional, List, Tuple
  10. from sqlalchemy.orm import Session
  11. from sqlalchemy import and_, or_, desc, asc, func
  12. from app.models.user import User
  13. from app.models.api_call_log import ApiCallLog
  14. from app.services.auth_service import AuthService
  15. from app.schemas.admin_schema import (
  16. UserListParams, UserListItem, UserDetailResponse,
  17. UserUsageStats
  18. )
  19. class AdminUserService:
  20. """管理员用户管理服务类"""
  21. def __init__(self, db: Session):
  22. self.db = db
  23. def list_users(self, params: UserListParams) -> Tuple[List[UserListItem], int]:
  24. """获取用户列表,支持搜索、筛选、排序、分页"""
  25. query = self.db.query(User)
  26. # 搜索条件
  27. if params.keyword:
  28. keyword = f"%{params.keyword}%"
  29. query = query.filter(or_(
  30. User.id.ilike(keyword),
  31. User.username.ilike(keyword),
  32. User.nickname.ilike(keyword),
  33. User.real_name.ilike(keyword),
  34. User.phone.ilike(keyword),
  35. User.email.ilike(keyword)
  36. ))
  37. # 状态筛选
  38. if params.status:
  39. query = query.filter(User.status == params.status)
  40. # 注册时间筛选
  41. if params.register_start:
  42. query = query.filter(User.registration_date >= params.register_start)
  43. if params.register_end:
  44. query = query.filter(User.registration_date <= params.register_end)
  45. # 总数
  46. total = query.count()
  47. # 排序
  48. sort_column = User.created_at
  49. if params.sort_by == "created_at":
  50. sort_column = User.created_at
  51. if params.sort_order == "asc":
  52. query = query.order_by(asc(sort_column))
  53. else:
  54. query = query.order_by(desc(sort_column))
  55. # 分页
  56. users = query.offset((params.page - 1) * params.size).limit(params.size).all()
  57. # 转换为响应格式
  58. result = []
  59. for user in users:
  60. result.append(UserListItem(
  61. id=user.id,
  62. username=user.username,
  63. nickname=user.nickname,
  64. phone=user.phone,
  65. registration_date=user.registration_date,
  66. status=user.status,
  67. is_verified=user.is_verified,
  68. real_name=user.real_name
  69. ))
  70. return result, total
  71. def get_user_detail(self, user_id: str) -> Optional[UserDetailResponse]:
  72. """获取用户详情"""
  73. user = self.db.query(User).filter(User.id == user_id).first()
  74. if not user:
  75. return None
  76. return UserDetailResponse(
  77. id=user.id,
  78. username=user.username,
  79. nickname=user.nickname,
  80. phone=user.phone,
  81. email=user.email,
  82. avatar=user.avatar,
  83. registration_date=user.registration_date,
  84. status=user.status
  85. )
  86. def get_user_usage_stats(self, user_id: str) -> UserUsageStats:
  87. """获取用户使用统计"""
  88. # API调用统计
  89. api_stats = self.db.query(
  90. func.count(ApiCallLog.id),
  91. func.coalesce(func.sum(ApiCallLog.input_tokens), 0),
  92. func.coalesce(func.sum(ApiCallLog.output_tokens), 0)
  93. ).filter(ApiCallLog.user_id == user_id).first()
  94. api_call_count = api_stats[0] if api_stats else 0
  95. api_input_tokens = int(api_stats[1]) if api_stats else 0
  96. api_output_tokens = int(api_stats[2]) if api_stats else 0
  97. return UserUsageStats(
  98. conversation_count=0,
  99. token_consumed=0,
  100. image_count=0,
  101. audio_count=0,
  102. audio_duration=0,
  103. video_count=0,
  104. api_call_count=api_call_count,
  105. api_input_tokens=api_input_tokens,
  106. api_output_tokens=api_output_tokens
  107. )
  108. def update_user_status(self, user_id: str, status: str) -> bool:
  109. """更新用户状态"""
  110. user = self.db.query(User).filter(User.id == user_id).first()
  111. if not user:
  112. raise ValueError("USER_NOT_FOUND")
  113. if user.status == status:
  114. raise ValueError("STATUS_UNCHANGED")
  115. user.status = status
  116. self.db.commit()
  117. return True
  118. def reset_password(self, user_id: str) -> str:
  119. """重置用户密码,返回新密码"""
  120. user = self.db.query(User).filter(User.id == user_id).first()
  121. if not user:
  122. raise ValueError("USER_NOT_FOUND")
  123. # 生成8位随机密码
  124. alphabet = string.ascii_letters + string.digits
  125. new_password = ''.join(secrets.choice(alphabet) for _ in range(8))
  126. # 更新密码哈希
  127. user.password_hash = AuthService.hash_password(new_password)
  128. self.db.commit()
  129. return new_password