| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- from __future__ import annotations
- import os
- from datetime import datetime, timedelta, timezone
- import bcrypt
- from fastapi import APIRouter, HTTPException
- from jose import jwt
- from pydantic import BaseModel
- from app.db import get_pool
- router = APIRouter(tags=["auth"])
- SECRET_KEY = os.environ.get("JWT_SECRET", "change-me-in-production-please")
- ALGORITHM = "HS256"
- TOKEN_EXPIRE_HOURS = 12
- class LoginIn(BaseModel):
- username: str
- password: str
- class TokenOut(BaseModel):
- access_token: str
- token_type: str = "bearer"
- def _hash_password(plain: str) -> str:
- return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode()
- def _verify_password(plain: str, hashed: str) -> bool:
- return bcrypt.checkpw(plain.encode(), hashed.encode())
- def _create_token(username: str) -> str:
- expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS)
- return jwt.encode({"sub": username, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
- async def ensure_admin_user() -> None:
- """启动时确保 admin 账户存在,不存在则创建。"""
- pool = get_pool()
- row = await pool.fetchrow("SELECT id FROM crawl.users WHERE username = 'admin'")
- if not row:
- hashed = _hash_password("admin123")
- await pool.execute(
- "INSERT INTO crawl.users (username, password_hash) VALUES ('admin', $1)",
- hashed,
- )
- @router.post("/auth/login", response_model=TokenOut)
- async def login(body: LoginIn) -> TokenOut:
- pool = get_pool()
- row = await pool.fetchrow(
- "SELECT password_hash FROM crawl.users WHERE username = $1", body.username
- )
- if not row or not _verify_password(body.password, row["password_hash"]):
- raise HTTPException(status_code=401, detail="用户名或密码错误")
- return TokenOut(access_token=_create_token(body.username))
|