| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283 |
- #!/usr/bin/env python3
- """
- 完整的SSO服务器 - 包含认证API
- """
- import sys
- import os
- import socket
- import json
- import uuid
- # 添加src目录到Python路径
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv()
- from fastapi import FastAPI, HTTPException, Depends, Request
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from pydantic import BaseModel
- from typing import Optional
- import hashlib
- import secrets
# Select a JWT implementation and bind it to the module-level name ``jwt``.
# Preference order: PyJWT, then python-jose, then a runtime ``pip install PyJWT``.
# NOTE(review): the two libraries expose different exception names; any code that
# catches decode errors must work with whichever module ends up bound to ``jwt``.
try:
    # Try PyJWT first.
    import jwt as pyjwt
    # Smoke-test that this ``jwt`` module really offers ``encode`` (an unrelated
    # package with the same import name would fail here); the token is discarded.
    test_token = pyjwt.encode({"test": "data"}, "secret", algorithm="HS256")
    jwt = pyjwt
    print("✅ 使用PyJWT库")
except (ImportError, AttributeError, TypeError) as e:
    print(f"PyJWT导入失败: {e}")
    try:
        # Fall back to python-jose.
        from jose import jwt
        print("✅ 使用python-jose库")
    except ImportError as e:
        print(f"python-jose导入失败: {e}")
        # Last resort: install PyJWT into the running interpreter's environment.
        # NOTE(review): installing packages at import time is surprising in
        # production deployments — confirm this is wanted outside development.
        print("尝试安装PyJWT...")
        import subprocess
        import sys
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", "PyJWT"])
            import jwt
            print("✅ PyJWT安装成功")
        except Exception as install_error:
            print(f"❌ PyJWT安装失败: {install_error}")
            # No JWT library is available: abort module import with an install hint.
            raise ImportError("无法导入JWT库,请手动安装: pip install PyJWT")
- from datetime import datetime, timedelta, timezone
- import pymysql
- from urllib.parse import urlparse
- # 数据模型
class LoginRequest(BaseModel):
    """JSON body accepted by the login endpoint."""

    # The login handler matches this value against both users.username and users.email.
    username: str
    # Plaintext password as submitted by the client.
    password: str
    # NOTE(review): not read by any handler in the visible part of this file.
    remember_me: bool = False
class TokenResponse(BaseModel):
    """Token payload placed in ``ApiResponse.data`` after a successful login."""

    # Signed JWT produced by create_access_token.
    access_token: str
    # NOTE(review): never populated by the login handler visible in this file.
    refresh_token: Optional[str] = None
    token_type: str = "Bearer"
    # Lifetime in seconds; the login handler passes ACCESS_TOKEN_EXPIRE_MINUTES * 60.
    expires_in: int
    # Space-separated scope string (the login handler sends "profile email").
    scope: Optional[str] = None
class UserInfo(BaseModel):
    """User record schema.

    NOTE(review): no handler in the visible part of this file constructs or
    returns this model; the profile endpoint builds a plain dict instead.
    """

    id: str
    username: str
    email: str
    phone: Optional[str] = None
    avatar_url: Optional[str] = None
    is_active: bool
    is_superuser: bool = False
    # Presumably role and permission identifiers — element type is not constrained here.
    roles: list = []
    permissions: list = []
class ApiResponse(BaseModel):
    """Uniform envelope returned (via ``model_dump()``) by the JSON endpoints."""

    # 0 on success; handlers in this file use non-zero values such as
    # 100001, 200001, 200002 and 500001 for the different failure cases.
    code: int
    # Human-readable status message.
    message: str
    # Endpoint-specific payload; omitted (None) on most error responses.
    data: Optional[dict] = None
    # Handlers pass ``datetime.now(timezone.utc).isoformat()`` here.
    timestamp: str
