"""
授权管理视图路由
包含:SSO验证、授权码生成、Token管理、用户信息获取
"""
import sys
import os

# Add the src directories to the Python path. Kept ahead of the remaining
# imports (as in the original ordering); the ``app.*`` imports below
# presumably rely on it.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

import html
import json
import logging
import secrets
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import quote, urlencode

from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.services.jwt_token import verify_token, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
from app.services.oauth_service import OAuthService
from app.base.async_mysql_connection import get_db_connection
from app.utils.auth_dependency import get_current_user_with_refresh
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the authorization endpoints; every route declared on it is
# served under the "/oauth" prefix.
router = APIRouter(prefix="/oauth", tags=["授权管理"])
# HTTP Bearer credential extractors. ``security_optional`` is created with
# auto_error=False. Neither object is referenced by the handlers visible in
# this section of the file.
security = HTTPBearer()
security_optional = HTTPBearer(auto_error=False)
# OAuth2 authorization endpoint
@router.get("/authorize")
async def oauth_authorize(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: Optional[str] = None,
):
    """Validate an OAuth2 authorization request and send the user to login.

    Args:
        response_type: Must be ``"code"``; any other value is rejected.
        client_id: Identifier of the requesting application.
        redirect_uri: Callback URI supplied by the application.
        scope: Space-delimited list of requested scopes.
        state: Opaque client value; when non-empty it is appended to every
            URL this handler generates.

    Returns:
        A 302 ``RedirectResponse`` to ``/oauth/login`` carrying the original
        request parameters when the request is valid. On a validation or
        server error, a dict with the keys ``error`` and ``redirect_url``;
        the handler itself never redirects to ``redirect_uri``.

    NOTE(review): the user's login state is not checked yet; every valid
    request is sent to the login page, and the consent page for untrusted
    applications is not implemented.
    """

    def _with_query(base: str, **params: str) -> str:
        # Percent-encode every value so that a redirect_uri carrying its own
        # query string, or a state containing "&" / "=", cannot corrupt or
        # inject parameters in the generated URL.
        if state:
            params["state"] = state
        separator = "&" if "?" in base else "?"
        return f"{base}{separator}{urlencode(params, quote_via=quote)}"

    def _error(code: str, description: str) -> dict:
        # Error payload shape used by every failure path of this handler.
        return {
            "error": code,
            "redirect_url": _with_query(
                redirect_uri, error=code, error_description=description
            ),
        }

    try:
        logger.info(
            "OAuth授权请求: client_id=%s, redirect_uri=%s, scope=%s",
            client_id, redirect_uri, scope,
        )

        # Required parameters must be non-empty.
        if not response_type or not client_id or not redirect_uri:
            return _error("invalid_request", "Missing required parameters")

        # Only the authorization-code flow is supported.
        if response_type != "code":
            return _error(
                "unsupported_response_type",
                "Only authorization code flow is supported",
            )

        # Delegate client / redirect URI validation to the service layer.
        oauth_service = OAuthService()
        success, app_info, message = await oauth_service.validate_client_and_redirect_uri(
            client_id, redirect_uri
        )
        if not success:
            return _error("invalid_client", message)

        # Normalise the registered scopes: a space-delimited string would
        # otherwise be matched by substring, and None would raise.
        allowed_scopes = app_info.get("scope") or []
        if isinstance(allowed_scopes, str):
            allowed_scopes = allowed_scopes.split()
        requested_scopes = scope.split() if scope else []
        invalid_scopes = [s for s in requested_scopes if s not in allowed_scopes]
        if invalid_scopes:
            return _error(
                "invalid_scope", f"Invalid scope: {' '.join(invalid_scopes)}"
            )

        # TODO: check the user's login state (session/cookie) and skip the
        # login page when already signed in. Temporary behaviour: always
        # redirect to the login page, which returns to the authorization flow.
        login_page_url = _with_query(
            "/oauth/login",
            response_type=response_type,
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
        )
        logger.info("需要用户登录,重定向到登录页面: %s", login_page_url)
        return RedirectResponse(url=login_page_url, status_code=302)
    except Exception:
        logger.exception("OAuth授权错误")
        return _error("server_error", "Internal server error")
