from __future__ import annotations import os from datetime import datetime, timedelta, timezone import bcrypt from fastapi import APIRouter, HTTPException from jose import jwt from pydantic import BaseModel from app.db import get_pool router = APIRouter(tags=["auth"]) SECRET_KEY = os.environ.get("JWT_SECRET", "change-me-in-production-please") ALGORITHM = "HS256" TOKEN_EXPIRE_HOURS = 12 class LoginIn(BaseModel): username: str password: str class TokenOut(BaseModel): access_token: str token_type: str = "bearer" def _hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() def _verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def _create_token(username: str) -> str: expire = datetime.now(timezone.utc) + timedelta(hours=TOKEN_EXPIRE_HOURS) return jwt.encode({"sub": username, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM) async def ensure_admin_user() -> None: """启动时确保 admin 账户存在,不存在则创建。""" pool = get_pool() row = await pool.fetchrow("SELECT id FROM crawl.users WHERE username = 'admin'") if not row: hashed = _hash_password("admin123") await pool.execute( "INSERT INTO crawl.users (username, password_hash) VALUES ('admin', $1)", hashed, ) @router.post("/auth/login", response_model=TokenOut) async def login(body: LoginIn) -> TokenOut: pool = get_pool() row = await pool.fetchrow( "SELECT password_hash FROM crawl.users WHERE username = $1", body.username ) if not row or not _verify_password(body.password, row["password_hash"]): raise HTTPException(status_code=401, detail="用户名或密码错误") return TokenOut(access_token=_create_token(body.username))