users.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. from datetime import datetime, timezone
  2. from typing import List
  3. from fastapi import APIRouter, Depends
  4. from fastapi.responses import StreamingResponse
  5. from pydantic import BaseModel
  6. from sqlmodel import select
  7. from gpustack.api.exceptions import (
  8. AlreadyExistsException,
  9. ForbiddenException,
  10. InternalServerErrorException,
  11. NotFoundException,
  12. ConflictException,
  13. )
  14. from gpustack.security import get_secret_hash
  15. from gpustack.schemas.organizations import OrganizationPublic
  16. from gpustack.schemas.principals import (
  17. OrgRole,
  18. PLATFORM_PRINCIPAL_ID,
  19. Principal,
  20. PrincipalMembership,
  21. PrincipalType,
  22. )
  23. from gpustack.server.db import async_session
  24. from gpustack.server.deps import CurrentUserDep, SessionDep, TenantContextDep
  25. from gpustack.schemas.users import (
  26. User,
  27. UserActivationUpdate,
  28. UserCreate,
  29. UserListParams,
  30. UserUpdate,
  31. UserPublic,
  32. UsersPublic,
  33. UserSelfUpdate,
  34. )
  35. from gpustack.server.services import UserService, create_user_with_principal
  36. router = APIRouter()
  37. class UserMembership(BaseModel):
  38. organization: OrganizationPublic
  39. role: OrgRole
  40. model_config = {"from_attributes": True}
  41. @router.get("", response_model=UsersPublic)
  42. async def get_users(
  43. params: UserListParams = Depends(),
  44. search: str = None,
  45. ):
  46. fuzzy_fields = {}
  47. if search:
  48. fuzzy_fields = {"username": search, "full_name": search}
  49. if params.watch:
  50. return StreamingResponse(
  51. User.streaming(fuzzy_fields=fuzzy_fields),
  52. media_type="text/event-stream",
  53. )
  54. async with async_session() as session:
  55. return await User.paginated_by_query(
  56. session=session,
  57. fuzzy_fields=fuzzy_fields,
  58. page=params.page,
  59. per_page=params.perPage,
  60. fields={
  61. "deleted_at": None,
  62. "is_system": False,
  63. },
  64. order_by=params.order_by,
  65. )
  66. @router.get("/{id}", response_model=UserPublic)
  67. async def get_user(session: SessionDep, id: int):
  68. user = await User.one_by_id(session, id)
  69. if not user:
  70. raise NotFoundException(message="User not found")
  71. return user
  72. @router.get("/{id}/memberships", response_model=List[UserMembership])
  73. async def list_user_memberships(session: SessionDep, id: int):
  74. """Admin-only: list the Orgs a user belongs to.
  75. Only ORG-kind principals are returned — the user's own
  76. USER-principal is intrinsic to them, not a "membership" anyone
  77. grants or revokes.
  78. """
  79. user = await User.one_by_id(session, id)
  80. if not user:
  81. raise NotFoundException(message="User not found")
  82. stmt = (
  83. select(PrincipalMembership, Principal)
  84. .join(
  85. Principal,
  86. Principal.id == PrincipalMembership.parent_principal_id,
  87. )
  88. .where(
  89. PrincipalMembership.member_principal_id == user.principal_id,
  90. PrincipalMembership.deleted_at.is_(None),
  91. Principal.kind == PrincipalType.ORG,
  92. Principal.deleted_at.is_(None),
  93. )
  94. )
  95. rows = (await session.exec(stmt)).all()
  96. return [
  97. UserMembership(
  98. organization=OrganizationPublic.from_principal(org),
  99. role=membership.role or OrgRole.USER,
  100. )
  101. for membership, org in rows
  102. ]
  103. @router.post("", response_model=UserPublic)
  104. async def create_user(session: SessionDep, user_in: UserCreate):
  105. existing = await User.one_by_field(session, "username", user_in.username)
  106. if existing:
  107. raise AlreadyExistsException(message=f"User {user_in.username} already exists")
  108. try:
  109. to_create = User(
  110. username=user_in.username,
  111. full_name=user_in.full_name,
  112. is_admin=user_in.is_admin,
  113. is_active=user_in.is_active,
  114. )
  115. if user_in.password:
  116. to_create.hashed_password = get_secret_hash(user_in.password)
  117. # User row + USER-principal go in one transactional helper —
  118. # ``users.principal_id`` is NOT NULL so the principal must
  119. # exist first. Admin additionally joins the platform Org as
  120. # ADMIN; regular users do NOT auto-join — admin can add them
  121. # later if shared workspace access is needed.
  122. user = await create_user_with_principal(session, to_create)
  123. if user.is_admin:
  124. now = datetime.now(timezone.utc).replace(tzinfo=None)
  125. session.add(
  126. PrincipalMembership(
  127. parent_principal_id=PLATFORM_PRINCIPAL_ID,
  128. member_principal_id=user.principal_id,
  129. role=OrgRole.ADMIN,
  130. created_at=now,
  131. updated_at=now,
  132. )
  133. )
  134. await session.commit()
  135. await session.refresh(user)
  136. except Exception as e:
  137. raise InternalServerErrorException(message=f"Failed to create user: {e}")
  138. return user
  139. @router.put("/{id}", response_model=UserPublic)
  140. async def update_user(session: SessionDep, id: int, user_in: UserUpdate):
  141. user = await User.one_by_id(session, id)
  142. if not user:
  143. raise NotFoundException(message="User not found")
  144. if (
  145. user.is_active
  146. and user_in.is_active is False
  147. and await is_only_admin_user(session, user)
  148. ):
  149. raise ConflictException(message="Cannot deactivate the only admin user")
  150. try:
  151. update_data = user_in.model_dump()
  152. if user_in.password:
  153. hashed_password = get_secret_hash(user_in.password)
  154. update_data["hashed_password"] = hashed_password
  155. del update_data["password"]
  156. del update_data["source"]
  157. await UserService(session).update(user, update_data)
  158. except Exception as e:
  159. raise InternalServerErrorException(message=f"Failed to update user: {e}")
  160. return user
  161. @router.patch("/{id}/activation", response_model=UserPublic)
  162. async def update_user_activation(
  163. session: SessionDep, id: int, activation_data: UserActivationUpdate
  164. ):
  165. """
  166. Activate or deactivate a user account.
  167. Only administrators can perform this action.
  168. """
  169. user = await User.one_by_id(session, id)
  170. if not user:
  171. raise NotFoundException(message="User not found")
  172. changed = user.is_active != activation_data.is_active
  173. if not changed:
  174. return user
  175. if (
  176. user.is_active
  177. and activation_data.is_active is False
  178. and await is_only_admin_user(session, user)
  179. ):
  180. raise ConflictException(message="Cannot deactivate the only admin user")
  181. try:
  182. await UserService(session).update(
  183. user, {"is_active": activation_data.is_active}
  184. )
  185. except Exception as e:
  186. raise InternalServerErrorException(
  187. message=f"Failed to update user activation: {e}"
  188. )
  189. return user
  190. @router.delete("/{id}")
  191. async def delete_user(session: SessionDep, id: int):
  192. user_service = UserService(session)
  193. user = await user_service.get_by_id(id)
  194. if not user:
  195. raise NotFoundException(message="User not found")
  196. if await is_only_admin_user(session, user):
  197. raise ConflictException(message="Cannot delete the only admin user")
  198. # The user's USER-principal is the canonical owner of any
  199. # personal-scope resources (models, routes, clusters, api keys).
  200. # FK cascades on ``owner_principal_id == user.principal_id`` will
  201. # take those rows with the principal when it's deleted; the
  202. # principal in turn is RESTRICT-FK'd to ``users.principal_id``, so
  203. # the user must be deleted first and the principal cleaned up
  204. # afterward.
  205. principal = await Principal.one_by_id(session, user.principal_id)
  206. try:
  207. await user_service.delete(user)
  208. if principal is not None:
  209. await principal.delete(session)
  210. except Exception as e:
  211. raise InternalServerErrorException(message=f"Failed to delete user: {e}")
  212. async def is_only_admin_user(session: SessionDep, user: User) -> bool:
  213. if not user.is_admin:
  214. return False
  215. admin_count = await User.count_by_fields(
  216. session, {"is_admin": True, "is_active": True}
  217. )
  218. return admin_count == 1
  219. me_router = APIRouter()
  220. @me_router.get("/me", response_model=UserPublic)
  221. async def get_user_me(user: CurrentUserDep):
  222. return user
  223. @me_router.put("/me", response_model=UserPublic)
  224. async def update_user_me(
  225. session: SessionDep, user: CurrentUserDep, user_in: UserSelfUpdate
  226. ):
  227. try:
  228. update_data = user_in.model_dump(exclude_none=True)
  229. if "password" in update_data:
  230. hashed_password = get_secret_hash(update_data["password"])
  231. update_data["hashed_password"] = hashed_password
  232. del update_data["password"]
  233. await UserService(session).update(user, update_data)
  234. except Exception as e:
  235. raise InternalServerErrorException(message=f"Failed to update user: {e}")
  236. return user
  237. # User-search endpoint accessible to org admins (any) and platform
  238. # admins, so the Add Member picker works without the admin-gated full
  239. # /users endpoint. Returns the standard UsersPublic page.
  240. directory_router = APIRouter()
  241. @directory_router.get("/user-directory", response_model=UsersPublic)
  242. async def list_user_directory(
  243. ctx: TenantContextDep,
  244. page: int = 1,
  245. perPage: int = 30,
  246. search: str = None,
  247. ):
  248. if not ctx.is_platform_admin and ctx.org_role != OrgRole.ADMIN:
  249. raise ForbiddenException(message="Insufficient permission")
  250. fuzzy_fields = {}
  251. if search:
  252. fuzzy_fields = {"username": search, "full_name": search}
  253. async with async_session() as session:
  254. return await User.paginated_by_query(
  255. session=session,
  256. fuzzy_fields=fuzzy_fields,
  257. page=page,
  258. per_page=perPage,
  259. fields={
  260. "deleted_at": None,
  261. "is_system": False,
  262. },
  263. )