@router.get("/login")
async def oauth_login_page(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: Optional[str] = None,
):
    """Render the OAuth2 login page for the requesting application.

    Args:
        response_type: Accepted as a query parameter; not used by this handler.
        client_id: Identifier of the requesting application; used to look up
            its display name.
        redirect_uri: Callback URI, passed to the service-layer validation.
        scope: Accepted as a query parameter; not used by this handler.
        state: Accepted as a query parameter; not used by this handler.

    Returns:
        An ``HTMLResponse`` with the login page. The application name falls
        back to "未知应用" when client validation fails. On an unexpected
        error, a dict with ``error`` and ``message`` keys.

    NOTE(review): the page is still rendered when client validation fails;
    confirm whether such requests should be rejected instead.
    """
    try:
        logger.info("显示OAuth登录页面: client_id=%s", client_id)

        # Look up the application through the service layer.
        oauth_service = OAuthService()
        success, app_info, _ = await oauth_service.validate_client_and_redirect_uri(
            client_id, redirect_uri
        )
        app_name = app_info.get("name", "未知应用") if success else "未知应用"

        # The name is stored data rendered into HTML: escape it so markup in
        # an application name cannot inject content into the page.
        safe_name = html.escape(str(app_name))

        # SECURITY(review): the page below publishes test account credentials.
        # Kept as-is to preserve the rendered content, but this line should be
        # removed or gated before any production deployment.
        login_html = (
            "\n"
            f"四川路桥AI后台管理平台 - {safe_name}\n"
            f"{safe_name} 请求访问您的账户\n"
            "测试账号: admin / Admin123456 | admin123\n"
        )
        return HTMLResponse(content=login_html)
    except Exception:
        logger.exception("OAuth登录页面错误")
        return {"error": "server_error", "message": "服务器内部错误"}
@router.get("/authorize/authenticated")
async def oauth_authorize_authenticated(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    access_token: str,
    scope: str = "profile",
    state: Optional[str] = None,
):
    """Issue an authorization code for a user who has already logged in.

    Args:
        response_type: Accepted as a query parameter; not used by this handler.
        client_id: Identifier of the requesting application.
        redirect_uri: Callback URI of the application.
        access_token: Token identifying the logged-in user; checked with
            ``verify_token``.
        scope: Scope string stored together with the authorization code.
        state: Opaque client value echoed back on every redirect when set.

    Returns:
        A 302 ``RedirectResponse`` to ``redirect_uri`` carrying either
        ``code`` (success) or ``error`` / ``error_description``.

    Raises:
        HTTPException: 400 when the client / redirect URI pair is rejected by
            the service layer, 500 when that validation itself fails. In both
            cases no redirect is issued, because ``redirect_uri`` has not been
            confirmed to belong to the client and redirecting to it would make
            this endpoint an open redirector.

    NOTE(review): the access token travels in the query string and can end up
    in access logs and browser history. The consent page for untrusted
    applications is not implemented; they are authorized directly.
    """

    def _redirect(**params: str) -> RedirectResponse:
        # Percent-encode values and honour a query string already present on
        # the registered redirect URI.
        if state:
            params["state"] = state
        separator = "&" if "?" in redirect_uri else "?"
        target = f"{redirect_uri}{separator}{urlencode(params, quote_via=quote)}"
        return RedirectResponse(url=target, status_code=302)

    # Step 1: validate the client and redirect URI BEFORE anything is allowed
    # to redirect to the caller-supplied address.
    try:
        logger.info("用户已登录,处理授权: client_id=%s", client_id)
        oauth_service = OAuthService()
        success, app_info, message = await oauth_service.validate_client_and_redirect_uri(
            client_id, redirect_uri
        )
    except Exception as exc:
        logger.exception("授权处理错误")
        raise HTTPException(
            status_code=500,
            detail={"error": "server_error", "error_description": "Authorization failed"},
        ) from exc
    if not success:
        logger.warning("授权请求被拒绝: client_id=%s, reason=%s", client_id, message)
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_client", "error_description": message},
        )

    # Step 2: the redirect URI is confirmed, so errors may now be reported by
    # redirecting back to the application.
    try:
        payload = verify_token(access_token)
        user_id = payload.get("sub") if payload else None
        if not user_id:
            # Covers both an invalid token and a token without a subject.
            return _redirect(
                error="invalid_token", error_description="Invalid access token"
            )
        username = payload.get("username", "")
        logger.info("用户已验证: %s (%s)", username, user_id)

        # Generate the authorization code and persist it via the service layer.
        auth_code = secrets.token_urlsafe(32)
        await oauth_service.store_authorization_code(
            user_id, client_id, auth_code, redirect_uri, scope
        )

        # The code itself is deliberately kept out of the logs.
        if app_info.get("is_trusted", False):
            logger.info("受信任应用自动授权: client_id=%s", client_id)
        else:
            logger.info("用户授权完成: client_id=%s", client_id)
        return _redirect(code=auth_code)
    except Exception:
        logger.exception("授权处理错误")
        return _redirect(
            error="server_error", error_description="Authorization failed"
        )
