""" 用户服务层 提供用户CRUD操作和管理员初始化功能 需求: 3.1, 3.2, 3.3, 7.1, 7.2, 7.3, 7.4, 9.1, 10.1, 10.2 """ import os import uuid from datetime import date from typing import Optional from fastapi import HTTPException from sqlalchemy.orm import Session from app.models.user import User from app.schemas.user_schema import UserCreate, UserUpdate, UserResponse from app.services.auth_service import AuthService from app.services.password_security_service import PasswordSecurityService class UserService: """用户业务服务类""" def __init__(self, db: Session): self.db = db def create_user(self, data: UserCreate) -> User: """创建新用户""" # 检查用户名是否已存在 if self.db.query(User).filter(User.username == data.username).first(): raise HTTPException(status_code=409, detail="Username already exists") # 检查邮箱是否已存在 if data.email: if self.db.query(User).filter(User.email == data.email).first(): raise HTTPException(status_code=409, detail="Email already exists") # 检查手机号是否已被注册 if data.phone: if self.db.query(User).filter(User.phone == data.phone).first(): raise HTTPException(status_code=409, detail="该手机号已被注册") # 检查密码强度 if PasswordSecurityService.is_password_weak(data.password): raise HTTPException( status_code=400, detail="密码过于简单,请设置更复杂的密码" ) # 验证密码基本要求 is_valid, errors = PasswordSecurityService.validate_password_requirements(data.password) if not is_valid: error_msg = "密码不符合要求:" + ";".join(errors) raise HTTPException(status_code=400, detail=error_msg) # 优先从 local_config db 读取 admin_api_key,没有则回退到环境变量 try: from app.services import local_config_service as _lcs default_apikey = _lcs.get_admin_api_key() or os.getenv("ADMIN_API_KEY") except Exception: default_apikey = os.getenv("ADMIN_API_KEY") user = User( id=str(uuid.uuid4()), username=data.username, password_hash=AuthService.hash_password(data.password), nickname=data.nickname, email=data.email, phone=data.phone if data.phone else None, apikey=default_apikey, registration_date=date.today() ) self.db.add(user) self.db.commit() self.db.refresh(user) return user def get_user_by_id(self, user_id: str) -> Optional[User]: """根据ID获取用户""" return self.db.query(User).filter(User.id == user_id).first() def get_user_by_username(self, username: str) -> Optional[User]: """根据用户名获取用户""" return self.db.query(User).filter(User.username == username).first() def update_user(self, user_id: str, data: UserUpdate) -> User: """更新用户信息""" user = self.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") # 检查邮箱唯一性 if data.email: existing = self.db.query(User).filter( User.email == data.email, User.id != user_id ).first() if existing: raise HTTPException(status_code=409, detail="Email already exists") # 检查手机号唯一性 if data.phone: existing = self.db.query(User).filter( User.phone == data.phone, User.id != user_id ).first() if existing: raise HTTPException(status_code=409, detail="该手机号已被其他账号绑定") # 只更新提供的字段 update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(user, field, value) self.db.commit() self.db.refresh(user) return user def submit_verification(self, user_id: str, encrypted_data: str) -> User: """ 提交实名认证 Args: user_id: 用户ID encrypted_data: RSA加密的数据(格式: real_name|id_card) """ from app.services.encryption_service import encryption_service from app.services.id_card_validator import IDCardValidator, validate_real_name user = self.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") # 检查是否已经认证通过 if user.is_verified == "verified": raise HTTPException(status_code=400, detail="用户已完成实名认证") # RSA解密数据 try: decrypted_data = encryption_service.rsa_decrypt(encrypted_data) parts = decrypted_data.split('|') if len(parts) != 2: raise ValueError("数据格式错误") real_name, id_card = parts except Exception as e: raise HTTPException(status_code=400, detail=f"数据解密失败: {str(e)}") # 验证真实姓名 valid, error = validate_real_name(real_name) if not valid: raise HTTPException(status_code=400, detail=error) # 验证身份证号 valid, error = IDCardValidator.validate(id_card) if not valid: raise HTTPException(status_code=400, detail=error) # 统一转大写 id_card = id_card.upper().strip() # AES加密身份证号用于存储 encrypted_id_card = encryption_service.aes_encrypt(id_card) # 检查身份证号是否已被使用(需要解密所有已认证用户的身份证号进行比对) verified_users = self.db.query(User).filter( User.is_verified == "verified", User.id != user_id, User.id_card.isnot(None) ).all() for verified_user in verified_users: try: existing_id_card = encryption_service.aes_decrypt(verified_user.id_card) if existing_id_card == id_card: raise HTTPException(status_code=409, detail="该身份证号已被其他用户使用") except: # 如果解密失败,跳过(可能是旧数据) continue # 更新用户信息 user.real_name = real_name user.id_card = encrypted_id_card # 存储加密后的身份证号 user.is_verified = "verified" from datetime import datetime user.verified_at = datetime.now() self.db.commit() self.db.refresh(user) return user def delete_user(self, user_id: str) -> None: """删除用户(彻底从数据库移除)。""" user = self.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") # 直接删除用户记录 self.db.delete(user) self.db.commit() def init_admin_user(self) -> None: """初始化管理员用户""" admin = self.db.query(User).filter(User.username == "admin").first() if not admin: try: from app.services import local_config_service as _lcs admin_apikey = _lcs.get_admin_api_key() or os.getenv("ADMIN_API_KEY") except Exception: admin_apikey = os.getenv("ADMIN_API_KEY") admin = User( id=str(uuid.uuid4()), username="admin", password_hash=AuthService.hash_password("admin123"), nickname="管理员", email="wxcz@wxcz.com", apikey=admin_apikey, registration_date=date.today() ) self.db.add(admin) self.db.commit()