# Configuration
# Secret used to sign and verify the HS256 access tokens. Falls back to a
# hard-coded development value when JWT_SECRET_KEY is not set.
# NOTE(review): anyone who knows the fallback can forge tokens — make sure
# every real deployment sets JWT_SECRET_KEY.
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-jwt-secret-key-12345")
# Access-token lifetime in minutes (default 30).
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
def check_port(port):
    """Report whether *port* can currently be bound on localhost.

    Returns True when a fresh TCP socket binds to ``('localhost', port)``,
    and False when the bind attempt raises ``OSError``.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        try:
            probe.bind(('localhost', port))
        except OSError:
            return False
        else:
            return True
def find_available_port(start_port=8000, max_port=8010):
    """Return the first bindable port in ``[start_port, max_port]``.

    Ports are probed in ascending order with ``check_port``; ``None`` is
    returned when none of them is free (or the range is empty).
    """
    candidates = range(start_port, max_port + 1)
    return next(
        (candidate for candidate in candidates if check_port(candidate)),
        None,
    )
def get_db_connection():
    """Open a MySQL connection described by the ``DATABASE_URL`` environment variable.

    The URL is parsed as ``scheme://user:password@host:port/database``. Missing
    parts fall back to ``localhost``, ``3306``, ``root``, an empty password and
    the ``sso_db`` database.

    Returns:
        A ``pymysql`` connection, or ``None`` when ``DATABASE_URL`` is unset or
        empty, or when parsing/connecting fails (the error is printed, not raised).
    """
    # Local import: ``unquote`` is not among the file's top-level imports.
    from urllib.parse import unquote

    database_url = os.getenv('DATABASE_URL', '')
    if not database_url:
        return None

    try:
        parsed = urlparse(database_url)
        config = {
            'host': parsed.hostname or 'localhost',
            'port': parsed.port or 3306,
            # urlparse leaves percent-escapes in place, so credentials containing
            # reserved characters (e.g. "p%40ss" for "p@ss") must be decoded here.
            'user': unquote(parsed.username) if parsed.username else 'root',
            'password': unquote(parsed.password) if parsed.password else '',
            # ``or`` also covers a bare "/" path, which would otherwise yield ''.
            'database': unquote(parsed.path[1:]) or 'sso_db',
            'charset': 'utf8mb4'
        }

        return pymysql.connect(**config)
    except Exception as e:
        # Best-effort by design: callers treat ``None`` as "database unavailable".
        print(f"数据库连接失败: {e}")
        return None
def verify_password_simple(password: str, stored_hash: str) -> bool:
    """Check *password* against a stored ``sha256$<salt>$<hexdigest>`` hash.

    The digest is ``sha256(password + salt)`` rendered as hex.

    Args:
        password: Plaintext password to verify.
        stored_hash: Stored hash string; ``None`` or empty is treated as a mismatch.

    Returns:
        True only when the format is recognised and the digest matches.
    """
    # A missing hash (e.g. NULL column) can never match.
    if not stored_hash:
        return False
    if not stored_hash.startswith("sha256$"):
        return False

    parts = stored_hash.split("$")
    if len(parts) != 3:
        return False

    _, salt, expected_hash = parts
    actual_hash = hashlib.sha256((password + salt).encode()).hexdigest()
    # Constant-time comparison avoids leaking how many leading characters match.
    # Comparing bytes keeps compare_digest from rejecting non-ASCII stored values.
    return secrets.compare_digest(actual_hash.encode(), expected_hash.encode())
def create_access_token(data: dict) -> str:
    """Create a signed HS256 access token.

    Args:
        data: Claims to embed. The dict is not modified; ``exp`` and ``iat``
            are added (overriding any values of the same name).

    Returns:
        The encoded JWT as ``str``.
    """
    # Read the clock once so that exp - iat is exactly the configured lifetime.
    issued_at = datetime.now(timezone.utc)
    to_encode = {
        **data,
        "exp": issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "iat": issued_at,
    }

    encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm="HS256")
    # PyJWT 1.x returns bytes; normalise so the declared return type holds
    # for every JWT library this file may have bound to ``jwt``.
    if isinstance(encoded_jwt, bytes):
        encoded_jwt = encoded_jwt.decode("utf-8")
    return encoded_jwt
def verify_token(token: str) -> Optional[dict]:
    """Decode and validate an HS256 access token.

    Args:
        token: Encoded JWT.

    Returns:
        The decoded claims, or ``None`` when the token is malformed, has a bad
        signature, or fails claim validation (e.g. it is expired).
    """
    # This file may bind ``jwt`` to PyJWT or to python-jose's ``jose.jwt``.
    # PyJWT's base error is ``PyJWTError``; jose.jwt exposes ``JWTError``.
    # Referencing ``jwt.PyJWTError`` unconditionally raises AttributeError under
    # python-jose at the moment an invalid token is being handled.
    decode_error = (
        getattr(jwt, "PyJWTError", None)
        or getattr(jwt, "JWTError", Exception)
    )
    try:
        return jwt.decode(token, JWT_SECRET_KEY, algorithms=["HS256"])
    except decode_error:
        return None
# Create the FastAPI application; interactive docs are served at /docs and /redoc.
app = FastAPI(
    title="SSO认证中心",
    version="1.0.0",
    description="OAuth2单点登录认证中心",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Configure CORS.
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Bearer-token extractor used via ``Depends(security)`` by the authenticated endpoints.
security = HTTPBearer()
@app.get("/")
async def root():
    """Return a welcome envelope with the service name, version and docs path."""
    service_info = {
        "name": "SSO认证中心",
        "version": "1.0.0",
        "docs": "/docs"
    }
    envelope = ApiResponse(
        code=0,
        message="欢迎使用SSO认证中心",
        data=service_info,
        timestamp=datetime.now(timezone.utc).isoformat()
    )
    return envelope.model_dump()
@app.get("/health")
async def health_check():
    """Return a liveness envelope reporting status, version and the current UTC time."""
    # The payload timestamp is taken first and the envelope timestamp second.
    status_payload = {
        "status": "healthy",
        "version": "1.0.0",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    envelope = ApiResponse(
        code=0,
        message="服务正常运行",
        data=status_payload,
        timestamp=datetime.now(timezone.utc).isoformat()
    )
    return envelope.model_dump()
@app.post("/api/v1/auth/login")
async def login(request: Request, login_data: LoginRequest):
    """Authenticate a user and issue an access token.

    The user is looked up by username or email, must be active, and must pass
    ``verify_password_simple``. On success the envelope's ``data`` holds a
    ``TokenResponse`` dump.

    Envelope codes: 0 success; 200001 unknown user or wrong password;
    200002 user disabled; 500001 database unavailable or unexpected error.
    """
    def _reply(code, message, data=None):
        # Every outcome is returned as a dumped ApiResponse envelope.
        return ApiResponse(
            code=code,
            message=message,
            data=data,
            timestamp=datetime.now(timezone.utc).isoformat()
        ).model_dump()

    print(f"🔐 收到登录请求: username={login_data.username}")

    try:
        # Obtain a database connection.
        print("📊 尝试连接数据库...")
        conn = get_db_connection()
        if not conn:
            print("❌ 数据库连接失败")
            return _reply(500001, "数据库连接失败")

        print("✅ 数据库连接成功")
        # try/finally guarantees the connection is released even if the query raises.
        try:
            cursor = conn.cursor()
            try:
                # Look the user up by username or email.
                print(f"🔍 查找用户: {login_data.username}")
                cursor.execute(
                    "SELECT id, username, email, password_hash, is_active, is_superuser FROM users WHERE username = %s OR email = %s",
                    (login_data.username, login_data.username)
                )
                user_data = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        print(f"👤 用户查询结果: {user_data is not None}")

        if not user_data:
            print("❌ 用户不存在")
            return _reply(200001, "用户名或密码错误")

        user_id, username, email, password_hash, is_active, is_superuser = user_data
        print(f"✅ 找到用户: {username}, 激活状态: {is_active}")

        # Reject disabled accounts.
        if not is_active:
            print("❌ 用户已被禁用")
            return _reply(200002, "用户已被禁用")

        # Verify the password. The stored hash (which embeds the salt) is
        # deliberately kept out of the logs.
        print("🔑 验证密码...")
        password_valid = verify_password_simple(login_data.password, password_hash)
        print(f"🔑 密码验证结果: {password_valid}")

        if not password_valid:
            print("❌ 密码验证失败")
            return _reply(200001, "用户名或密码错误")

        # Issue the token.
        print("🎫 生成访问令牌...")
        token_data = {
            # RFC 7519 defines "sub" as a string, and PyJWT >= 2.10 rejects
            # non-string subjects on decode, so numeric ids are stringified.
            "sub": str(user_id),
            "username": username,
            "email": email,
            "is_superuser": is_superuser
        }

        access_token = create_access_token(token_data)
        # The token is a bearer credential; never write any part of it to logs.
        print("✅ 令牌生成成功")

        token_response = TokenResponse(
            access_token=access_token,
            token_type="Bearer",
            expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            scope="profile email"
        )

        print("🎉 登录成功")
        return _reply(0, "登录成功", token_response.model_dump())

    except Exception as e:
        # Top-level boundary: log full details server-side, return a generic error.
        print(f"❌ 登录错误详情: {type(e).__name__}: {str(e)}")
        import traceback
        print(f"❌ 错误堆栈: {traceback.format_exc()}")
        return _reply(500001, "服务器内部错误")
@app.get("/api/v1/users/profile")
async def get_user_profile(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the profile of the user identified by the bearer token.

    Joins ``users`` with ``user_profiles`` and returns the combined fields in
    the envelope's ``data``.

    Envelope codes: 0 success; 200002 invalid token; 200001 user not found;
    500001 database unavailable or unexpected error.
    """
    def _reply(code, message, data=None):
        # Every outcome is returned as a dumped ApiResponse envelope.
        return ApiResponse(
            code=code,
            message=message,
            data=data,
            timestamp=datetime.now(timezone.utc).isoformat()
        ).model_dump()

    def _iso(value):
        # Datetime columns may be NULL.
        return value.isoformat() if value else None

    try:
        # Validate the token and extract the subject.
        payload = verify_token(credentials.credentials)
        if not payload:
            return _reply(200002, "无效的访问令牌")

        user_id = payload.get("sub")
        if not user_id:
            return _reply(200002, "无效的访问令牌")

        # Obtain a database connection.
        conn = get_db_connection()
        if not conn:
            return _reply(500001, "数据库连接失败")

        # try/finally guarantees the connection is released even if the query raises.
        try:
            cursor = conn.cursor()
            try:
                # Fetch the user together with the optional profile row.
                cursor.execute("""
                    SELECT u.id, u.username, u.email, u.phone, u.avatar_url, u.is_active, u.is_superuser,
                    u.last_login_at, u.created_at, u.updated_at,
                    p.real_name, p.company, p.department, p.position
                    FROM users u
                    LEFT JOIN user_profiles p ON u.id = p.user_id
                    WHERE u.id = %s
                """, (user_id,))
                user_data = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        if not user_data:
            return _reply(200001, "用户不存在")

        # Map the positional row onto named fields (order follows the SELECT list).
        user_info = {
            "id": user_data[0],
            "username": user_data[1],
            "email": user_data[2],
            "phone": user_data[3],
            "avatar_url": user_data[4],
            "is_active": user_data[5],
            "is_superuser": user_data[6],
            "last_login_at": _iso(user_data[7]),
            "created_at": _iso(user_data[8]),
            "updated_at": _iso(user_data[9]),
            "real_name": user_data[10],
            "company": user_data[11],
            "department": user_data[12],
            "position": user_data[13]
        }

        return _reply(0, "获取用户资料成功", user_info)

    except Exception as e:
        # Top-level boundary: log server-side, return a generic error.
        print(f"获取用户资料错误: {e}")
        return _reply(500001, "服务器内部错误")