async def oauth_approve(
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: Optional[str] = None,
):
    """Handle the user's consent by redirecting back with a new authorization code.

    Args:
        client_id: Identifier of the requesting application (only logged).
        redirect_uri: Callback URI that receives the authorization code.
        scope: Accepted for interface parity; not used by this function.
        state: Opaque client value echoed back on the redirect when set.

    Returns:
        A 302 ``RedirectResponse`` to ``redirect_uri`` with ``code`` (and
        ``state``), or with ``error=server_error`` if anything fails.

    NOTE(review): no route decorator is attached to this function in the
    visible source. The generated code is not stored anywhere and
    ``redirect_uri`` is not validated here; both must be addressed before
    this is exposed as an endpoint.
    """

    def _redirect(**params: str) -> RedirectResponse:
        # Percent-encode values and honour a query string already present on
        # the redirect URI.
        if state:
            params["state"] = state
        separator = "&" if "?" in redirect_uri else "?"
        target = f"{redirect_uri}{separator}{urlencode(params, quote_via=quote)}"
        return RedirectResponse(url=target, status_code=302)

    try:
        logger.info("用户同意授权: client_id=%s", client_id)

        auth_code = secrets.token_urlsafe(32)
        # TODO: persist the authorization code, bound to the user and the
        # application. A complete implementation should:
        #   1. verify the user's login state,
        #   2. store the code in the database,
        #   3. give it an expiry (typically 10 minutes).

        # Log the destination only; the code itself stays out of the logs.
        logger.info("重定向到: %s", redirect_uri)
        return _redirect(code=auth_code)
    except Exception:
        logger.exception("授权确认错误")
        return _redirect(
            error="server_error", error_description="Authorization failed"
        )
@router.get("/authorize/deny")
async def oauth_deny(
    client_id: str,
    redirect_uri: str,
    state: Optional[str] = None,
):
    """Report to the application that the user denied authorization.

    Args:
        client_id: Identifier of the requesting application.
        redirect_uri: Callback URI of the application.
        state: Opaque client value echoed back on the redirect when set.

    Returns:
        A 302 ``RedirectResponse`` to ``redirect_uri`` with
        ``error=access_denied``.

    Raises:
        HTTPException: 400 when the client / redirect URI pair is rejected by
            the service layer, 500 when that validation itself fails. No
            redirect is issued in either case: redirecting to an unverified,
            caller-supplied URI would make this endpoint an open redirector.
    """
    try:
        logger.info("用户拒绝授权: client_id=%s", client_id)
        # Same service-layer check the other authorization endpoints use.
        success, _, message = await OAuthService().validate_client_and_redirect_uri(
            client_id, redirect_uri
        )
    except Exception as exc:
        logger.exception("拒绝授权错误")
        raise HTTPException(
            status_code=500,
            detail={"error": "server_error", "error_description": "Authorization failed"},
        ) from exc
    if not success:
        logger.warning("拒绝授权请求无效: client_id=%s, reason=%s", client_id, message)
        raise HTTPException(
            status_code=400,
            detail={"error": "invalid_client", "error_description": message},
        )

    # Build the error callback with percent-encoded values, honouring a query
    # string already present on the registered redirect URI.
    params = {
        "error": "access_denied",
        "error_description": "User denied authorization",
    }
    if state:
        params["state"] = state
    separator = "&" if "?" in redirect_uri else "?"
    error_url = f"{redirect_uri}{separator}{urlencode(params, quote_via=quote)}"
    return RedirectResponse(url=error_url, status_code=302)
