| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568 |
- """
- 授权管理视图路由
- 包含:SSO验证、授权码生成、Token管理、用户信息获取
- """
- import sys
- import os
- # 添加src目录到Python路径
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
- import logging
- import json
- import secrets
- from fastapi import APIRouter, Depends, HTTPException, Request, Response
- from fastapi.responses import HTMLResponse, RedirectResponse
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from typing import Optional
- from datetime import datetime, timezone
- from app.services.jwt_token import verify_token, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
- from app.services.oauth_service import OAuthService
- from app.base.async_mysql_connection import get_db_connection
- from app.utils.auth_dependency import get_current_user_with_refresh
- # 获取logger
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/oauth", tags=["授权管理"])
- security = HTTPBearer()
- security_optional = HTTPBearer(auto_error=False)
- # OAuth2 授权端点
- @router.get("/authorize")
- async def oauth_authorize(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """OAuth2授权端点"""
- try:
- logger.info(f"OAuth授权请求: client_id={client_id}, redirect_uri={redirect_uri}, scope={scope}")
-
- # 验证必要参数
- if not response_type or not client_id or not redirect_uri:
- error_url = f"{redirect_uri}?error=invalid_request&error_description=Missing required parameters"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_request", "redirect_url": error_url}
-
- # 验证response_type
- if response_type != "code":
- error_url = f"{redirect_uri}?error=unsupported_response_type&error_description=Only authorization code flow is supported"
- if state:
- error_url += f"&state={state}"
- return {"error": "unsupported_response_type", "redirect_url": error_url}
-
- # 调用 service 层验证客户端和重定向URI
- oauth_service = OAuthService()
- success, app_info, message = await oauth_service.validate_client_and_redirect_uri(client_id, redirect_uri)
-
- if not success:
- error_url = f"{redirect_uri}?error=invalid_client&error_description={message}"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_client", "redirect_url": error_url}
-
- # 验证scope
- app_scopes = app_info.get("scope", [])
- requested_scopes = scope.split() if scope else []
- invalid_scopes = [s for s in requested_scopes if s not in app_scopes]
- if invalid_scopes:
- error_url = f"{redirect_uri}?error=invalid_scope&error_description=Invalid scope: {' '.join(invalid_scopes)}"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_scope", "redirect_url": error_url}
-
- # TODO: 检查用户登录状态
- # 这里应该检查用户是否已登录(通过session或cookie)
- # 如果未登录,应该重定向到登录页面
-
- # 临时方案:返回登录页面,让用户先登录
- # 生产环境应该使用session管理
-
- # 构建登录页面URL,登录后返回授权页面
- login_page_url = f"/oauth/login?response_type={response_type}&client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}"
- if state:
- login_page_url += f"&state={state}"
-
- logger.info(f"需要用户登录,重定向到登录页面: {login_page_url}")
-
- return RedirectResponse(url=login_page_url, status_code=302)
-
- # 非受信任应用需要用户授权确认
- # 这里返回授权页面HTML
- # 为简化,暂时跳过授权确认页面,直接跳转到登录
-
- except Exception as e:
- logger.exception("OAuth授权错误")
- error_url = f"{redirect_uri}?error=server_error&error_description=Internal server error"
- if state:
- error_url += f"&state={state}"
- return {"error": "server_error", "redirect_url": error_url}
- @router.get("/login")
- async def oauth_login_page(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """OAuth2登录页面"""
- try:
- logger.info(f"显示OAuth登录页面: client_id={client_id}")
-
- # 调用 service 层获取应用信息
- oauth_service = OAuthService()
- success, app_info, message = await oauth_service.validate_client_and_redirect_uri(client_id, redirect_uri)
-
- app_name = app_info.get("name", "未知应用") if success else "未知应用"
-
- # 构建登录页面HTML
- login_html = f"""
- <!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>四川路桥AI后台管理平台 - {app_name}</title>
- <style>
- body {{
- font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- margin: 0;
- padding: 20px;
- min-height: 100vh;
- display: flex;
- align-items: center;
- justify-content: center;
- }}
- .login-container {{
- background: white;
- border-radius: 15px;
- padding: 40px;
- box-shadow: 0 20px 40px rgba(0, 0, 0, 0.1);
- max-width: 400px;
- width: 100%;
- }}
- .login-header {{
- text-align: center;
- margin-bottom: 30px;
- }}
- .login-header h1 {{
- color: #333;
- margin-bottom: 10px;
- }}
- .app-info {{
- background: #f8f9fa;
- padding: 15px;
- border-radius: 8px;
- margin-bottom: 20px;
- text-align: center;
- }}
- .form-group {{
- margin-bottom: 20px;
- }}
- .form-group label {{
- display: block;
- margin-bottom: 5px;
- font-weight: 500;
- color: #333;
- }}
- .form-group input {{
- width: 100%;
- padding: 12px;
- border: 1px solid #ddd;
- border-radius: 6px;
- font-size: 16px;
- box-sizing: border-box;
- }}
- .form-group input:focus {{
- outline: none;
- border-color: #007bff;
- box-shadow: 0 0 0 2px rgba(0, 123, 255, 0.25);
- }}
- .btn {{
- width: 100%;
- padding: 12px;
- background: #007bff;
- color: white;
- border: none;
- border-radius: 6px;
- font-size: 16px;
- font-weight: 500;
- cursor: pointer;
- transition: background 0.3s;
- }}
- .btn:hover {{
- background: #0056b3;
- }}
- .btn:disabled {{
- background: #6c757d;
- cursor: not-allowed;
- }}
- .error-message {{
- color: #dc3545;
- font-size: 14px;
- margin-top: 10px;
- text-align: center;
- }}
- .success-message {{
- color: #28a745;
- font-size: 14px;
- margin-top: 10px;
- text-align: center;
- }}
- </style>
- </head>
- <body>
- <div class="login-container">
- <div class="login-header">
- <h2>🔐 四川路桥AI后台管理平台</h2>
- <p>请登录以继续访问应用</p>
- </div>
-
- <div class="app-info">
- <strong>{app_name}</strong> 请求访问您的账户
- </div>
-
- <form id="loginForm" onsubmit="handleLogin(event)">
- <div class="form-group">
- <label for="username">用户名或邮箱</label>
- <input type="text" id="username" name="username" required>
- </div>
-
- <div class="form-group">
- <label for="password">密码</label>
- <input type="password" id="password" name="password" required>
- </div>
-
- <button type="submit" class="btn" id="loginBtn">登录</button>
-
- <div id="message"></div>
- </form>
-
- <div style="margin-top: 20px; text-align: center; font-size: 14px; color: #666;">
- <p>测试账号: admin / Admin123456 | admin123</p>
- </div>
- </div>
-
- <script>
- async function handleLogin(event) {{
- event.preventDefault();
-
- const loginBtn = document.getElementById('loginBtn');
- const messageDiv = document.getElementById('message');
-
- loginBtn.disabled = true;
- loginBtn.textContent = '登录中...';
- messageDiv.innerHTML = '';
-
- const formData = new FormData(event.target);
- const loginData = {{
- username: formData.get('username'),
- password: formData.get('password'),
- remember_me: false
- }};
-
- try {{
- // 调用登录API
- const response = await fetch('/api/v1/auth/login', {{
- method: 'POST',
- headers: {{
- 'Content-Type': 'application/json'
- }},
- body: JSON.stringify(loginData)
- }});
-
- const result = await response.json();
- if (result.code === 0 || result.code === '0' || result.code === '000000') {{
- messageDiv.innerHTML = '<div class="success-message">登录成功,正在跳转...</div>';
-
- // 登录成功后,重定向到授权页面
- const authUrl = `/oauth/authorize/authenticated?response_type={response_type}&client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}&state={state or ''}&access_token=${{result.data.access_token}}`;
-
- setTimeout(() => {{
- window.location.href = authUrl;
- }}, 1000);
- }} else {{
- messageDiv.innerHTML = `<div class="error-message">${{result.message}}</div>`;
- }}
- }} catch (error) {{
- messageDiv.innerHTML = '<div class="error-message">登录失败,请重试</div>';
- }} finally {{
- loginBtn.disabled = false;
- loginBtn.textContent = '登录';
- }}
- }}
- </script>
- </body>
- </html>
- """
-
- from fastapi.responses import HTMLResponse
- return HTMLResponse(content=login_html)
-
- except Exception as e:
- logger.exception("OAuth登录页面错误")
- return {"error": "server_error", "message": "服务器内部错误"}
- @router.get("/authorize/authenticated")
- async def oauth_authorize_authenticated(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- access_token: str,
- scope: str = "profile",
- state: str = None
- ):
- """用户已登录后的授权处理"""
- try:
- logger.info(f"用户已登录,处理授权: client_id={client_id}")
-
- # 验证访问令牌
- payload = verify_token(access_token)
- if not payload:
- error_url = f"{redirect_uri}?error=invalid_token&error_description=Invalid access token"
- if state:
- error_url += f"&state={state}"
- return RedirectResponse(url=error_url, status_code=302)
-
- user_id = payload.get("sub")
- username = payload.get("username", "")
-
- logger.info(f"用户已验证: {username} ({user_id})")
-
- # 调用 service 层获取应用信息
- oauth_service = OAuthService()
- success, app_info, message = await oauth_service.validate_client_and_redirect_uri(client_id, redirect_uri)
-
- if not success:
- error_url = f"{redirect_uri}?error=invalid_client&error_description={message}"
- if state:
- error_url += f"&state={state}"
- return RedirectResponse(url=error_url, status_code=302)
-
- app_name = app_info.get("name", "")
- is_trusted = app_info.get("is_trusted", False)
-
- # 如果是受信任应用,直接授权
- if is_trusted:
- # 生成授权码
- auth_code = secrets.token_urlsafe(32)
-
- # 调用 service 层存储授权码
- await oauth_service.store_authorization_code(user_id, client_id, auth_code, redirect_uri, scope)
-
- # 重定向回应用
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- logger.info(f"受信任应用自动授权: {callback_url}")
-
- return RedirectResponse(url=callback_url, status_code=302)
-
- # 非受信任应用,显示授权确认页面
- # 为简化,暂时也直接授权
- auth_code = secrets.token_urlsafe(32)
-
- # 调用 service 层存储授权码
- await oauth_service.store_authorization_code(user_id, client_id, auth_code, redirect_uri, scope)
-
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- logger.info(f"用户授权完成: {callback_url}")
-
- return RedirectResponse(url=callback_url, status_code=302)
-
- except Exception as e:
- logger.exception("授权处理错误")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- return RedirectResponse(url=error_url, status_code=302)
- async def oauth_approve(
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """用户同意授权"""
- try:
- logger.info(f"用户同意授权: client_id={client_id}")
-
- # 生成授权码
- auth_code = secrets.token_urlsafe(32)
-
- # TODO: 将授权码存储到数据库,关联用户和应用
- # 这里简化处理,实际应该:
- # 1. 验证用户登录状态
- # 2. 将授权码存储到数据库
- # 3. 设置过期时间(通常10分钟)
-
- # 构建回调URL
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- logger.info(f"重定向到: {callback_url}")
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=callback_url, status_code=302)
-
- except Exception as e:
- logger.exception("授权确认错误")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
- @router.get("/authorize/deny")
- async def oauth_deny(
- client_id: str,
- redirect_uri: str,
- state: str = None
- ):
- """用户拒绝授权"""
- try:
- logger.info(f"用户拒绝授权: client_id={client_id}")
-
- # 构建错误回调URL
- error_url = f"{redirect_uri}?error=access_denied&error_description=User denied authorization"
- if state:
- error_url += f"&state={state}"
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
-
- except Exception as e:
- logger.exception("拒绝授权错误")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
- @router.post("/token")
- async def oauth_token(request: Request):
- """OAuth2令牌端点"""
- try:
- # 获取请求数据
- form_data = await request.form()
-
- grant_type = form_data.get("grant_type")
- code = form_data.get("code")
- redirect_uri = form_data.get("redirect_uri")
- client_id = form_data.get("client_id")
- client_secret = form_data.get("client_secret")
-
- logger.info(f"令牌请求: grant_type={grant_type}, client_id={client_id}")
-
- # 验证grant_type
- if grant_type != "authorization_code":
- return {
- "error": "unsupported_grant_type",
- "error_description": "Only authorization_code grant type is supported"
- }
-
- # 验证必要参数
- if not code or not redirect_uri or not client_id:
- return {
- "error": "invalid_request",
- "error_description": "Missing required parameters"
- }
-
- # 调用 service 层验证客户端凭据
- oauth_service = OAuthService()
- success, app_info, message = await oauth_service.validate_client_credentials(client_id, client_secret)
-
- if not success:
- return {
- "error": "invalid_client",
- "error_description": message
- }
-
- # 验证redirect_uri
- redirect_uris = app_info.get("redirect_uris", [])
- if redirect_uri not in redirect_uris:
- return {
- "error": "invalid_grant",
- "error_description": "Invalid redirect_uri"
- }
-
- # 调用 service 层验证授权码
- success, user_id, message = await oauth_service.validate_authorization_code(code, client_id, redirect_uri)
-
- if not success:
- return {
- "error": "invalid_grant",
- "error_description": message
- }
-
- # 调用 service 层生成访问令牌
- access_token = await oauth_service.generate_access_token(user_id, client_id, "profile email")
- refresh_token = secrets.token_urlsafe(32)
-
- # TODO: 将令牌存储到数据库
-
- # 返回令牌响应
- token_response = {
- "access_token": access_token,
- "token_type": "Bearer",
- "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
- "refresh_token": refresh_token,
- "scope": "profile email"
- }
-
- logger.info(f"令牌生成成功: {access_token[:50]}...")
-
- return token_response
-
- except Exception as e:
- logger.exception("令牌生成错误")
- return {
- "error": "server_error",
- "error_description": "Internal server error"
- }
- @router.get("/userinfo")
- async def oauth_userinfo(current_user: dict = Depends(get_current_user_with_refresh)):
- """OAuth2用户信息端点"""
- try:
- user_id = current_user.get("sub")
- client_id = current_user.get("client_id")
- scope = current_user.get("scope", "").split()
-
- logger.info(f"用户信息请求: user_id={user_id}, client_id={client_id}, scope={scope}")
-
- # 调用 service 层获取用户信息
- oauth_service = OAuthService()
- user_info = await oauth_service.get_user_info(user_id, scope)
-
- if not user_info:
- return {
- "error": "invalid_token",
- "error_description": "User not found or inactive"
- }
-
- logger.info(f"返回用户信息: {user_info}")
-
- return user_info
-
- except Exception as e:
- logger.exception("获取用户信息错误")
- return {
- "error": "server_error",
- "error_description": "Internal server error"
- }
|