@app.put("/api/v1/users/profile")
async def update_user_profile(
    request: Request,
    profile_data: dict,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Update the profile of the user identified by the bearer token.

    ``email`` and ``phone`` are written to ``users``; ``real_name``,
    ``company``, ``department`` and ``position`` are written to
    ``user_profiles`` (inserted when no row exists yet). Other keys in
    *profile_data* are ignored.

    Envelope codes: 0 success; 200002 invalid token; 500001 database
    unavailable or unexpected error.
    """
    def _reply(code, message, data=None):
        # Every outcome is returned as a dumped ApiResponse envelope.
        return ApiResponse(
            code=code,
            message=message,
            data=data,
            timestamp=datetime.now(timezone.utc).isoformat()
        ).model_dump()

    try:
        # Validate the token and extract the subject.
        payload = verify_token(credentials.credentials)
        if not payload:
            return _reply(200002, "无效的访问令牌")

        user_id = payload.get("sub")
        # A token without a subject cannot identify a row to update.
        if not user_id:
            return _reply(200002, "无效的访问令牌")

        # Obtain a database connection.
        conn = get_db_connection()
        if not conn:
            return _reply(500001, "数据库连接失败")

        # try/finally guarantees the connection is released on every path;
        # closing without commit discards partial changes when a statement raises.
        try:
            cursor = conn.cursor()
            try:
                # Column names below come only from fixed whitelists, so formatting
                # them into the SQL text is safe; values are always bound parameters.
                user_columns = [col for col in ('email', 'phone') if col in profile_data]
                if user_columns:
                    assignments = ', '.join(f'{col} = %s' for col in user_columns)
                    user_values = [profile_data[col] for col in user_columns]
                    user_values.append(user_id)
                    cursor.execute(f"""
                        UPDATE users
                        SET {assignments}, updated_at = NOW()
                        WHERE id = %s
                    """, user_values)

                # Update or insert the extended profile row.
                profile_fields = ['real_name', 'company', 'department', 'position']
                profile_updates = {k: v for k, v in profile_data.items() if k in profile_fields}

                if profile_updates:
                    # Check whether a profile row already exists.
                    cursor.execute("SELECT id FROM user_profiles WHERE user_id = %s", (user_id,))
                    profile_exists = cursor.fetchone()

                    if profile_exists:
                        assignments = ', '.join(f'{field} = %s' for field in profile_updates)
                        profile_values = list(profile_updates.values())
                        profile_values.append(user_id)
                        cursor.execute(f"""
                            UPDATE user_profiles
                            SET {assignments}, updated_at = NOW()
                            WHERE user_id = %s
                        """, profile_values)
                    else:
                        fields = ['user_id'] + list(profile_updates.keys())
                        values = [user_id] + list(profile_updates.values())
                        placeholders = ', '.join(['%s'] * len(values))

                        cursor.execute(f"""
                            INSERT INTO user_profiles ({', '.join(fields)}, created_at, updated_at)
                            VALUES ({placeholders}, NOW(), NOW())
                        """, values)

                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

        return _reply(0, "用户资料更新成功")

    except Exception as e:
        # Top-level boundary: log server-side, return a generic error.
        print(f"更新用户资料错误: {e}")
        return _reply(500001, "服务器内部错误")
@app.put("/api/v1/users/password")
async def change_user_password(
    request: Request,
    password_data: dict,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Change the password of the user identified by the bearer token.

    *password_data* must contain ``old_password`` and ``new_password``. The old
    password is verified against the stored hash before the new hash is saved.

    Envelope codes: 0 success; 100001 missing parameter; 200001 wrong current
    password; 200002 invalid token; 500001 database unavailable or unexpected error.
    """
    def _reply(code, message, data=None):
        # Every outcome is returned as a dumped ApiResponse envelope.
        return ApiResponse(
            code=code,
            message=message,
            data=data,
            timestamp=datetime.now(timezone.utc).isoformat()
        ).model_dump()

    try:
        # Validate the token and extract the subject.
        payload = verify_token(credentials.credentials)
        if not payload:
            return _reply(200002, "无效的访问令牌")

        user_id = payload.get("sub")
        # A token without a subject cannot identify whose password to change.
        if not user_id:
            return _reply(200002, "无效的访问令牌")

        old_password = password_data.get('old_password')
        new_password = password_data.get('new_password')

        if not old_password or not new_password:
            return _reply(100001, "缺少必要参数")

        # Obtain a database connection.
        conn = get_db_connection()
        if not conn:
            return _reply(500001, "数据库连接失败")

        # try/finally guarantees the connection is released on every path,
        # including the early return and any exception from the driver.
        try:
            cursor = conn.cursor()
            try:
                # Verify the current password.
                cursor.execute("SELECT password_hash FROM users WHERE id = %s", (user_id,))
                result = cursor.fetchone()

                if not result or not verify_password_simple(old_password, result[0]):
                    return _reply(200001, "当前密码错误")

                # Hash the new password and store it.
                new_password_hash = hash_password_simple(new_password)
                cursor.execute("""
                    UPDATE users
                    SET password_hash = %s, updated_at = NOW()
                    WHERE id = %s
                """, (new_password_hash, user_id))

                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

        return _reply(0, "密码修改成功")

    except Exception as e:
        # Top-level boundary: log server-side, return a generic error.
        print(f"修改密码错误: {e}")
        return _reply(500001, "服务器内部错误")
def hash_password_simple(password):
    """Hash *password* with a fresh random salt.

    Returns a string of the form ``sha256$<salt>$<hexdigest>`` where the salt is
    32 hex characters and the digest is ``sha256(password + salt)``.
    """
    # A new 16-byte salt (hex-encoded) is drawn for every call.
    salt = secrets.token_hex(16)
    salted = (password + salt).encode()
    digest = hashlib.sha256(salted).hexdigest()
    return "$".join(("sha256", salt, digest))
@app.post("/api/v1/auth/logout")
async def logout():
    """Acknowledge a logout request; this handler changes no server-side state."""
    now_iso = datetime.now(timezone.utc).isoformat()
    envelope = ApiResponse(code=0, message="登出成功", timestamp=now_iso)
    return envelope.model_dump()
def _render_authorization_page(app_name, requested_scopes, client_id, redirect_uri, scope, state):
    """Build the consent-page HTML asking the user to approve an application.

    This markup previously sat after an unconditional ``return`` inside
    ``oauth_authorize`` and could never run. It is kept here, with all
    interpolated values escaped, so it can be wired in once the login-state
    check exists. It is not called yet.
    """
    import html
    from urllib.parse import urlencode

    scope_descriptions = {
        "profile": "访问您的基本信息(用户名、头像等)",
        "email": "访问您的邮箱地址",
        "phone": "访问您的手机号码",
        "roles": "访问您的角色和权限信息"
    }

    scope_items = []
    for scope_item in requested_scopes:
        description = scope_descriptions.get(scope_item, f"访问 {scope_item} 信息")
        scope_items.append(f"<li>{html.escape(description)}</li>")
    scope_list_html = "".join(scope_items)

    # Query strings are URL-encoded first, then HTML-escaped for the href attribute.
    deny_query = urlencode({
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "state": state or '',
    })
    approve_query = urlencode({
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state or '',
    })
    deny_href = html.escape(f"/oauth/authorize/deny?{deny_query}")
    approve_href = html.escape(f"/oauth/authorize/approve?{approve_query}")
    safe_app_name = html.escape(str(app_name))

    return f"""
<!DOCTYPE html>
<html>
<head>
<title>授权确认 - SSO认证中心</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
margin: 0;
padding: 20px;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}}
.auth-container {{
background: white;
border-radius: 10px;
padding: 40px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
max-width: 400px;
width: 100%;
}}
.auth-header {{
text-align: center;
margin-bottom: 30px;
}}
.auth-header h1 {{
color: #333;
margin-bottom: 10px;
}}
.app-info {{
background: #f8f9fa;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
}}
.scope-list {{
list-style: none;
padding: 0;
margin: 10px 0;
}}
.scope-list li {{
padding: 5px 0;
color: #666;
}}
.scope-list li:before {{
content: "✓ ";
color: #28a745;
font-weight: bold;
}}
.auth-buttons {{
display: flex;
gap: 10px;
margin-top: 20px;
}}
.btn {{
flex: 1;
padding: 12px 20px;
border: none;
border-radius: 6px;
font-size: 16px;
cursor: pointer;
text-decoration: none;
text-align: center;
display: inline-block;
}}
.btn-primary {{
background: #007bff;
color: white;
}}
.btn-secondary {{
background: #6c757d;
color: white;
}}
.btn:hover {{
opacity: 0.9;
}}
</style>
</head>
<body>
<div class="auth-container">
<div class="auth-header">
<h1>授权确认</h1>
<p>应用请求访问您的账户</p>
</div>

<div class="app-info">
<h3>{safe_app_name}</h3>
<p>该应用请求以下权限:</p>
<ul class="scope-list">
{scope_list_html}
</ul>
</div>

<div class="auth-buttons">
<a href="{deny_href}" class="btn btn-secondary">拒绝</a>
<a href="{approve_href}" class="btn btn-primary">授权</a>
</div>
</div>
</body>
</html>
"""


# OAuth2 authorization endpoint
@app.get("/oauth/authorize")
async def oauth_authorize(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: str = None
):
    """Validate an OAuth2 authorization request and send the user to the login page.

    Checks, in order: required parameters, ``response_type == "code"``, that
    ``client_id`` names an active app, that ``redirect_uri`` is registered for
    that app, and that every requested scope is allowed for it.

    Returns:
        On failure, a dict ``{"error": <code>, "redirect_url": <redirect_uri with
        error parameters>}``. On success, a 302 redirect to ``/oauth/login``
        carrying the original request parameters.
    """
    from urllib.parse import urlencode

    def _error(error, description):
        # Build the error payload with a correctly encoded query string, so
        # spaces in the description and characters in ``state`` cannot break
        # or inject parameters.
        params = {"error": error, "error_description": description}
        if state:
            params["state"] = state
        separator = "&" if "?" in redirect_uri else "?"
        return {
            "error": error,
            "redirect_url": f"{redirect_uri}{separator}{urlencode(params)}",
        }

    try:
        print(f"🔐 OAuth授权请求: client_id={client_id}, redirect_uri={redirect_uri}, scope={scope}")

        # Required parameters.
        if not response_type or not client_id or not redirect_uri:
            return _error("invalid_request", "Missing required parameters")

        # Only the authorization-code flow is supported.
        if response_type != "code":
            return _error("unsupported_response_type", "Only authorization code flow is supported")

        # Obtain a database connection.
        conn = get_db_connection()
        if not conn:
            return _error("server_error", "Database connection failed")

        # try/finally guarantees the connection is released even if the query raises.
        try:
            cursor = conn.cursor()
            try:
                # Look up the client application.
                cursor.execute("""
                    SELECT id, name, redirect_uris, scope, is_active, is_trusted
                    FROM apps
                    WHERE app_key = %s AND is_active = 1
                """, (client_id,))
                app_data = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        if not app_data:
            return _error("invalid_client", "Invalid client_id")

        # Only the registered redirect URIs and scopes are needed here; both
        # columns are read as JSON-encoded lists.
        redirect_uris_json, app_scope_json = app_data[2], app_data[3]

        # The redirect URI must exactly match a registered one.
        redirect_uris = json.loads(redirect_uris_json) if redirect_uris_json else []
        if redirect_uri not in redirect_uris:
            return _error("invalid_request", "Invalid redirect_uri")

        # Every requested scope must be allowed for this app.
        app_scopes = json.loads(app_scope_json) if app_scope_json else []
        requested_scopes = scope.split() if scope else []
        invalid_scopes = [s for s in requested_scopes if s not in app_scopes]
        if invalid_scopes:
            return _error("invalid_scope", f"Invalid scope: {' '.join(invalid_scopes)}")

        # TODO: check whether the user already has a login session (cookie or
        # session) and only redirect to the login page when there is none.
        # Interim approach: always send the user through the login page.
        login_params = {
            "response_type": response_type,
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "scope": scope,
        }
        if state:
            login_params["state"] = state
        # urlencode keeps '&', '?' and '=' inside redirect_uri or state from
        # being parsed as separate parameters by the login endpoint.
        login_page_url = f"/oauth/login?{urlencode(login_params)}"

        print(f"🔐 需要用户登录,重定向到登录页面: {login_page_url}")

        from fastapi.responses import RedirectResponse
        return RedirectResponse(url=login_page_url, status_code=302)

    except Exception as e:
        # Top-level boundary: log server-side, return a generic error.
        print(f"❌ OAuth授权错误: {e}")
        return _error("server_error", "Internal server error")
@app.get("/oauth/login")
async def oauth_login_page(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: str = None
):
    """Render the HTML login page for an OAuth2 authorization request.

    The page posts the credentials to ``/api/v1/auth/login`` and, on success,
    navigates to ``/oauth/authorize/authenticated`` with the original request
    parameters plus the issued access token.

    Returns:
        An ``HTMLResponse`` with the page, or a dict
        ``{"error": "server_error", "message": ...}`` when the database is
        unavailable or an unexpected error occurs.

    NOTE(review): the page displays a test account and password, and passes the
    access token in a URL query string. Confirm both are acceptable outside
    development.
    """
    import html
    from urllib.parse import urlencode

    try:
        print(f"🔐 显示OAuth登录页面: client_id={client_id}")

        # Look up the application's display name.
        conn = get_db_connection()
        if not conn:
            return {"error": "server_error", "message": "数据库连接失败"}

        # try/finally guarantees the connection is released even if the query raises.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT name FROM apps WHERE app_key = %s", (client_id,))
                app_data = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        app_name = app_data[0] if app_data else "未知应用"
        # The app name is rendered into HTML, so it must be escaped.
        safe_app_name = html.escape(str(app_name))

        # All request parameters are attacker-controllable. They are URL-encoded
        # server-side and embedded as one JSON string literal, so they can break
        # out of neither the JavaScript nor the surrounding <script> element.
        auth_query = urlencode({
            "response_type": response_type,
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "scope": scope,
            "state": state or '',
        })
        auth_base_js = json.dumps(
            f"/oauth/authorize/authenticated?{auth_query}"
        ).replace("<", "\\u003c")

        # Build the login page HTML.
        login_html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSO登录 - {safe_app_name}</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
margin: 0;
padding: 20px;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}}
.login-container {{
background: white;
border-radius: 15px;
padding: 40px;
box-shadow: 0 20px 40px rgba(0, 0, 0, 0.1);
max-width: 400px;
width: 100%;
}}
.login-header {{
text-align: center;
margin-bottom: 30px;
}}
.login-header h1 {{
color: #333;
margin-bottom: 10px;
}}
.app-info {{
background: #f8f9fa;
padding: 15px;
border-radius: 8px;
margin-bottom: 20px;
text-align: center;
}}
.form-group {{
margin-bottom: 20px;
}}
.form-group label {{
display: block;
margin-bottom: 5px;
font-weight: 500;
color: #333;
}}
.form-group input {{
width: 100%;
padding: 12px;
border: 1px solid #ddd;
border-radius: 6px;
font-size: 16px;
box-sizing: border-box;
}}
.form-group input:focus {{
outline: none;
border-color: #007bff;
box-shadow: 0 0 0 2px rgba(0, 123, 255, 0.25);
}}
.btn {{
width: 100%;
padding: 12px;
background: #007bff;
color: white;
border: none;
border-radius: 6px;
font-size: 16px;
font-weight: 500;
cursor: pointer;
transition: background 0.3s;
}}
.btn:hover {{
background: #0056b3;
}}
.btn:disabled {{
background: #6c757d;
cursor: not-allowed;
}}
.error-message {{
color: #dc3545;
font-size: 14px;
margin-top: 10px;
text-align: center;
}}
.success-message {{
color: #28a745;
font-size: 14px;
margin-top: 10px;
text-align: center;
}}
</style>
</head>
<body>
<div class="login-container">
<div class="login-header">
<h1>🔐 SSO登录</h1>
<p>请登录以继续访问应用</p>
</div>