@router.post("/token")
async def oauth_token(request: Request):
    """Exchange an authorization code for an access token.

    Reads ``grant_type``, ``code``, ``redirect_uri``, ``client_id`` and
    ``client_secret`` from the request's form data.

    Args:
        request: Incoming request whose form body carries the token request.

    Returns:
        On success, a dict with ``access_token``, ``token_type``,
        ``expires_in``, ``refresh_token`` and ``scope``. On failure, a dict
        with ``error`` and ``error_description``.

    NOTE(review): error dicts are returned without an explicit HTTP error
    status; RFC 6749 section 5.2 expects 400/401 here. Left unchanged so that
    existing clients keep working. The granted scope is hard-coded rather than
    taken from the authorization code, and the refresh token is generated but
    not stored anywhere in this handler.
    """
    try:
        # Collect the request fields.
        form_data = await request.form()
        grant_type = form_data.get("grant_type")
        code = form_data.get("code")
        redirect_uri = form_data.get("redirect_uri")
        client_id = form_data.get("client_id")
        client_secret = form_data.get("client_secret")
        logger.info("令牌请求: grant_type=%s, client_id=%s", grant_type, client_id)

        # Only the authorization-code grant is supported.
        if grant_type != "authorization_code":
            return {
                "error": "unsupported_grant_type",
                "error_description": "Only authorization_code grant type is supported"
            }

        # Required parameters must be present and non-empty.
        if not code or not redirect_uri or not client_id:
            return {
                "error": "invalid_request",
                "error_description": "Missing required parameters"
            }

        # Validate the client credentials through the service layer.
        oauth_service = OAuthService()
        success, app_info, message = await oauth_service.validate_client_credentials(
            client_id, client_secret
        )
        if not success:
            return {
                "error": "invalid_client",
                "error_description": message
            }

        # The redirect URI must be one registered for this client; a missing
        # or None value is treated as "nothing registered".
        redirect_uris = app_info.get("redirect_uris") or []
        if redirect_uri not in redirect_uris:
            return {
                "error": "invalid_grant",
                "error_description": "Invalid redirect_uri"
            }

        # Validate the authorization code through the service layer.
        success, user_id, message = await oauth_service.validate_authorization_code(
            code, client_id, redirect_uri
        )
        if not success:
            return {
                "error": "invalid_grant",
                "error_description": message
            }

        # Issue the tokens.
        granted_scope = "profile email"
        access_token = await oauth_service.generate_access_token(
            user_id, client_id, granted_scope
        )
        refresh_token = secrets.token_urlsafe(32)
        # TODO: persist the tokens in the database.

        # Never write token material to the logs; identify the grant instead.
        logger.info("令牌生成成功: user_id=%s, client_id=%s", user_id, client_id)
        return {
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            "refresh_token": refresh_token,
            "scope": granted_scope
        }
    except Exception:
        logger.exception("令牌生成错误")
        return {
            "error": "server_error",
            "error_description": "Internal server error"
        }
@router.get("/userinfo")
async def oauth_userinfo(current_user: dict = Depends(get_current_user_with_refresh)):
    """Return the profile of the user identified by the current token.

    Args:
        current_user: Token claims supplied by the
            ``get_current_user_with_refresh`` dependency; this handler reads
            the ``sub``, ``client_id`` and ``scope`` entries.

    Returns:
        The user info produced by ``OAuthService.get_user_info`` for the
        token's scopes, or a dict with ``error`` and ``error_description``
        when the lookup yields nothing or an unexpected error occurs.
    """
    try:
        user_id = current_user.get("sub")
        client_id = current_user.get("client_id")

        # The scope claim may be absent or None, or (presumably, depending on
        # the token issuer) already a sequence; normalise to a list of names
        # so neither case crashes on ``.split()``.
        raw_scope = current_user.get("scope") or ""
        scope = raw_scope.split() if isinstance(raw_scope, str) else list(raw_scope)
        logger.info(
            "用户信息请求: user_id=%s, client_id=%s, scope=%s",
            user_id, client_id, scope,
        )

        # Fetch the user info through the service layer.
        oauth_service = OAuthService()
        user_info = await oauth_service.get_user_info(user_id, scope)
        if not user_info:
            return {
                "error": "invalid_token",
                "error_description": "User not found or inactive"
            }

        # Log only the identifier: the profile itself is personal data and
        # must not be written to the logs.
        logger.info("返回用户信息: user_id=%s", user_id)
        return user_info
    except Exception:
        logger.exception("获取用户信息错误")
        return {
            "error": "server_error",
            "error_description": "Internal server error"
        }