| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- """
- 管理员用户管理服务
- 提供用户列表查询、详情查询、状态变更等功能
- Requirements: 2.1-2.5, 3.1-3.5, 4.1-4.5, 5.1-5.5, 6.1-6.4
- """
- import secrets
- import string
- from datetime import datetime, date
- from typing import Optional, List, Tuple
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_, desc, asc, func
- from app.models.user import User
- from app.models.api_call_log import ApiCallLog
- from app.services.auth_service import AuthService
- from app.schemas.admin_schema import (
- UserListParams, UserListItem, UserDetailResponse,
- UserUsageStats
- )
- class AdminUserService:
- """管理员用户管理服务类"""
- def __init__(self, db: Session):
- self.db = db
- def list_users(self, params: UserListParams) -> Tuple[List[UserListItem], int]:
- """获取用户列表,支持搜索、筛选、排序、分页"""
- query = self.db.query(User)
- # 搜索条件
- if params.keyword:
- keyword = f"%{params.keyword}%"
- query = query.filter(or_(
- User.id.ilike(keyword),
- User.username.ilike(keyword),
- User.nickname.ilike(keyword),
- User.real_name.ilike(keyword),
- User.phone.ilike(keyword),
- User.email.ilike(keyword)
- ))
- # 状态筛选
- if params.status:
- query = query.filter(User.status == params.status)
- # 注册时间筛选
- if params.register_start:
- query = query.filter(User.registration_date >= params.register_start)
- if params.register_end:
- query = query.filter(User.registration_date <= params.register_end)
- # 总数
- total = query.count()
- # 排序
- sort_column = User.created_at
- if params.sort_by == "created_at":
- sort_column = User.created_at
- if params.sort_order == "asc":
- query = query.order_by(asc(sort_column))
- else:
- query = query.order_by(desc(sort_column))
- # 分页
- users = query.offset((params.page - 1) * params.size).limit(params.size).all()
- # 转换为响应格式
- result = []
- for user in users:
- result.append(UserListItem(
- id=user.id,
- username=user.username,
- nickname=user.nickname,
- phone=user.phone,
- registration_date=user.registration_date,
- status=user.status,
- is_verified=user.is_verified,
- real_name=user.real_name
- ))
- return result, total
- def get_user_detail(self, user_id: str) -> Optional[UserDetailResponse]:
- """获取用户详情"""
- user = self.db.query(User).filter(User.id == user_id).first()
- if not user:
- return None
- return UserDetailResponse(
- id=user.id,
- username=user.username,
- nickname=user.nickname,
- phone=user.phone,
- email=user.email,
- avatar=user.avatar,
- registration_date=user.registration_date,
- status=user.status
- )
- def get_user_usage_stats(self, user_id: str) -> UserUsageStats:
- """获取用户使用统计"""
- # API调用统计
- api_stats = self.db.query(
- func.count(ApiCallLog.id),
- func.coalesce(func.sum(ApiCallLog.input_tokens), 0),
- func.coalesce(func.sum(ApiCallLog.output_tokens), 0)
- ).filter(ApiCallLog.user_id == user_id).first()
- api_call_count = api_stats[0] if api_stats else 0
- api_input_tokens = int(api_stats[1]) if api_stats else 0
- api_output_tokens = int(api_stats[2]) if api_stats else 0
- return UserUsageStats(
- conversation_count=0,
- token_consumed=0,
- image_count=0,
- audio_count=0,
- audio_duration=0,
- video_count=0,
- api_call_count=api_call_count,
- api_input_tokens=api_input_tokens,
- api_output_tokens=api_output_tokens
- )
- def update_user_status(self, user_id: str, status: str) -> bool:
- """更新用户状态"""
- user = self.db.query(User).filter(User.id == user_id).first()
- if not user:
- raise ValueError("USER_NOT_FOUND")
- if user.status == status:
- raise ValueError("STATUS_UNCHANGED")
- user.status = status
- self.db.commit()
- return True
- def reset_password(self, user_id: str) -> str:
- """重置用户密码,返回新密码"""
- user = self.db.query(User).filter(User.id == user_id).first()
- if not user:
- raise ValueError("USER_NOT_FOUND")
- # 生成8位随机密码
- alphabet = string.ascii_letters + string.digits
- new_password = ''.join(secrets.choice(alphabet) for _ in range(8))
- # 更新密码哈希
- user.password_hash = AuthService.hash_password(new_password)
- self.db.commit()
- return new_password
|