<div class="app-info">
<strong>{safe_app_name}</strong> 请求访问您的账户
</div>

<form id="loginForm" onsubmit="handleLogin(event)">
<div class="form-group">
<label for="username">用户名或邮箱</label>
<input type="text" id="username" name="username" required>
</div>

<div class="form-group">
<label for="password">密码</label>
<input type="password" id="password" name="password" required>
</div>

<button type="submit" class="btn" id="loginBtn">登录</button>

<div id="message"></div>
</form>

<div style="margin-top: 20px; text-align: center; font-size: 14px; color: #666;">
<p>测试账号: admin / Admin123456</p>
</div>
</div>

<script>
const AUTH_BASE_URL = {auth_base_js};

async function handleLogin(event) {{
event.preventDefault();

const loginBtn = document.getElementById('loginBtn');
const messageDiv = document.getElementById('message');

loginBtn.disabled = true;
loginBtn.textContent = '登录中...';
messageDiv.innerHTML = '';

const formData = new FormData(event.target);
const loginData = {{
username: formData.get('username'),
password: formData.get('password'),
remember_me: false
}};

try {{
// Call the login API
const response = await fetch('/api/v1/auth/login', {{
method: 'POST',
headers: {{
'Content-Type': 'application/json'
}},
body: JSON.stringify(loginData)
}});

const result = await response.json();

if (result.code === 0) {{
messageDiv.innerHTML = '<div class="success-message">登录成功,正在跳转...</div>';

// After a successful login, continue to the authorization step
const authUrl = AUTH_BASE_URL + '&access_token=' + encodeURIComponent(result.data.access_token);

setTimeout(() => {{
window.location.href = authUrl;
}}, 1000);
}} else {{
// textContent keeps server-provided text from being parsed as HTML
const errorDiv = document.createElement('div');
errorDiv.className = 'error-message';
errorDiv.textContent = result.message;
messageDiv.appendChild(errorDiv);
}}
}} catch (error) {{
messageDiv.innerHTML = '<div class="error-message">登录失败,请重试</div>';
}} finally {{
loginBtn.disabled = false;
loginBtn.textContent = '登录';
}}
}}
</script>
</body>
</html>
"""

        from fastapi.responses import HTMLResponse
        return HTMLResponse(content=login_html)

    except Exception as e:
        # Top-level boundary: log server-side, return a generic error.
        print(f"❌ OAuth登录页面错误: {e}")
        return {"error": "server_error", "message": "服务器内部错误"}
@app.get("/oauth/authorize/authenticated")
async def oauth_authorize_authenticated(
    response_type: str,
    client_id: str,
    redirect_uri: str,
    access_token: str,
    scope: str = "profile",
    state: str = None
):
    """Issue an authorization code once the user has logged in.

    The client is looked up by ``client_id`` (column ``apps.app_key``) and
    ``redirect_uri`` must be one of the app's registered ``redirect_uris``
    before any redirect is sent. Until that check passes, failures are
    returned as a JSON error body instead of a redirect, so the endpoint
    cannot be used as an open redirector. After the check, errors and the
    authorization code are delivered by a 302 redirect to ``redirect_uri``.

    Args:
        response_type: Accepted for interface compatibility; not inspected.
        client_id: Public key of the requesting app (``apps.app_key``).
        redirect_uri: Callback URL supplied by the client.
        access_token: Token of the logged-in user, checked with
            ``verify_token``.
        scope: Accepted for interface compatibility; not inspected.
        state: Opaque client value echoed back on every redirect.

    Returns:
        A ``RedirectResponse`` (302) to ``redirect_uri`` carrying ``code`` or
        ``error``, or a dict with ``error``/``error_description`` when the
        client or redirect URI could not be validated.
    """
    import json
    from urllib.parse import urlencode
    from fastapi.responses import RedirectResponse

    def build_callback(params):
        """Append URL-encoded ``params`` (plus ``state``) to redirect_uri."""
        query = dict(params)
        if state:
            query["state"] = state
        # Respect a query string that is already part of the callback URL.
        joiner = "&" if "?" in redirect_uri else "?"
        return f"{redirect_uri}{joiner}{urlencode(query)}"

    # Flipped to True only after redirect_uri matched a registered URI.
    redirect_uri_verified = False

    try:
        print(f"🔐 用户已登录,处理授权: client_id={client_id}")

        conn = get_db_connection()
        if not conn:
            return {
                "error": "server_error",
                "error_description": "Database connection failed"
            }

        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT name, is_trusted, redirect_uris FROM apps WHERE app_key = %s",
                (client_id,)
            )
            app_row = cursor.fetchone()
        finally:
            # Release the connection even when the query raises.
            if cursor is not None:
                cursor.close()
            conn.close()

        if not app_row:
            return {
                "error": "invalid_client",
                "error_description": "Invalid client"
            }

        app_name, is_trusted, redirect_uris_json = app_row

        # Same storage format the token endpoint reads: a JSON list of URIs.
        registered_uris = json.loads(redirect_uris_json) if redirect_uris_json else []
        if not isinstance(registered_uris, list) or redirect_uri not in registered_uris:
            return {
                "error": "invalid_request",
                "error_description": "Invalid redirect_uri"
            }
        redirect_uri_verified = True

        payload = verify_token(access_token)
        if not payload:
            error_url = build_callback({
                "error": "invalid_token",
                "error_description": "Invalid access token"
            })
            return RedirectResponse(url=error_url, status_code=302)

        user_id = payload.get("sub")
        username = payload.get("username", "")
        print(f"✅ 用户已验证: {username} ({user_id})")

        # TODO: persist the authorization code together with the user and app.
        # Trusted and untrusted apps are both authorized directly for now;
        # a consent page for untrusted apps is not implemented.
        auth_code = secrets.token_urlsafe(32)
        callback_url = build_callback({"code": auth_code})

        if is_trusted:
            print(f"✅ 受信任应用自动授权: {callback_url}")
        else:
            print(f"✅ 用户授权完成: {callback_url}")

        return RedirectResponse(url=callback_url, status_code=302)

    except Exception as e:
        print(f"❌ 授权处理错误: {e}")
        if not redirect_uri_verified:
            # Never redirect to a callback that has not been validated.
            return {
                "error": "server_error",
                "error_description": "Authorization failed"
            }
        error_url = build_callback({
            "error": "server_error",
            "error_description": "Authorization failed"
        })
        return RedirectResponse(url=error_url, status_code=302)
async def oauth_approve(
    client_id: str,
    redirect_uri: str,
    scope: str = "profile",
    state: str = None
):
    """Handle the user approving the authorization request.

    Generates a random authorization code and redirects (302) to
    ``redirect_uri`` with ``code`` and, when given, ``state`` as URL-encoded
    query parameters.

    Args:
        client_id: Identifier of the requesting app; only logged.
        redirect_uri: Callback URL the browser is sent to.
        scope: Accepted for interface compatibility; not inspected.
        state: Opaque client value echoed back on the redirect.

    Returns:
        A ``RedirectResponse`` carrying either ``code`` or, on failure,
        ``error=server_error``.
    """
    from urllib.parse import urlencode
    from fastapi.responses import RedirectResponse

    def build_callback(params):
        """Append URL-encoded ``params`` (plus ``state``) to redirect_uri."""
        query = dict(params)
        if state:
            query["state"] = state
        # Respect a query string that is already part of the callback URL.
        joiner = "&" if "?" in redirect_uri else "?"
        return f"{redirect_uri}{joiner}{urlencode(query)}"

    try:
        print(f"✅ 用户同意授权: client_id={client_id}")

        # TODO: verify the user's login state, persist the code with the user
        # and app, and give it an expiry. None of this is done yet.
        # NOTE(review): redirect_uri is not validated against the app here.
        auth_code = secrets.token_urlsafe(32)

        callback_url = build_callback({"code": auth_code})
        print(f"🔄 重定向到: {callback_url}")

        return RedirectResponse(url=callback_url, status_code=302)

    except Exception as e:
        print(f"❌ 授权确认错误: {e}")
        error_url = build_callback({
            "error": "server_error",
            "error_description": "Authorization failed"
        })
        return RedirectResponse(url=error_url, status_code=302)
@app.get("/oauth/authorize/deny")
async def oauth_deny(
    client_id: str,
    redirect_uri: str,
    state: str = None
):
    """Handle the user denying the authorization request.

    Redirects (302) to ``redirect_uri`` with ``error=access_denied`` and,
    when given, ``state``; all query values are URL-encoded.

    Args:
        client_id: Identifier of the requesting app; only logged.
        redirect_uri: Callback URL the browser is sent to.
        state: Opaque client value echoed back on the redirect.

    Returns:
        A ``RedirectResponse`` carrying the error parameters.
    """
    from urllib.parse import urlencode
    from fastapi.responses import RedirectResponse

    def build_callback(params):
        """Append URL-encoded ``params`` (plus ``state``) to redirect_uri."""
        query = dict(params)
        if state:
            query["state"] = state
        # Respect a query string that is already part of the callback URL.
        joiner = "&" if "?" in redirect_uri else "?"
        return f"{redirect_uri}{joiner}{urlencode(query)}"

    try:
        print(f"❌ 用户拒绝授权: client_id={client_id}")

        # NOTE(review): redirect_uri is not validated against the app here.
        error_url = build_callback({
            "error": "access_denied",
            "error_description": "User denied authorization"
        })
        return RedirectResponse(url=error_url, status_code=302)

    except Exception as e:
        print(f"❌ 拒绝授权错误: {e}")
        error_url = build_callback({
            "error": "server_error",
            "error_description": "Authorization failed"
        })
        return RedirectResponse(url=error_url, status_code=302)
@app.post("/oauth/token")
async def oauth_token(request: Request):
    """OAuth2 token endpoint (``authorization_code`` grant only).

    Reads ``grant_type``, ``code``, ``redirect_uri``, ``client_id`` and
    ``client_secret`` from the form body, validates the client and its
    registered redirect URI, and returns an access/refresh token pair.

    Returns:
        A dict with ``access_token``, ``token_type``, ``expires_in``,
        ``refresh_token`` and ``scope`` on success, or a dict with ``error``
        and ``error_description`` on failure.
    """
    try:
        form_data = await request.form()

        grant_type = form_data.get("grant_type")
        code = form_data.get("code")
        redirect_uri = form_data.get("redirect_uri")
        client_id = form_data.get("client_id")
        client_secret = form_data.get("client_secret")

        print(f"🎫 令牌请求: grant_type={grant_type}, client_id={client_id}")

        if grant_type != "authorization_code":
            return {
                "error": "unsupported_grant_type",
                "error_description": "Only authorization_code grant type is supported"
            }

        if not code or not redirect_uri or not client_id:
            return {
                "error": "invalid_request",
                "error_description": "Missing required parameters"
            }

        conn = get_db_connection()
        if not conn:
            return {
                "error": "server_error",
                "error_description": "Database connection failed"
            }

        cursor = None
        try:
            cursor = conn.cursor()

            # Only active apps may exchange codes.
            cursor.execute("""
                SELECT id, name, app_secret, redirect_uris, scope, is_active
                FROM apps
                WHERE app_key = %s AND is_active = 1
            """, (client_id,))
            app_data = cursor.fetchone()

            if not app_data:
                return {
                    "error": "invalid_client",
                    "error_description": "Invalid client credentials"
                }

            _, _, stored_secret, redirect_uris_json, _, _ = app_data

            # The secret is only checked when the client sends one.
            # NOTE(review): a request without client_secret is not rejected.
            if client_secret:
                supplied = str(client_secret).encode("utf-8")
                if isinstance(stored_secret, bytes):
                    expected = stored_secret
                else:
                    expected = str(stored_secret or "").encode("utf-8")
                # Constant-time comparison avoids leaking the secret via timing.
                if not secrets.compare_digest(supplied, expected):
                    return {
                        "error": "invalid_client",
                        "error_description": "Invalid client credentials"
                    }

            redirect_uris = json.loads(redirect_uris_json) if redirect_uris_json else []
            if redirect_uri not in redirect_uris:
                return {
                    "error": "invalid_grant",
                    "error_description": "Invalid redirect_uri"
                }

            # TODO: validate the authorization code (look it up, check expiry
            # and single use, and read the associated user id from it).

            # Placeholder user id (admin) until codes are stored and resolved.
            user_id = "ed6a79d3-0083-4d81-8b48-fc522f686f74"

            token_data = {
                "sub": user_id,
                "client_id": client_id,
                "scope": "profile email"
            }

            access_token = create_access_token(token_data)
            refresh_token = secrets.token_urlsafe(32)

            # TODO: persist the issued tokens.
        finally:
            # Release DB resources on every path, including exceptions.
            if cursor is not None:
                cursor.close()
            conn.close()

        token_response = {
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            "refresh_token": refresh_token,
            "scope": "profile email"
        }

        print(f"✅ 令牌生成成功: {access_token[:50]}...")

        return token_response

    except Exception as e:
        print(f"❌ 令牌生成错误: {e}")
        return {
            "error": "server_error",
            "error_description": "Internal server error"
        }
@app.get("/oauth/userinfo")
async def oauth_userinfo(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """OAuth2 userinfo endpoint.

    Verifies the bearer token, loads the active user named by its ``sub``
    claim and returns the claims permitted by the token's ``scope``.

    Returns:
        A dict that always contains ``sub``; ``profile``, ``email`` and
        ``phone`` scopes add their respective fields. On failure a dict with
        ``error`` and ``error_description`` is returned.
    """
    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return {
                "error": "invalid_token",
                "error_description": "Invalid or expired access token"
            }

        user_id = payload.get("sub")
        client_id = payload.get("client_id")
        scope = payload.get("scope", "").split()

        print(f"👤 用户信息请求: user_id={user_id}, client_id={client_id}, scope={scope}")

        conn = get_db_connection()
        if not conn:
            return {
                "error": "server_error",
                "error_description": "Database connection failed"
            }

        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT u.id, u.username, u.email, u.phone, u.avatar_url, u.is_active,
                       p.real_name, p.company, p.department, p.position
                FROM users u
                LEFT JOIN user_profiles p ON u.id = p.user_id
                WHERE u.id = %s AND u.is_active = 1
            """, (user_id,))
            user_data = cursor.fetchone()
        finally:
            # Release DB resources even when the query raises.
            if cursor is not None:
                cursor.close()
            conn.close()

        if not user_data:
            return {
                "error": "invalid_token",
                "error_description": "User not found or inactive"
            }

        # Column positions follow the SELECT list above.
        user_info = {"sub": user_data[0]}

        if "profile" in scope:
            user_info.update({
                "username": user_data[1],
                "avatar_url": user_data[4],
                "real_name": user_data[6],
                "company": user_data[7],
                "department": user_data[8],
                "position": user_data[9]
            })

        if "email" in scope:
            user_info["email"] = user_data[2]

        if "phone" in scope:
            user_info["phone"] = user_data[3]

        print(f"✅ 返回用户信息: {user_info}")

        return user_info

    except Exception as e:
        print(f"❌ 获取用户信息错误: {e}")
        return {
            "error": "server_error",
            "error_description": "Internal server error"
        }
