test_user_api.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """
  2. User API tests.
  3. Tests for user management endpoints.
  4. """
  5. import pytest
  6. import uuid
  7. from fastapi.testclient import TestClient
  8. from main import app
  9. from database import get_db_connection, init_database
  10. from services.jwt_service import JWTService
  11. import bcrypt
  12. # Shared test client bound to the FastAPI app imported from main
  13. client = TestClient(app)
  @pytest.fixture(scope="module")
  def setup_database():
      """Initialise the test database once per test module.

      Calls ``init_database()`` before the first dependent test runs.
      There is no teardown step after the yield.
      """
      init_database()
      yield None
  @pytest.fixture
  def admin_token(setup_database):
      """Create a temporary admin user and yield an access token for it.

      A uniquely named row with role ``admin`` is inserted into ``users``
      (unless a row with that username already exists), a token is built
      with ``JWTService.create_access_token`` and the fixture yields
      ``{"token": <token>, "user_id": <id>}``.

      The row is deleted on teardown. Token creation and the yield sit
      inside ``try/finally`` so the row is also removed when token creation
      raises; pytest skips the post-yield code of a fixture that fails
      before yielding.
      """
      admin_id = f"admin_{uuid.uuid4().hex[:8]}"
      # Derive the identifying fields once so the row and the token payload
      # cannot drift apart.
      username = f"test_admin_{admin_id}"
      email = f"admin_{admin_id}@test.com"
      password_hash = bcrypt.hashpw("admin123".encode(), bcrypt.gensalt()).decode()

      # NOTE(review): presumably the context manager commits on exit -
      # confirm against database.get_db_connection.
      with get_db_connection() as conn:
          cursor = conn.cursor()
          # Only insert when no user with this username exists yet.
          cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
          if not cursor.fetchone():
              cursor.execute("""
                  INSERT INTO users (id, username, email, password_hash, role)
                  VALUES (?, ?, ?, ?, 'admin')
              """, (admin_id, username, email, password_hash))

      try:
          user_data = {
              "id": admin_id,
              "username": username,
              "email": email,
              "role": "admin",
          }
          token = JWTService.create_access_token(user_data)
          yield {"token": token, "user_id": admin_id}
      finally:
          # Cleanup: remove the temporary user even if setup failed after
          # the insert.
          with get_db_connection() as conn:
              cursor = conn.cursor()
              cursor.execute("DELETE FROM users WHERE id = ?", (admin_id,))
  @pytest.fixture
  def annotator_token(setup_database):
      """Create a temporary annotator user and yield an access token for it.

      A uniquely named row with role ``annotator`` is inserted into
      ``users``, a token is built with ``JWTService.create_access_token``
      and the fixture yields ``{"token": <token>, "user_id": <id>}``.

      The row is deleted on teardown. Token creation and the yield sit
      inside ``try/finally`` so the row is also removed when token creation
      raises; pytest skips the post-yield code of a fixture that fails
      before yielding.
      """
      annotator_id = f"annotator_{uuid.uuid4().hex[:8]}"
      # Derive the identifying fields once so the row and the token payload
      # cannot drift apart.
      username = f"test_annotator_{annotator_id}"
      email = f"annotator_{annotator_id}@test.com"
      password_hash = bcrypt.hashpw("annotator123".encode(), bcrypt.gensalt()).decode()

      # NOTE(review): presumably the context manager commits on exit -
      # confirm against database.get_db_connection.
      with get_db_connection() as conn:
          cursor = conn.cursor()
          cursor.execute("""
              INSERT INTO users (id, username, email, password_hash, role)
              VALUES (?, ?, ?, ?, 'annotator')
          """, (annotator_id, username, email, password_hash))

      try:
          user_data = {
              "id": annotator_id,
              "username": username,
              "email": email,
              "role": "annotator",
          }
          token = JWTService.create_access_token(user_data)
          yield {"token": token, "user_id": annotator_id}
      finally:
          # Cleanup: remove the temporary user even if setup failed after
          # the insert.
          with get_db_connection() as conn:
              cursor = conn.cursor()
              cursor.execute("DELETE FROM users WHERE id = ?", (annotator_id,))
  class TestUserListAPI:
      """Tests for the user list endpoint (``GET /api/users``)."""

      def test_list_users_without_auth(self, setup_database):
          """A request without credentials should be answered with 401."""
          assert client.get("/api/users").status_code == 401

      def test_list_users_as_annotator(self, annotator_token):
          """A request with an annotator token should be answered with 403."""
          resp = client.get(
              "/api/users",
              headers={"Authorization": f"Bearer {annotator_token['token']}"},
          )
          assert resp.status_code == 403

      def test_list_users_as_admin(self, admin_token):
          """An admin should receive a payload with ``users`` and ``total``."""
          resp = client.get(
              "/api/users",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          payload = resp.json()
          assert "users" in payload
          assert "total" in payload
          assert isinstance(payload["users"], list)

      def test_list_users_with_role_filter(self, admin_token):
          """Filtering by ``role=admin`` should return admin users only."""
          resp = client.get(
              "/api/users?role=admin",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          # Every returned user must carry the admin role.
          roles = [user["role"] for user in resp.json()["users"]]
          assert all(role == "admin" for role in roles)

      def test_list_users_with_search(self, admin_token):
          """A search query should succeed and return a list of users."""
          resp = client.get(
              "/api/users?search=test_admin",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          # Only the shape of the result is checked here, not its contents.
          payload = resp.json()
          assert isinstance(payload["users"], list)
  class TestAnnotatorsAPI:
      """Tests for the annotator list endpoint (``GET /api/users/annotators``)."""

      def test_list_annotators_without_auth(self, setup_database):
          """A request without credentials should be answered with 401."""
          assert client.get("/api/users/annotators").status_code == 401

      def test_list_annotators_as_admin(self, admin_token, annotator_token):
          """An admin should receive a list of annotators with the expected fields."""
          resp = client.get(
              "/api/users/annotators",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          annotators = resp.json()
          assert isinstance(annotators, list)

          # When anything is returned, the first entry must expose these fields.
          if annotators:
              first = annotators[0]
              for field in (
                  "id",
                  "username",
                  "email",
                  "current_task_count",
                  "completed_task_count",
              ):
                  assert field in first

      def test_list_annotators_as_annotator(self, annotator_token):
          """An annotator may list annotators too (to see colleagues)."""
          resp = client.get(
              "/api/users/annotators",
              headers={"Authorization": f"Bearer {annotator_token['token']}"},
          )
          assert resp.status_code == 200
  class TestUserStatsAPI:
      """Tests for the user statistics endpoint (``GET /api/users/{id}/stats``)."""

      def test_get_user_stats_without_auth(self, setup_database):
          """A request without credentials should be answered with 401."""
          assert client.get("/api/users/some_user_id/stats").status_code == 401

      def test_get_user_stats_as_annotator(self, annotator_token):
          """A request with an annotator token should be answered with 403."""
          user_id = annotator_token["user_id"]
          resp = client.get(
              f"/api/users/{user_id}/stats",
              headers={"Authorization": f"Bearer {annotator_token['token']}"},
          )
          assert resp.status_code == 403

      def test_get_user_stats_as_admin(self, admin_token, annotator_token):
          """An admin should receive user, task statistics and recent tasks."""
          user_id = annotator_token["user_id"]
          resp = client.get(
              f"/api/users/{user_id}/stats",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          payload = resp.json()
          for section in ("user", "task_stats", "recent_tasks"):
              assert section in payload

          # The statistics section must expose these counters.
          task_stats = payload["task_stats"]
          for counter in ("assigned_count", "completed_count", "completion_rate"):
              assert counter in task_stats

      def test_get_nonexistent_user_stats(self, admin_token):
          """Statistics for an unknown user id should be answered with 404."""
          resp = client.get(
              "/api/users/nonexistent_user_id/stats",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 404
  class TestUserDetailAPI:
      """Tests for the user detail endpoint (``GET /api/users/{id}``)."""

      def test_get_user_without_auth(self, setup_database):
          """A request without credentials should be answered with 401."""
          assert client.get("/api/users/some_user_id").status_code == 401

      def test_get_user_as_admin(self, admin_token, annotator_token):
          """An admin should receive the requested user's details."""
          target_id = annotator_token["user_id"]
          resp = client.get(
              f"/api/users/{target_id}",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 200

          detail = resp.json()
          assert detail["id"] == target_id
          assert "task_stats" in detail

      def test_get_nonexistent_user(self, admin_token):
          """Details for an unknown user id should be answered with 404."""
          resp = client.get(
              "/api/users/nonexistent_user_id",
              headers={"Authorization": f"Bearer {admin_token['token']}"},
          )
          assert resp.status_code == 404