""" 管理员用户管理服务 提供用户列表查询、详情查询、状态变更等功能 Requirements: 2.1-2.5, 3.1-3.5, 4.1-4.5, 5.1-5.5, 6.1-6.4 """ import secrets import string from datetime import datetime, date from typing import Optional, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import and_, or_, desc, asc, func from app.models.user import User from app.models.api_call_log import ApiCallLog from app.services.auth_service import AuthService from app.schemas.admin_schema import ( UserListParams, UserListItem, UserDetailResponse, UserUsageStats ) class AdminUserService: """管理员用户管理服务类""" def __init__(self, db: Session): self.db = db def list_users(self, params: UserListParams) -> Tuple[List[UserListItem], int]: """获取用户列表,支持搜索、筛选、排序、分页""" query = self.db.query(User) # 搜索条件 if params.keyword: keyword = f"%{params.keyword}%" query = query.filter(or_( User.id.ilike(keyword), User.username.ilike(keyword), User.nickname.ilike(keyword), User.real_name.ilike(keyword), User.phone.ilike(keyword), User.email.ilike(keyword) )) # 状态筛选 if params.status: query = query.filter(User.status == params.status) # 注册时间筛选 if params.register_start: query = query.filter(User.registration_date >= params.register_start) if params.register_end: query = query.filter(User.registration_date <= params.register_end) # 总数 total = query.count() # 排序 sort_column = User.created_at if params.sort_by == "created_at": sort_column = User.created_at if params.sort_order == "asc": query = query.order_by(asc(sort_column)) else: query = query.order_by(desc(sort_column)) # 分页 users = query.offset((params.page - 1) * params.size).limit(params.size).all() # 转换为响应格式 result = [] for user in users: result.append(UserListItem( id=user.id, username=user.username, nickname=user.nickname, phone=user.phone, registration_date=user.registration_date, status=user.status, is_verified=user.is_verified, real_name=user.real_name )) return result, total def get_user_detail(self, user_id: str) -> Optional[UserDetailResponse]: """获取用户详情""" user = self.db.query(User).filter(User.id == user_id).first() if not user: return None return UserDetailResponse( id=user.id, username=user.username, nickname=user.nickname, phone=user.phone, email=user.email, avatar=user.avatar, registration_date=user.registration_date, status=user.status ) def get_user_usage_stats(self, user_id: str) -> UserUsageStats: """获取用户使用统计""" # API调用统计 api_stats = self.db.query( func.count(ApiCallLog.id), func.coalesce(func.sum(ApiCallLog.input_tokens), 0), func.coalesce(func.sum(ApiCallLog.output_tokens), 0) ).filter(ApiCallLog.user_id == user_id).first() api_call_count = api_stats[0] if api_stats else 0 api_input_tokens = int(api_stats[1]) if api_stats else 0 api_output_tokens = int(api_stats[2]) if api_stats else 0 return UserUsageStats( conversation_count=0, token_consumed=0, image_count=0, audio_count=0, audio_duration=0, video_count=0, api_call_count=api_call_count, api_input_tokens=api_input_tokens, api_output_tokens=api_output_tokens ) def update_user_status(self, user_id: str, status: str) -> bool: """更新用户状态""" user = self.db.query(User).filter(User.id == user_id).first() if not user: raise ValueError("USER_NOT_FOUND") if user.status == status: raise ValueError("STATUS_UNCHANGED") user.status = status self.db.commit() return True def reset_password(self, user_id: str) -> str: """重置用户密码,返回新密码""" user = self.db.query(User).filter(User.id == user_id).first() if not user: raise ValueError("USER_NOT_FOUND") # 生成8位随机密码 alphabet = string.ascii_letters + string.digits new_password = ''.join(secrets.choice(alphabet) for _ in range(8)) # 更新密码哈希 user.password_hash = AuthService.hash_password(new_password) self.db.commit() return new_password