user.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. """
  2. User API router.
  3. Provides endpoints for user management and statistics.
  4. """
  5. from typing import List, Optional
  6. from fastapi import APIRouter, HTTPException, status, Query, Request
  7. from database import get_db_connection
  8. from schemas.user import (
  9. UserResponse,
  10. UserWithStatsResponse,
  11. UserListResponse,
  12. UserStatsResponse,
  13. AssignableUserResponse,
  14. TaskStats
  15. )
  16. router = APIRouter(
  17. prefix="/api/users",
  18. tags=["users"]
  19. )
  20. def require_admin(request: Request) -> dict:
  21. """
  22. 验证当前用户是否为管理员。
  23. Args:
  24. request: FastAPI Request 对象
  25. Returns:
  26. 当前用户信息
  27. Raises:
  28. HTTPException: 401 未认证或 403 权限不足
  29. """
  30. user = getattr(request.state, "user", None)
  31. if not user:
  32. raise HTTPException(
  33. status_code=status.HTTP_401_UNAUTHORIZED,
  34. detail="未认证"
  35. )
  36. if user["role"] != "admin":
  37. raise HTTPException(
  38. status_code=status.HTTP_403_FORBIDDEN,
  39. detail="只有管理员可以访问此接口"
  40. )
  41. return user
  42. def get_user_task_stats(cursor, user_id: str) -> TaskStats:
  43. """
  44. 获取用户的任务统计信息。
  45. Args:
  46. cursor: 数据库游标
  47. user_id: 用户ID
  48. Returns:
  49. TaskStats 对象
  50. """
  51. # 查询任务统计
  52. cursor.execute("""
  53. SELECT
  54. COUNT(*) as assigned_count,
  55. SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_count,
  56. SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_count,
  57. SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending_count
  58. FROM tasks
  59. WHERE assigned_to = ?
  60. """, (user_id,))
  61. task_row = cursor.fetchone()
  62. assigned_count = task_row["assigned_count"] or 0
  63. completed_count = task_row["completed_count"] or 0
  64. in_progress_count = task_row["in_progress_count"] or 0
  65. pending_count = task_row["pending_count"] or 0
  66. # 查询标注数量
  67. cursor.execute("""
  68. SELECT COUNT(*) as annotation_count
  69. FROM annotations
  70. WHERE user_id = ?
  71. """, (user_id,))
  72. annotation_row = cursor.fetchone()
  73. annotation_count = annotation_row["annotation_count"] or 0
  74. # 计算完成率
  75. completion_rate = 0.0
  76. if assigned_count > 0:
  77. completion_rate = round(completed_count / assigned_count * 100, 2)
  78. return TaskStats(
  79. assigned_count=assigned_count,
  80. completed_count=completed_count,
  81. in_progress_count=in_progress_count,
  82. pending_count=pending_count,
  83. annotation_count=annotation_count,
  84. completion_rate=completion_rate
  85. )
  86. @router.get("", response_model=UserListResponse)
  87. async def list_users(
  88. request: Request,
  89. role: Optional[str] = Query(None, description="按角色筛选"),
  90. search: Optional[str] = Query(None, description="按用户名或邮箱搜索"),
  91. skip: int = Query(0, ge=0, description="跳过记录数"),
  92. limit: int = Query(50, ge=1, le=100, description="返回记录数")
  93. ):
  94. """
  95. 获取用户列表(管理员权限)。
  96. 支持按角色筛选和关键词搜索。
  97. Args:
  98. request: FastAPI Request 对象
  99. role: 角色筛选(admin/annotator/viewer)
  100. search: 搜索关键词(用户名或邮箱)
  101. skip: 分页偏移
  102. limit: 每页数量
  103. Returns:
  104. 用户列表和总数
  105. """
  106. require_admin(request)
  107. with get_db_connection() as conn:
  108. cursor = conn.cursor()
  109. # 构建查询条件
  110. where_clauses = []
  111. params = []
  112. if role:
  113. where_clauses.append("role = ?")
  114. params.append(role)
  115. if search:
  116. where_clauses.append("(username LIKE ? OR email LIKE ?)")
  117. search_pattern = f"%{search}%"
  118. params.extend([search_pattern, search_pattern])
  119. where_sql = ""
  120. if where_clauses:
  121. where_sql = "WHERE " + " AND ".join(where_clauses)
  122. # 查询总数
  123. count_sql = f"SELECT COUNT(*) as total FROM users {where_sql}"
  124. cursor.execute(count_sql, tuple(params))
  125. total = cursor.fetchone()["total"]
  126. # 查询用户列表
  127. query_sql = f"""
  128. SELECT id, username, email, role, created_at
  129. FROM users
  130. {where_sql}
  131. ORDER BY created_at DESC
  132. LIMIT ? OFFSET ?
  133. """
  134. params.extend([limit, skip])
  135. cursor.execute(query_sql, tuple(params))
  136. rows = cursor.fetchall()
  137. users = []
  138. for row in rows:
  139. # 获取每个用户的任务统计
  140. task_stats = get_user_task_stats(cursor, row["id"])
  141. users.append(UserWithStatsResponse(
  142. id=row["id"],
  143. username=row["username"],
  144. email=row["email"],
  145. role=row["role"],
  146. created_at=row["created_at"],
  147. task_stats=task_stats
  148. ))
  149. return UserListResponse(users=users, total=total)
  150. @router.get("/annotators", response_model=List[AssignableUserResponse])
  151. async def list_annotators(
  152. request: Request,
  153. search: Optional[str] = Query(None, description="按用户名或邮箱搜索")
  154. ):
  155. """
  156. 获取可分配任务的用户列表。
  157. 返回所有标注人员(annotator 角色)及其当前工作量。
  158. Args:
  159. request: FastAPI Request 对象
  160. search: 搜索关键词
  161. Returns:
  162. 可分配用户列表
  163. """
  164. # 验证用户已登录
  165. user = getattr(request.state, "user", None)
  166. if not user:
  167. raise HTTPException(
  168. status_code=status.HTTP_401_UNAUTHORIZED,
  169. detail="未认证"
  170. )
  171. with get_db_connection() as conn:
  172. cursor = conn.cursor()
  173. # 构建查询
  174. where_clauses = ["role = 'annotator'"]
  175. params = []
  176. if search:
  177. where_clauses.append("(username LIKE ? OR email LIKE ?)")
  178. search_pattern = f"%{search}%"
  179. params.extend([search_pattern, search_pattern])
  180. where_sql = "WHERE " + " AND ".join(where_clauses)
  181. # 查询用户及其任务统计
  182. query_sql = f"""
  183. SELECT
  184. u.id,
  185. u.username,
  186. u.email,
  187. COUNT(CASE WHEN t.status != 'completed' THEN 1 END) as current_task_count,
  188. COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as completed_task_count
  189. FROM users u
  190. LEFT JOIN tasks t ON u.id = t.assigned_to
  191. {where_sql}
  192. GROUP BY u.id, u.username, u.email
  193. ORDER BY u.username
  194. """
  195. cursor.execute(query_sql, tuple(params))
  196. rows = cursor.fetchall()
  197. annotators = []
  198. for row in rows:
  199. annotators.append(AssignableUserResponse(
  200. id=row["id"],
  201. username=row["username"],
  202. email=row["email"],
  203. current_task_count=row["current_task_count"] or 0,
  204. completed_task_count=row["completed_task_count"] or 0
  205. ))
  206. return annotators
  207. @router.get("/{user_id}", response_model=UserWithStatsResponse)
  208. async def get_user(request: Request, user_id: str):
  209. """
  210. 获取用户详情(管理员权限)。
  211. Args:
  212. request: FastAPI Request 对象
  213. user_id: 用户ID
  214. Returns:
  215. 用户详情及统计信息
  216. """
  217. require_admin(request)
  218. with get_db_connection() as conn:
  219. cursor = conn.cursor()
  220. cursor.execute("""
  221. SELECT id, username, email, role, created_at
  222. FROM users
  223. WHERE id = ?
  224. """, (user_id,))
  225. row = cursor.fetchone()
  226. if not row:
  227. raise HTTPException(
  228. status_code=status.HTTP_404_NOT_FOUND,
  229. detail=f"用户 '{user_id}' 不存在"
  230. )
  231. task_stats = get_user_task_stats(cursor, user_id)
  232. return UserWithStatsResponse(
  233. id=row["id"],
  234. username=row["username"],
  235. email=row["email"],
  236. role=row["role"],
  237. created_at=row["created_at"],
  238. task_stats=task_stats
  239. )
  240. @router.get("/{user_id}/stats", response_model=UserStatsResponse)
  241. async def get_user_stats(request: Request, user_id: str):
  242. """
  243. 获取用户详细统计信息(管理员权限)。
  244. 包含用户基本信息、任务统计和最近任务列表。
  245. Args:
  246. request: FastAPI Request 对象
  247. user_id: 用户ID
  248. Returns:
  249. 用户详细统计信息
  250. """
  251. require_admin(request)
  252. with get_db_connection() as conn:
  253. cursor = conn.cursor()
  254. # 查询用户信息
  255. cursor.execute("""
  256. SELECT id, username, email, role, created_at
  257. FROM users
  258. WHERE id = ?
  259. """, (user_id,))
  260. row = cursor.fetchone()
  261. if not row:
  262. raise HTTPException(
  263. status_code=status.HTTP_404_NOT_FOUND,
  264. detail=f"用户 '{user_id}' 不存在"
  265. )
  266. user = UserResponse(
  267. id=row["id"],
  268. username=row["username"],
  269. email=row["email"],
  270. role=row["role"],
  271. created_at=row["created_at"]
  272. )
  273. # 获取任务统计
  274. task_stats = get_user_task_stats(cursor, user_id)
  275. # 获取最近任务
  276. cursor.execute("""
  277. SELECT
  278. t.id,
  279. t.name,
  280. t.status,
  281. t.created_at,
  282. p.name as project_name
  283. FROM tasks t
  284. LEFT JOIN projects p ON t.project_id = p.id
  285. WHERE t.assigned_to = ?
  286. ORDER BY t.created_at DESC
  287. LIMIT 10
  288. """, (user_id,))
  289. recent_tasks = []
  290. for task_row in cursor.fetchall():
  291. recent_tasks.append({
  292. "id": task_row["id"],
  293. "name": task_row["name"],
  294. "status": task_row["status"],
  295. "created_at": str(task_row["created_at"]),
  296. "project_name": task_row["project_name"]
  297. })
  298. return UserStatsResponse(
  299. user=user,
  300. task_stats=task_stats,
  301. recent_tasks=recent_tasks
  302. )