| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- from datetime import datetime, timezone
- from typing import List
- from fastapi import APIRouter, Depends
- from fastapi.responses import StreamingResponse
- from pydantic import BaseModel
- from sqlmodel import select
- from gpustack.api.exceptions import (
- AlreadyExistsException,
- ForbiddenException,
- InternalServerErrorException,
- NotFoundException,
- ConflictException,
- )
- from gpustack.security import get_secret_hash
- from gpustack.schemas.organizations import OrganizationPublic
- from gpustack.schemas.principals import (
- OrgRole,
- PLATFORM_PRINCIPAL_ID,
- Principal,
- PrincipalMembership,
- PrincipalType,
- )
- from gpustack.server.db import async_session
- from gpustack.server.deps import CurrentUserDep, SessionDep, TenantContextDep
- from gpustack.schemas.users import (
- User,
- UserActivationUpdate,
- UserCreate,
- UserListParams,
- UserUpdate,
- UserPublic,
- UsersPublic,
- UserSelfUpdate,
- )
- from gpustack.server.services import UserService, create_user_with_principal
- router = APIRouter()
- class UserMembership(BaseModel):
- organization: OrganizationPublic
- role: OrgRole
- model_config = {"from_attributes": True}
- @router.get("", response_model=UsersPublic)
- async def get_users(
- params: UserListParams = Depends(),
- search: str = None,
- ):
- fuzzy_fields = {}
- if search:
- fuzzy_fields = {"username": search, "full_name": search}
- if params.watch:
- return StreamingResponse(
- User.streaming(fuzzy_fields=fuzzy_fields),
- media_type="text/event-stream",
- )
- async with async_session() as session:
- return await User.paginated_by_query(
- session=session,
- fuzzy_fields=fuzzy_fields,
- page=params.page,
- per_page=params.perPage,
- fields={
- "deleted_at": None,
- "is_system": False,
- },
- order_by=params.order_by,
- )
- @router.get("/{id}", response_model=UserPublic)
- async def get_user(session: SessionDep, id: int):
- user = await User.one_by_id(session, id)
- if not user:
- raise NotFoundException(message="User not found")
- return user
- @router.get("/{id}/memberships", response_model=List[UserMembership])
- async def list_user_memberships(session: SessionDep, id: int):
- """Admin-only: list the Orgs a user belongs to.
- Only ORG-kind principals are returned — the user's own
- USER-principal is intrinsic to them, not a "membership" anyone
- grants or revokes.
- """
- user = await User.one_by_id(session, id)
- if not user:
- raise NotFoundException(message="User not found")
- stmt = (
- select(PrincipalMembership, Principal)
- .join(
- Principal,
- Principal.id == PrincipalMembership.parent_principal_id,
- )
- .where(
- PrincipalMembership.member_principal_id == user.principal_id,
- PrincipalMembership.deleted_at.is_(None),
- Principal.kind == PrincipalType.ORG,
- Principal.deleted_at.is_(None),
- )
- )
- rows = (await session.exec(stmt)).all()
- return [
- UserMembership(
- organization=OrganizationPublic.from_principal(org),
- role=membership.role or OrgRole.USER,
- )
- for membership, org in rows
- ]
- @router.post("", response_model=UserPublic)
- async def create_user(session: SessionDep, user_in: UserCreate):
- existing = await User.one_by_field(session, "username", user_in.username)
- if existing:
- raise AlreadyExistsException(message=f"User {user_in.username} already exists")
- try:
- to_create = User(
- username=user_in.username,
- full_name=user_in.full_name,
- is_admin=user_in.is_admin,
- is_active=user_in.is_active,
- )
- if user_in.password:
- to_create.hashed_password = get_secret_hash(user_in.password)
- # User row + USER-principal go in one transactional helper —
- # ``users.principal_id`` is NOT NULL so the principal must
- # exist first. Admin additionally joins the platform Org as
- # ADMIN; regular users do NOT auto-join — admin can add them
- # later if shared workspace access is needed.
- user = await create_user_with_principal(session, to_create)
- if user.is_admin:
- now = datetime.now(timezone.utc).replace(tzinfo=None)
- session.add(
- PrincipalMembership(
- parent_principal_id=PLATFORM_PRINCIPAL_ID,
- member_principal_id=user.principal_id,
- role=OrgRole.ADMIN,
- created_at=now,
- updated_at=now,
- )
- )
- await session.commit()
- await session.refresh(user)
- except Exception as e:
- raise InternalServerErrorException(message=f"Failed to create user: {e}")
- return user
- @router.put("/{id}", response_model=UserPublic)
- async def update_user(session: SessionDep, id: int, user_in: UserUpdate):
- user = await User.one_by_id(session, id)
- if not user:
- raise NotFoundException(message="User not found")
- if (
- user.is_active
- and user_in.is_active is False
- and await is_only_admin_user(session, user)
- ):
- raise ConflictException(message="Cannot deactivate the only admin user")
- try:
- update_data = user_in.model_dump()
- if user_in.password:
- hashed_password = get_secret_hash(user_in.password)
- update_data["hashed_password"] = hashed_password
- del update_data["password"]
- del update_data["source"]
- await UserService(session).update(user, update_data)
- except Exception as e:
- raise InternalServerErrorException(message=f"Failed to update user: {e}")
- return user
- @router.patch("/{id}/activation", response_model=UserPublic)
- async def update_user_activation(
- session: SessionDep, id: int, activation_data: UserActivationUpdate
- ):
- """
- Activate or deactivate a user account.
- Only administrators can perform this action.
- """
- user = await User.one_by_id(session, id)
- if not user:
- raise NotFoundException(message="User not found")
- changed = user.is_active != activation_data.is_active
- if not changed:
- return user
- if (
- user.is_active
- and activation_data.is_active is False
- and await is_only_admin_user(session, user)
- ):
- raise ConflictException(message="Cannot deactivate the only admin user")
- try:
- await UserService(session).update(
- user, {"is_active": activation_data.is_active}
- )
- except Exception as e:
- raise InternalServerErrorException(
- message=f"Failed to update user activation: {e}"
- )
- return user
- @router.delete("/{id}")
- async def delete_user(session: SessionDep, id: int):
- user_service = UserService(session)
- user = await user_service.get_by_id(id)
- if not user:
- raise NotFoundException(message="User not found")
- if await is_only_admin_user(session, user):
- raise ConflictException(message="Cannot delete the only admin user")
- # The user's USER-principal is the canonical owner of any
- # personal-scope resources (models, routes, clusters, api keys).
- # FK cascades on ``owner_principal_id == user.principal_id`` will
- # take those rows with the principal when it's deleted; the
- # principal in turn is RESTRICT-FK'd to ``users.principal_id``, so
- # the user must be deleted first and the principal cleaned up
- # afterward.
- principal = await Principal.one_by_id(session, user.principal_id)
- try:
- await user_service.delete(user)
- if principal is not None:
- await principal.delete(session)
- except Exception as e:
- raise InternalServerErrorException(message=f"Failed to delete user: {e}")
- async def is_only_admin_user(session: SessionDep, user: User) -> bool:
- if not user.is_admin:
- return False
- admin_count = await User.count_by_fields(
- session, {"is_admin": True, "is_active": True}
- )
- return admin_count == 1
- me_router = APIRouter()
- @me_router.get("/me", response_model=UserPublic)
- async def get_user_me(user: CurrentUserDep):
- return user
- @me_router.put("/me", response_model=UserPublic)
- async def update_user_me(
- session: SessionDep, user: CurrentUserDep, user_in: UserSelfUpdate
- ):
- try:
- update_data = user_in.model_dump(exclude_none=True)
- if "password" in update_data:
- hashed_password = get_secret_hash(update_data["password"])
- update_data["hashed_password"] = hashed_password
- del update_data["password"]
- await UserService(session).update(user, update_data)
- except Exception as e:
- raise InternalServerErrorException(message=f"Failed to update user: {e}")
- return user
- # User-search endpoint accessible to org admins (any) and platform
- # admins, so the Add Member picker works without the admin-gated full
- # /users endpoint. Returns the standard UsersPublic page.
- directory_router = APIRouter()
- @directory_router.get("/user-directory", response_model=UsersPublic)
- async def list_user_directory(
- ctx: TenantContextDep,
- page: int = 1,
- perPage: int = 30,
- search: str = None,
- ):
- if not ctx.is_platform_admin and ctx.org_role != OrgRole.ADMIN:
- raise ForbiddenException(message="Insufficient permission")
- fuzzy_fields = {}
- if search:
- fuzzy_fields = {"username": search, "full_name": search}
- async with async_session() as session:
- return await User.paginated_by_query(
- session=session,
- fuzzy_fields=fuzzy_fields,
- page=page,
- per_page=perPage,
- fields={
- "deleted_at": None,
- "is_system": False,
- },
- )
|