auth.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from __future__ import annotations
  2. import os
  3. from datetime import datetime, timedelta, timezone
  4. import bcrypt
  5. from fastapi import APIRouter, HTTPException
  6. from jose import jwt
  7. from pydantic import BaseModel
  8. from app.db import get_pool
  9. router = APIRouter(tags=["auth"])
  10. SECRET_KEY = os.environ.get("JWT_SECRET", "change-me-in-production-please")
  11. ALGORITHM = "HS256"
  12. TOKEN_EXPIRE_HOURS = 12
  13. class LoginIn(BaseModel):
  14. username: str
  15. password: str
  16. class TokenOut(BaseModel):
  17. access_token: str
  18. token_type: str = "bearer"
  19. def _hash_password(plain: str) -> str:
  20. return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
  21. def _verify_password(plain: str, hashed: str) -> bool:
  22. return bcrypt.checkpw(plain.encode(), hashed.encode())
  23. def _create_token(username: str) -> str:
  24. expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
  25. return jwt.encode({"sub": username, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
  26. async def ensure_admin_user() -> None:
  27. """启动时确保 admin 账户存在,不存在则创建。"""
  28. pool = get_pool()
  29. row = await pool.fetchrow("SELECT id FROM crawl.users WHERE username = 'admin'")
  30. if not row:
  31. hashed = _hash_password("admin123")
  32. await pool.execute(
  33. "INSERT INTO crawl.users (username, password_hash) VALUES ('admin', $1)",
  34. hashed,
  35. )
  36. @router.post("/auth/login", response_model=TokenOut)
  37. async def login(body: LoginIn) -> TokenOut:
  38. pool = get_pool()
  39. row = await pool.fetchrow(
  40. "SELECT password_hash FROM crawl.users WHERE username = $1", body.username
  41. )
  42. if not row or not _verify_password(body.password, row["password_hash"]):
  43. raise HTTPException(status_code=401, detail="用户名或密码错误")
  44. return TokenOut(access_token=_create_token(body.username))