"""
User API tests.
Tests for user management endpoints.
"""
- import pytest
- import uuid
- from fastapi.testclient import TestClient
- from main import app
- from database import get_db_connection, init_database
- from services.jwt_service import JWTService
- import bcrypt
# Test client: a single TestClient wrapping the application under test,
# shared by every test in this module.
client = TestClient(app)
@pytest.fixture(scope="module")
def setup_database():
    """Initialize the test database.

    Module-scoped: ``init_database()`` is called once before the first test
    in this module that depends on the fixture. Nothing runs after the
    ``yield``, so there is no teardown step.
    """
    init_database()
    yield
@pytest.fixture
def admin_token(setup_database):
    """Create an admin user and return an access token for it.

    A row with role ``'admin'`` and a randomly suffixed id is inserted into
    ``users`` unless a row with the same username is already present. The
    fixture yields a dict with the keys ``token`` and ``user_id``; after the
    test finishes the row is deleted again by id.
    """
    admin_id = f"admin_{uuid.uuid4().hex[:8]}"
    username = f"test_admin_{admin_id}"
    email = f"admin_{admin_id}@test.com"
    password_hash = bcrypt.hashpw("admin123".encode(), bcrypt.gensalt()).decode()

    with get_db_connection() as conn:
        cursor = conn.cursor()
        # Only insert when no user with this username exists yet.
        cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
        existing_row = cursor.fetchone()
        if not existing_row:
            cursor.execute("""
                INSERT INTO users (id, username, email, password_hash, role)
                VALUES (?, ?, ?, ?, 'admin')
            """, (admin_id, username, email, password_hash))

    token = JWTService.create_access_token({
        "id": admin_id,
        "username": username,
        "email": email,
        "role": "admin"
    })

    yield {"token": token, "user_id": admin_id}

    # Teardown: remove the user created above.
    with get_db_connection() as conn:
        conn.cursor().execute("DELETE FROM users WHERE id = ?", (admin_id,))
@pytest.fixture
def annotator_token(setup_database):
    """Create an annotator user and return an access token for it.

    A row with role ``'annotator'`` and a randomly suffixed id is inserted
    into ``users``. The fixture yields a dict with the keys ``token`` and
    ``user_id``; after the test finishes the row is deleted again by id.
    """
    annotator_id = f"annotator_{uuid.uuid4().hex[:8]}"
    username = f"test_annotator_{annotator_id}"
    email = f"annotator_{annotator_id}@test.com"
    password_hash = bcrypt.hashpw("annotator123".encode(), bcrypt.gensalt()).decode()

    with get_db_connection() as conn:
        conn.cursor().execute("""
            INSERT INTO users (id, username, email, password_hash, role)
            VALUES (?, ?, ?, ?, 'annotator')
        """, (annotator_id, username, email, password_hash))

    token = JWTService.create_access_token({
        "id": annotator_id,
        "username": username,
        "email": email,
        "role": "annotator"
    })

    yield {"token": token, "user_id": annotator_id}

    # Teardown: remove the user created above.
    with get_db_connection() as conn:
        conn.cursor().execute("DELETE FROM users WHERE id = ?", (annotator_id,))
class TestUserListAPI:
    """Tests for the user list API (``GET /api/users``)."""

    def test_list_users_without_auth(self, setup_database):
        """An unauthenticated request should return 401."""
        resp = client.get("/api/users")
        assert resp.status_code == 401

    def test_list_users_as_annotator(self, annotator_token):
        """A request made by an annotator should return 403."""
        token = annotator_token["token"]
        resp = client.get("/api/users", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 403

    def test_list_users_as_admin(self, admin_token):
        """An admin can fetch the user list."""
        token = admin_token["token"]
        resp = client.get("/api/users", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        payload = resp.json()
        assert "users" in payload
        assert "total" in payload
        assert isinstance(payload["users"], list)

    def test_list_users_with_role_filter(self, admin_token):
        """An admin can filter users by role."""
        token = admin_token["token"]
        resp = client.get("/api/users?role=admin", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        payload = resp.json()
        # Every returned user must carry the admin role.
        assert all(user["role"] == "admin" for user in payload["users"])

    def test_list_users_with_search(self, admin_token):
        """An admin can search users."""
        token = admin_token["token"]
        resp = client.get("/api/users?search=test_admin", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        payload = resp.json()
        # Only the shape of the result is checked here, not its contents.
        assert isinstance(payload["users"], list)
class TestAnnotatorsAPI:
    """Tests for the annotator list API (``GET /api/users/annotators``)."""

    def test_list_annotators_without_auth(self, setup_database):
        """An unauthenticated request should return 401."""
        resp = client.get("/api/users/annotators")
        assert resp.status_code == 401

    def test_list_annotators_as_admin(self, admin_token, annotator_token):
        """An admin can fetch the annotator list."""
        token = admin_token["token"]
        resp = client.get("/api/users/annotators", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        annotators = resp.json()
        assert isinstance(annotators, list)
        # When anything is returned, the first entry must expose the required fields.
        if annotators:
            first = annotators[0]
            for field in (
                "id",
                "username",
                "email",
                "current_task_count",
                "completed_task_count",
            ):
                assert field in first

    def test_list_annotators_as_annotator(self, annotator_token):
        """Annotators can also fetch the annotator list (to see colleagues)."""
        token = annotator_token["token"]
        resp = client.get("/api/users/annotators", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
class TestUserStatsAPI:
    """Tests for the user statistics API (``GET /api/users/{id}/stats``)."""

    def test_get_user_stats_without_auth(self, setup_database):
        """An unauthenticated request should return 401."""
        resp = client.get("/api/users/some_user_id/stats")
        assert resp.status_code == 401

    def test_get_user_stats_as_annotator(self, annotator_token):
        """A request made by an annotator should return 403."""
        token = annotator_token["token"]
        target_id = annotator_token["user_id"]
        resp = client.get(f"/api/users/{target_id}/stats", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 403

    def test_get_user_stats_as_admin(self, admin_token, annotator_token):
        """An admin can fetch a user's statistics."""
        token = admin_token["token"]
        target_id = annotator_token["user_id"]
        resp = client.get(f"/api/users/{target_id}/stats", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        payload = resp.json()
        for section in ("user", "task_stats", "recent_tasks"):
            assert section in payload
        # Check the individual statistics fields.
        task_stats = payload["task_stats"]
        for field in ("assigned_count", "completed_count", "completion_rate"):
            assert field in task_stats

    def test_get_nonexistent_user_stats(self, admin_token):
        """Requesting statistics for a nonexistent user should return 404."""
        token = admin_token["token"]
        resp = client.get("/api/users/nonexistent_user_id/stats", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 404
class TestUserDetailAPI:
    """Tests for the user detail API (``GET /api/users/{id}``)."""

    def test_get_user_without_auth(self, setup_database):
        """An unauthenticated request should return 401."""
        resp = client.get("/api/users/some_user_id")
        assert resp.status_code == 401

    def test_get_user_as_admin(self, admin_token, annotator_token):
        """An admin can fetch a user's details."""
        token = admin_token["token"]
        target_id = annotator_token["user_id"]
        resp = client.get(f"/api/users/{target_id}", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 200
        detail = resp.json()
        assert detail["id"] == target_id
        assert "task_stats" in detail

    def test_get_nonexistent_user(self, admin_token):
        """Requesting a nonexistent user should return 404."""
        token = admin_token["token"]
        resp = client.get("/api/users/nonexistent_user_id", headers={"Authorization": f"Bearer {token}"})
        assert resp.status_code == 404
|