| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- """
- 用户服务层
- 提供用户CRUD操作和管理员初始化功能
- 需求: 3.1, 3.2, 3.3, 7.1, 7.2, 7.3, 7.4, 9.1, 10.1, 10.2
- """
- import os
- import uuid
- from datetime import date
- from typing import Optional
- from fastapi import HTTPException
- from sqlalchemy.orm import Session
- from app.models.user import User
- from app.schemas.user_schema import UserCreate, UserUpdate, UserResponse
- from app.services.auth_service import AuthService
- from app.services.password_security_service import PasswordSecurityService
class UserService:
    """User business service.

    Wraps a SQLAlchemy session and exposes user CRUD operations, real-name
    verification and admin-account bootstrap.
    """

    def __init__(self, db: Session):
        """Bind the service to the session used for every query and commit."""
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_default_apikey() -> Optional[str]:
        """Return the API key assigned to newly created accounts.

        The local config store is consulted first; if it yields nothing, or
        cannot be imported/read at all, the ``ADMIN_API_KEY`` environment
        variable is used instead.
        """
        try:
            from app.services import local_config_service as _lcs
            return _lcs.get_admin_api_key() or os.getenv("ADMIN_API_KEY")
        except Exception:
            # Deliberate best-effort: any failure in the config store falls
            # back to the environment variable.
            return os.getenv("ADMIN_API_KEY")

    def _commit(self) -> None:
        """Commit the session; on failure roll back, then re-raise.

        Without the rollback a failed commit (e.g. a unique-constraint
        violation caused by a concurrent insert) leaves the session in a
        state where further use raises until ``rollback()`` is called.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    def create_user(self, data: UserCreate) -> User:
        """Create a new user.

        Args:
            data: Registration payload (username, password, nickname and
                optional email / phone).

        Returns:
            The persisted ``User``, refreshed from the database.

        Raises:
            HTTPException: 409 if the username, email or phone is already
                taken; 400 if the password is judged weak or fails the
                basic password requirements.
        """
        # Username must be unique.
        if self.db.query(User).filter(User.username == data.username).first():
            raise HTTPException(status_code=409, detail="Username already exists")

        # Email must be unique when supplied.
        if data.email and self.db.query(User).filter(User.email == data.email).first():
            raise HTTPException(status_code=409, detail="Email already exists")

        # Phone must be unique when supplied.
        if data.phone and self.db.query(User).filter(User.phone == data.phone).first():
            raise HTTPException(status_code=409, detail="该手机号已被注册")

        # Reject passwords flagged as weak.
        if PasswordSecurityService.is_password_weak(data.password):
            raise HTTPException(
                status_code=400,
                detail="密码过于简单,请设置更复杂的密码"
            )

        # Enforce the basic password requirements.
        is_valid, errors = PasswordSecurityService.validate_password_requirements(data.password)
        if not is_valid:
            error_msg = "密码不符合要求:" + ";".join(errors)
            raise HTTPException(status_code=400, detail=error_msg)

        user = User(
            id=str(uuid.uuid4()),
            username=data.username,
            password_hash=AuthService.hash_password(data.password),
            nickname=data.nickname,
            email=data.email,
            # An empty phone is stored as NULL rather than "".
            phone=data.phone or None,
            apikey=self._resolve_default_apikey(),
            registration_date=date.today()
        )
        self.db.add(user)
        self._commit()
        self.db.refresh(user)
        return user

    def get_user_by_id(self, user_id: str) -> Optional[User]:
        """Return the user with the given ID, or ``None`` if there is none."""
        return self.db.query(User).filter(User.id == user_id).first()

    def get_user_by_username(self, username: str) -> Optional[User]:
        """Return the user with the given username, or ``None`` if there is none."""
        return self.db.query(User).filter(User.username == username).first()

    def update_user(self, user_id: str, data: UserUpdate) -> User:
        """Update the fields of a user that were explicitly provided.

        Args:
            user_id: ID of the user to update.
            data: Partial update payload; only fields that were set are
                applied (``model_dump(exclude_unset=True)``).

        Returns:
            The updated ``User``, refreshed from the database.

        Raises:
            HTTPException: 404 if the user does not exist; 409 if the new
                email or phone already belongs to another user.
        """
        user = self.get_user_by_id(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        # Email must not belong to a different user.
        if data.email:
            existing = self.db.query(User).filter(
                User.email == data.email, User.id != user_id
            ).first()
            if existing:
                raise HTTPException(status_code=409, detail="Email already exists")

        # Phone must not belong to a different user.
        if data.phone:
            existing = self.db.query(User).filter(
                User.phone == data.phone, User.id != user_id
            ).first()
            if existing:
                raise HTTPException(status_code=409, detail="该手机号已被其他账号绑定")

        # Apply only the fields the caller actually supplied.
        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            if field == "phone" and not value:
                # Same normalization as create_user: an empty phone is
                # stored as NULL, never as "".
                value = None
            setattr(user, field, value)

        self._commit()
        self.db.refresh(user)
        return user

    def submit_verification(self, user_id: str, encrypted_data: str) -> User:
        """Submit real-name verification for a user.

        Args:
            user_id: ID of the user being verified.
            encrypted_data: RSA-encrypted payload whose plaintext has the
                form ``real_name|id_card``.

        Returns:
            The updated ``User`` with ``is_verified == "verified"``.

        Raises:
            HTTPException: 404 if the user does not exist; 400 if the user
                is already verified, the payload cannot be decrypted or
                parsed, or the name / ID card number fails validation;
                409 if the ID card number is already used by another
                verified user.
        """
        from datetime import datetime

        from app.services.encryption_service import encryption_service
        from app.services.id_card_validator import IDCardValidator, validate_real_name

        user = self.get_user_by_id(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        # A user can only be verified once.
        if user.is_verified == "verified":
            raise HTTPException(status_code=400, detail="用户已完成实名认证")

        # RSA-decrypt the payload and split it into its two fields. Any
        # failure here (including a malformed payload) is reported as 400.
        try:
            decrypted_data = encryption_service.rsa_decrypt(encrypted_data)
            parts = decrypted_data.split('|')
            if len(parts) != 2:
                raise ValueError("数据格式错误")
            real_name, id_card = parts
        except Exception as exc:
            raise HTTPException(status_code=400, detail=f"数据解密失败: {str(exc)}") from exc

        # Validate the real name.
        valid, error = validate_real_name(real_name)
        if not valid:
            raise HTTPException(status_code=400, detail=error)

        # Validate the ID card number.
        valid, error = IDCardValidator.validate(id_card)
        if not valid:
            raise HTTPException(status_code=400, detail=error)

        # Normalize to upper case before comparison and storage.
        id_card = id_card.upper().strip()

        # AES-encrypt the ID card number for storage.
        encrypted_id_card = encryption_service.aes_encrypt(id_card)

        # Uniqueness check: every other verified user's stored ID card is
        # decrypted and compared against the submitted one.
        verified_users = self.db.query(User).filter(
            User.is_verified == "verified",
            User.id != user_id,
            User.id_card.isnot(None)
        ).all()

        for verified_user in verified_users:
            try:
                existing_id_card = encryption_service.aes_decrypt(verified_user.id_card)
            except Exception:
                # Rows that fail to decrypt are skipped (possibly legacy data).
                continue
            # The comparison is intentionally outside the try block so the
            # 409 below cannot be swallowed by the decryption error handler.
            if existing_id_card == id_card:
                raise HTTPException(status_code=409, detail="该身份证号已被其他用户使用")

        # Persist the verification result.
        user.real_name = real_name
        user.id_card = encrypted_id_card  # stored AES-encrypted
        user.is_verified = "verified"
        user.verified_at = datetime.now()

        self._commit()
        self.db.refresh(user)
        return user

    def delete_user(self, user_id: str) -> None:
        """Delete a user record from the database (hard delete).

        Args:
            user_id: ID of the user to delete.

        Raises:
            HTTPException: 404 if the user does not exist.
        """
        user = self.get_user_by_id(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        self.db.delete(user)
        self._commit()

    def init_admin_user(self) -> None:
        """Create the ``admin`` account if no user with that username exists.

        Does nothing when the account is already present.
        """
        if self.db.query(User).filter(User.username == "admin").first():
            return

        # SECURITY NOTE(review): the bootstrap password is a hard-coded,
        # well-known default. It should be changed right after first login,
        # or sourced from configuration instead.
        admin = User(
            id=str(uuid.uuid4()),
            username="admin",
            password_hash=AuthService.hash_password("admin123"),
            nickname="管理员",
            email="wxcz@wxcz.com",
            apikey=self._resolve_default_apikey(),
            registration_date=date.today()
        )
        self.db.add(admin)
        self._commit()
|