@app.get("/api/v1/apps")
async def get_apps(
    page: int = 1,
    page_size: int = 20,
    keyword: str = "",
    status: str = "",
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """List the apps created by the authenticated user.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.
        keyword: Optional substring matched against name and description.
        status: ``"active"`` or ``"inactive"`` to filter by ``is_active``;
            any other value applies no status filter.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``; on success ``data`` holds ``items``,
        ``total``, ``page`` and ``page_size``.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        # A page or size below 1 would yield a negative OFFSET/LIMIT.
        page = max(page, 1)
        page_size = max(page_size, 1)

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()

            # Only fixed fragments are interpolated into the SQL text; every
            # user-supplied value is passed as a bound parameter.
            where_conditions = ["created_by = %s"]
            params = [user_id]

            if keyword:
                where_conditions.append("(name LIKE %s OR description LIKE %s)")
                params.extend([f"%{keyword}%", f"%{keyword}%"])

            if status == "active":
                where_conditions.append("is_active = 1")
            elif status == "inactive":
                where_conditions.append("is_active = 0")

            where_clause = " AND ".join(where_conditions)

            cursor.execute(f"SELECT COUNT(*) FROM apps WHERE {where_clause}", params)
            total = cursor.fetchone()[0]

            offset = (page - 1) * page_size
            cursor.execute(f"""
                SELECT id, name, app_key, description, icon_url, redirect_uris, scope,
                       is_active, is_trusted, access_token_expires, refresh_token_expires,
                       created_at, updated_at
                FROM apps
                WHERE {where_clause}
                ORDER BY created_at DESC
                LIMIT %s OFFSET %s
            """, params + [page_size, offset])
            rows = cursor.fetchall()
        finally:
            # Release DB resources even when a query raises.
            if cursor is not None:
                cursor.close()
            conn.close()

        apps = [
            {
                "id": row[0],
                "name": row[1],
                "app_key": row[2],
                "description": row[3],
                "icon_url": row[4],
                "redirect_uris": json.loads(row[5]) if row[5] else [],
                "scope": json.loads(row[6]) if row[6] else [],
                "is_active": bool(row[7]),
                "is_trusted": bool(row[8]),
                "access_token_expires": row[9],
                "refresh_token_expires": row[10],
                "created_at": row[11].isoformat() if row[11] else None,
                "updated_at": row[12].isoformat() if row[12] else None,
                # Placeholder statistics: random numbers, not real metrics.
                "today_requests": secrets.randbelow(1000),
                "active_users": secrets.randbelow(100)
            }
            for row in rows
        ]

        return respond(
            0,
            "获取应用列表成功",
            data={
                "items": apps,
                "total": total,
                "page": page,
                "page_size": page_size
            }
        )

    except Exception as e:
        print(f"获取应用列表错误: {e}")
        return respond(500001, "服务器内部错误")
@app.get("/api/v1/apps/{app_id}")
async def get_app_detail(
    app_id: str,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Return one app owned by the authenticated user, including its secret.

    Args:
        app_id: Primary key of the app.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``; on success ``data`` holds the app fields
        including ``app_secret``. Code 200001 is returned when the app does
        not exist or belongs to another user.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()
            # The created_by filter enforces ownership.
            cursor.execute("""
                SELECT id, name, app_key, app_secret, description, icon_url,
                       redirect_uris, scope, is_active, is_trusted,
                       access_token_expires, refresh_token_expires,
                       created_at, updated_at
                FROM apps
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            app_data = cursor.fetchone()
        finally:
            # Release DB resources even when the query raises.
            if cursor is not None:
                cursor.close()
            conn.close()

        if not app_data:
            return respond(200001, "应用不存在或无权限")

        # Column positions follow the SELECT list above.
        app_detail = {
            "id": app_data[0],
            "name": app_data[1],
            "app_key": app_data[2],
            "app_secret": app_data[3],
            "description": app_data[4],
            "icon_url": app_data[5],
            "redirect_uris": json.loads(app_data[6]) if app_data[6] else [],
            "scope": json.loads(app_data[7]) if app_data[7] else [],
            "is_active": bool(app_data[8]),
            "is_trusted": bool(app_data[9]),
            "access_token_expires": app_data[10],
            "refresh_token_expires": app_data[11],
            "created_at": app_data[12].isoformat() if app_data[12] else None,
            "updated_at": app_data[13].isoformat() if app_data[13] else None
        }

        return respond(0, "获取应用详情成功", data=app_detail)

    except Exception as e:
        print(f"获取应用详情错误: {e}")
        return respond(500001, "服务器内部错误")
@app.post("/api/v1/apps")
async def create_app(
    request: Request,
    app_data: dict,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Create an app owned by the authenticated user.

    Args:
        request: Incoming request; not inspected.
        app_data: JSON body. ``name`` and a non-empty list ``redirect_uris``
            are required; ``description``, ``icon_url``, ``scope``,
            ``is_trusted``, ``access_token_expires`` and
            ``refresh_token_expires`` are optional.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``; on success ``data`` holds the new app
        including the generated ``app_key`` and ``app_secret``.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        if not app_data.get('name') or not app_data.get('redirect_uris'):
            return respond(100001, "缺少必要参数")

        # redirect_uris is later checked with `uri in redirect_uris`; storing a
        # plain string would turn that into a substring match.
        if not isinstance(app_data['redirect_uris'], list):
            return respond(100001, "至少需要一个回调URL")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        app_id = str(uuid.uuid4())
        app_key = generate_random_string(32)
        app_secret = generate_random_string(64)

        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO apps (
                    id, name, app_key, app_secret, description, icon_url,
                    redirect_uris, scope, is_active, is_trusted,
                    access_token_expires, refresh_token_expires, created_by,
                    created_at, updated_at
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
                )
            """, (
                app_id,
                app_data['name'],
                app_key,
                app_secret,
                app_data.get('description', ''),
                app_data.get('icon_url', ''),
                json.dumps(app_data['redirect_uris']),
                json.dumps(app_data.get('scope', ['profile'])),
                True,
                app_data.get('is_trusted', False),
                app_data.get('access_token_expires', 7200),
                app_data.get('refresh_token_expires', 2592000),
                user_id
            ))
            conn.commit()
        finally:
            # Release DB resources even when the insert raises.
            if cursor is not None:
                cursor.close()
            conn.close()

        app_info = {
            "id": app_id,
            "name": app_data['name'],
            "app_key": app_key,
            "app_secret": app_secret,
            "description": app_data.get('description', ''),
            "redirect_uris": app_data['redirect_uris'],
            "scope": app_data.get('scope', ['profile']),
            "is_active": True,
            "is_trusted": app_data.get('is_trusted', False)
        }

        return respond(0, "应用创建成功", data=app_info)

    except Exception as e:
        print(f"创建应用错误: {e}")
        return respond(500001, "服务器内部错误")
@app.put("/api/v1/apps/{app_id}/status")
async def toggle_app_status(
    app_id: str,
    status_data: dict,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Enable or disable an app owned by the authenticated user.

    Args:
        app_id: Primary key of the app.
        status_data: JSON body; ``is_active`` is required.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``. Code 100001 is returned when
        ``is_active`` is missing and 200001 when the app does not exist or
        belongs to another user.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")
        is_active = status_data.get('is_active')

        # False is a valid value, so only a missing key is rejected.
        if is_active is None:
            return respond(100001, "缺少必要参数")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()

            # The created_by filter enforces ownership.
            cursor.execute("""
                SELECT id, name FROM apps
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            app_data = cursor.fetchone()

            if not app_data:
                return respond(200001, "应用不存在或无权限")

            cursor.execute("""
                UPDATE apps
                SET is_active = %s, updated_at = NOW()
                WHERE id = %s
            """, (is_active, app_id))
            conn.commit()
        finally:
            # Release DB resources on every path, including exceptions.
            if cursor is not None:
                cursor.close()
            conn.close()

        action = "启用" if is_active else "禁用"
        print(f"✅ 应用状态已更新: {app_data[1]} -> {action}")

        return respond(0, f"应用已{action}")

    except Exception as e:
        print(f"切换应用状态错误: {e}")
        return respond(500001, "服务器内部错误")
@app.put("/api/v1/apps/{app_id}")
async def update_app(
    app_id: str,
    app_data: dict,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Update an app owned by the authenticated user.

    Args:
        app_id: Primary key of the app.
        app_data: JSON body. ``name`` (non-blank string) and a non-empty list
            ``redirect_uris`` are required; ``description``, ``icon_url``,
            ``scope``, ``is_trusted``, ``access_token_expires`` and
            ``refresh_token_expires`` are optional and fall back to defaults.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``; on success ``data`` holds the app as
        re-read from the database (without the secret).
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        # A null or non-string name is treated like a blank one instead of
        # crashing on .strip() and surfacing as a server error.
        raw_name = app_data.get('name')
        name = raw_name.strip() if isinstance(raw_name, str) else ''
        if not name:
            return respond(100001, "应用名称不能为空")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()

            # The created_by filter enforces ownership.
            cursor.execute("""
                SELECT id, name FROM apps
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            existing_app = cursor.fetchone()

            if not existing_app:
                return respond(200001, "应用不存在或无权限")

            # Names must be unique among the user's other apps.
            cursor.execute("""
                SELECT id FROM apps
                WHERE name = %s AND created_by = %s AND id != %s
            """, (name, user_id, app_id))

            if cursor.fetchone():
                return respond(200001, "应用名称已存在")

            description = (app_data.get('description') or '').strip()
            icon_url = (app_data.get('icon_url') or '').strip()
            redirect_uris = app_data.get('redirect_uris', [])
            scope = app_data.get('scope', ['profile', 'email'])
            is_trusted = app_data.get('is_trusted', False)
            access_token_expires = app_data.get('access_token_expires', 7200)
            refresh_token_expires = app_data.get('refresh_token_expires', 2592000)

            if not redirect_uris or not isinstance(redirect_uris, list):
                return respond(100001, "至少需要一个回调URL")

            # An empty or malformed scope falls back to the default scopes.
            if not scope or not isinstance(scope, list):
                scope = ['profile', 'email']

            cursor.execute("""
                UPDATE apps
                SET name = %s, description = %s, icon_url = %s,
                    redirect_uris = %s, scope = %s, is_trusted = %s,
                    access_token_expires = %s, refresh_token_expires = %s,
                    updated_at = NOW()
                WHERE id = %s
            """, (
                name, description, icon_url,
                json.dumps(redirect_uris), json.dumps(scope), is_trusted,
                access_token_expires, refresh_token_expires, app_id
            ))
            conn.commit()

            # Re-read the row so the response reflects what was stored.
            cursor.execute("""
                SELECT id, name, app_key, description, icon_url,
                       redirect_uris, scope, is_active, is_trusted,
                       access_token_expires, refresh_token_expires,
                       created_at, updated_at
                FROM apps
                WHERE id = %s
            """, (app_id,))
            app_info = cursor.fetchone()
        finally:
            # Release DB resources on every path, including exceptions.
            if cursor is not None:
                cursor.close()
            conn.close()

        if not app_info:
            return respond(500001, "获取更新后的应用信息失败")

        app_result = {
            "id": app_info[0],
            "name": app_info[1],
            "app_key": app_info[2],
            "description": app_info[3],
            "icon_url": app_info[4],
            "redirect_uris": json.loads(app_info[5]) if app_info[5] else [],
            "scope": json.loads(app_info[6]) if app_info[6] else [],
            "is_active": bool(app_info[7]),
            "is_trusted": bool(app_info[8]),
            "access_token_expires": app_info[9],
            "refresh_token_expires": app_info[10],
            "created_at": app_info[11].isoformat() if app_info[11] else None,
            "updated_at": app_info[12].isoformat() if app_info[12] else None
        }

        print(f"✅ 应用已更新: {name}")

        return respond(0, "应用更新成功", data=app_result)

    except Exception as e:
        print(f"更新应用错误: {e}")
        return respond(500001, "服务器内部错误")
@app.delete("/api/v1/apps/{app_id}")
async def delete_app(
    app_id: str,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Delete an app owned by the authenticated user.

    Args:
        app_id: Primary key of the app.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``. Code 200001 is returned when the app does
        not exist or belongs to another user.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()

            # The created_by filter enforces ownership.
            cursor.execute("""
                SELECT id, name FROM apps
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            app_data = cursor.fetchone()

            if not app_data:
                return respond(200001, "应用不存在或无权限")

            # NOTE(review): removal of related rows presumably relies on
            # database-level cascades; confirm against the schema.
            cursor.execute("DELETE FROM apps WHERE id = %s", (app_id,))
            conn.commit()
        finally:
            # Release DB resources on every path, including exceptions.
            if cursor is not None:
                cursor.close()
            conn.close()

        print(f"✅ 应用已删除: {app_data[1]}")

        return respond(0, "应用已删除")

    except Exception as e:
        print(f"删除应用错误: {e}")
        return respond(500001, "服务器内部错误")
@app.post("/api/v1/apps/{app_id}/reset-secret")
async def reset_app_secret(
    app_id: str,
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Replace the secret of an app owned by the authenticated user.

    Args:
        app_id: Primary key of the app.
        credentials: Bearer token identifying the user.

    Returns:
        The dumped ``ApiResponse``; on success ``data`` holds the new
        ``app_secret``. Code 200001 is returned when the app does not exist
        or belongs to another user.
    """
    def respond(code, message, **extra):
        """Build the dumped ApiResponse envelope with a UTC timestamp."""
        return ApiResponse(
            code=code,
            message=message,
            timestamp=datetime.now(timezone.utc).isoformat(),
            **extra
        ).model_dump()

    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return respond(200002, "无效的访问令牌")

        user_id = payload.get("sub")

        conn = get_db_connection()
        if not conn:
            return respond(500001, "数据库连接失败")

        cursor = None
        try:
            cursor = conn.cursor()

            # The created_by filter enforces ownership.
            cursor.execute("""
                SELECT id, name FROM apps
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            app_data = cursor.fetchone()

            if not app_data:
                return respond(200001, "应用不存在或无权限")

            new_secret = generate_random_string(64)

            cursor.execute("""
                UPDATE apps
                SET app_secret = %s, updated_at = NOW()
                WHERE id = %s
            """, (new_secret, app_id))
            conn.commit()
        finally:
            # Release DB resources on every path, including exceptions.
            if cursor is not None:
                cursor.close()
            conn.close()

        print(f"✅ 应用密钥已重置: {app_data[1]}")

        return respond(0, "应用密钥已重置", data={"app_secret": new_secret})

    except Exception as e:
        print(f"重置应用密钥错误: {e}")
        return respond(500001, "服务器内部错误")
def generate_random_string(length=32):
    """Return ``length`` random characters drawn from ASCII letters and digits.

    Each character is picked with ``secrets.choice``.
    """
    import secrets
    import string

    pool = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(pool))
    return ''.join(picked)
- """获取验证码"""
- try:
- # 生成验证码
- captcha_text, captcha_image = generate_captcha()
-
- # 这里应该将验证码文本存储到缓存中(Redis或内存)
- # 为了简化,我们暂时返回固定的验证码
- captcha_id = secrets.token_hex(16)
-
- return ApiResponse(
- code=0,
- message="获取验证码成功",
- data={
- "captcha_id": captcha_id,
- "captcha_image": captcha_image,
- "captcha_text": captcha_text # 生产环境中不应该返回这个
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"生成验证码错误: {e}")
- return ApiResponse(
- code=500001,
- message="生成验证码失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
def generate_captcha():
    """Generate a 4-character captcha and its image as a data URI.

    The text is drawn from uppercase letters and digits. When Pillow is
    importable the image is a 120x40 PNG with coloured text, three noise
    lines and twenty noise dots; otherwise a plain SVG is produced.

    Returns:
        A ``(captcha_text, data_uri)`` tuple. If rendering fails
        unexpectedly, ``("1234", "")`` is returned.
    """
    # Standard-library imports come first so the SVG fallback can still use
    # them when the Pillow import fails.
    import base64
    import io
    import random
    import secrets
    import string

    alphabet = string.ascii_uppercase + string.digits

    try:
        # The challenge text is security relevant, so use the CSPRNG.
        captcha_text = ''.join(secrets.choice(alphabet) for _ in range(4))

        try:
            from PIL import Image, ImageDraw, ImageFont
        except ImportError:
            # Pillow is unavailable: fall back to a simple SVG captcha.
            svg_captcha = f"""
            <svg width="120" height="40" xmlns="http://www.w3.org/2000/svg">
            <rect width="120" height="40" fill="#f0f0f0" stroke="#ccc"/>
            <text x="60" y="25" font-family="Arial" font-size="18" text-anchor="middle" fill="#333">{captcha_text}</text>
            </svg>
            """
            svg_base64 = base64.b64encode(svg_captcha.encode('utf-8')).decode('utf-8')
            return captcha_text, f"data:image/svg+xml;base64,{svg_base64}"

        width, height = 120, 40
        image = Image.new('RGB', (width, height), color='white')
        draw = ImageDraw.Draw(image)

        # Try Arial by name, then by its Windows path, then the built-in font.
        try:
            font = ImageFont.truetype("arial.ttf", 20)
        except OSError:
            try:
                font = ImageFont.truetype("C:/Windows/Fonts/arial.ttf", 20)
            except OSError:
                font = ImageFont.load_default()

        # Centre the text; the height is a fixed estimate for the 20px font.
        text_width = draw.textlength(captcha_text, font=font)
        text_height = 20
        x = (width - text_width) // 2
        y = (height - text_height) // 2

        colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD']
        text_color = random.choice(colors)

        draw.text((x, y), captcha_text, fill=text_color, font=font)

        # Visual noise only; the non-cryptographic PRNG is fine here.
        for _ in range(3):
            x1 = random.randint(0, width)
            y1 = random.randint(0, height)
            x2 = random.randint(0, width)
            y2 = random.randint(0, height)
            draw.line([(x1, y1), (x2, y2)], fill=random.choice(colors), width=1)

        for _ in range(20):
            dot_x = random.randint(0, width)
            dot_y = random.randint(0, height)
            draw.point((dot_x, dot_y), fill=random.choice(colors))

        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        return captcha_text, f"data:image/png;base64,{image_base64}"

    except Exception as e:
        print(f"生成验证码图片失败: {e}")
        # Fixed fallback captcha with no image.
        return "1234", ""
if __name__ == "__main__":
    import uvicorn

    # NOTE(review): definitions that appear after this guard in the file are
    # only executed once uvicorn.run() returns; confirm that is intended.

    # Pick a free port; the message below names the range 8000-8010.
    port = find_available_port()

    if port is None:
        for failure_line in (
            "❌ 无法找到可用端口 (8000-8010)",
            "请手动停止占用这些端口的进程",
        ):
            print(failure_line)
        sys.exit(1)

    divider = "=" * 60
    base_url = f"http://localhost:{port}"
    banner = [
        divider,
        "🚀 SSO认证中心完整服务器",
        divider,
        f"✅ 找到可用端口: {port}",
        f"🌐 访问地址: {base_url}",
        f"📚 API文档: {base_url}/docs",
        f"❤️ 健康检查: {base_url}/health",
        f"🔐 登录API: {base_url}/api/v1/auth/login",
        divider,
        "📝 前端配置:",
        f" VITE_API_BASE_URL={base_url}",
        divider,
        "👤 测试账号:",
        " 用户名: admin",
        " 密码: Admin123456",
        divider,
        "按 Ctrl+C 停止服务器",
    ]
    for banner_line in banner:
        print(banner_line)
    print()

    try:
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=port,
            log_level="info"
        )
    except KeyboardInterrupt:
        print("\n👋 服务器已停止")
    except Exception as e:
        print(f"❌ 启动失败: {e}")
        sys.exit(1)
@app.get("/api/v1/auth/captcha")
async def get_captcha():
    """Return a freshly generated captcha.

    The response ``data`` holds a random ``captcha_id``, the image data URI
    and the captcha text itself. The text is not stored anywhere by this
    handler.
    """
    try:
        text, image = generate_captcha()

        # The text should be cached server-side (Redis or memory) under this
        # id; that is not implemented yet.
        captcha_id = secrets.token_hex(16)

        body = {
            "captcha_id": captcha_id,
            "captcha_image": image,
            # Must not be returned in production: it reveals the answer.
            "captcha_text": text
        }
        envelope = ApiResponse(
            code=0,
            message="获取验证码成功",
            data=body,
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        return envelope.model_dump()

    except Exception as e:
        print(f"生成验证码错误: {e}")
        failure = ApiResponse(
            code=500001,
            message="生成验证码失败",
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        return failure.model_dump()
def generate_captcha():
    """Generate a 4-character captcha rendered as a plain SVG data URI.

    The text is drawn from uppercase letters and digits using the ``secrets``
    CSPRNG, because the captcha answer is a security challenge.

    Returns:
        A ``(captcha_text, data_uri)`` tuple, where ``data_uri`` is a
        base64-encoded ``image/svg+xml`` document containing the text. If
        generation fails, ``("1234", "")`` is returned.
    """
    try:
        import base64
        import secrets
        import string

        alphabet = string.ascii_uppercase + string.digits
        captcha_text = ''.join(secrets.choice(alphabet) for _ in range(4))

        # Minimal 120x40 SVG with the text centred on a grey background.
        svg_captcha = f"""
        <svg width="120" height="40" xmlns="http://www.w3.org/2000/svg">
        <rect width="120" height="40" fill="#f0f0f0" stroke="#ccc"/>
        <text x="60" y="25" font-family="Arial" font-size="18" text-anchor="middle" fill="#333">{captcha_text}</text>
        </svg>
        """
        svg_base64 = base64.b64encode(svg_captcha.encode('utf-8')).decode('utf-8')
        return captcha_text, f"data:image/svg+xml;base64,{svg_base64}"

    except Exception as e:
        print(f"生成验证码失败: {e}")
        # Fixed fallback captcha with no image.
        return "1234", ""
|