| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605 |
- #!/usr/bin/env python3
- """
- 完整的SSO服务器 - 包含认证API
- """
- import sys
- import os
- import socket
- import json
- import uuid
- # 添加src目录到Python路径
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv()
- from fastapi import FastAPI, HTTPException, Depends, Request
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from pydantic import BaseModel
- from typing import Optional, Any
- import hashlib
- import secrets
- # 修复JWT导入 - 确保使用正确的JWT库
- try:
- # 首先尝试使用PyJWT
- import jwt as pyjwt
- # 测试是否有encode方法
- test_token = pyjwt.encode({"test": "data"}, "secret", algorithm="HS256")
- jwt = pyjwt
- print("✅ 使用PyJWT库")
- except (ImportError, AttributeError, TypeError) as e:
- print(f"PyJWT导入失败: {e}")
- try:
- # 尝试使用python-jose
- from jose import jwt
- print("✅ 使用python-jose库")
- except ImportError as e:
- print(f"python-jose导入失败: {e}")
- # 最后尝试安装PyJWT
- print("尝试安装PyJWT...")
- import subprocess
- import sys
- try:
- subprocess.check_call([sys.executable, "-m", "pip", "install", "PyJWT"])
- import jwt
- print("✅ PyJWT安装成功")
- except Exception as install_error:
- print(f"❌ PyJWT安装失败: {install_error}")
- raise ImportError("无法导入JWT库,请手动安装: pip install PyJWT")
- from datetime import datetime, timedelta, timezone
- import pymysql
- from urllib.parse import urlparse
- # 导入RBAC API - 移除循环导入
- # from rbac_api import get_user_menus, get_all_menus, get_all_roles, get_user_permissions
- # 数据模型
- class LoginRequest(BaseModel):
- username: str
- password: str
- remember_me: bool = False
- class TokenResponse(BaseModel):
- access_token: str
- refresh_token: Optional[str] = None
- token_type: str = "Bearer"
- expires_in: int
- scope: Optional[str] = None
- class UserInfo(BaseModel):
- id: str
- username: str
- email: str
- phone: Optional[str] = None
- avatar_url: Optional[str] = None
- is_active: bool
- is_superuser: bool = False
- roles: list = []
- permissions: list = []
- class ApiResponse(BaseModel):
- code: int
- message: str
- data: Optional[Any] = None
- timestamp: str
- # 配置
- JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "dev-jwt-secret-key-12345")
- ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
- def check_port(port):
- """检查端口是否可用"""
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- try:
- s.bind(('localhost', port))
- return True
- except OSError:
- return False
- def find_available_port(start_port=8000, max_port=8010):
- """查找可用端口"""
- for port in range(start_port, max_port + 1):
- if check_port(port):
- return port
- return None
- def get_db_connection():
- """获取数据库连接"""
- try:
- database_url = os.getenv('DATABASE_URL', '')
- if not database_url:
- return None
-
- parsed = urlparse(database_url)
- config = {
- 'host': parsed.hostname or 'localhost',
- 'port': parsed.port or 3306,
- 'user': parsed.username or 'root',
- 'password': parsed.password or '',
- 'database': parsed.path[1:] if parsed.path else 'sso_db',
- 'charset': 'utf8mb4'
- }
-
- return pymysql.connect(**config)
- except Exception as e:
- print(f"数据库连接失败: {e}")
- return None
- def verify_password_simple(password: str, stored_hash: str) -> bool:
- """验证密码(简化版)"""
- if stored_hash.startswith("sha256$"):
- parts = stored_hash.split("$")
- if len(parts) == 3:
- salt = parts[1]
- expected_hash = parts[2]
- actual_hash = hashlib.sha256((password + salt).encode()).hexdigest()
- return actual_hash == expected_hash
- return False
- def create_access_token(data: dict) -> str:
- """创建访问令牌"""
- to_encode = data.copy()
- expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
- to_encode.update({"exp": expire, "iat": datetime.now(timezone.utc)})
-
- encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm="HS256")
- return encoded_jwt
- def verify_token(token: str) -> Optional[dict]:
- """验证令牌"""
- try:
- payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=["HS256"])
- return payload
- except jwt.PyJWTError:
- return None
- # 创建FastAPI应用
- app = FastAPI(
- title="SSO认证中心",
- version="1.0.0",
- description="OAuth2单点登录认证中心",
- docs_url="/docs",
- redoc_url="/redoc"
- )
- # 配置CORS
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- security = HTTPBearer()
- @app.get("/")
- async def root():
- """根路径"""
- return ApiResponse(
- code=0,
- message="欢迎使用SSO认证中心",
- data={
- "name": "SSO认证中心",
- "version": "1.0.0",
- "docs": "/docs"
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.get("/health")
- async def health_check():
- """健康检查"""
- return ApiResponse(
- code=0,
- message="服务正常运行",
- data={
- "status": "healthy",
- "version": "1.0.0",
- "timestamp": datetime.now(timezone.utc).isoformat()
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.post("/api/v1/auth/login")
- async def login(request: Request, login_data: LoginRequest):
- """用户登录"""
- print(f"🔐 收到登录请求: username={login_data.username}")
-
- try:
- # 获取数据库连接
- print("📊 尝试连接数据库...")
- conn = get_db_connection()
- if not conn:
- print("❌ 数据库连接失败")
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- print("✅ 数据库连接成功")
- cursor = conn.cursor()
-
- # 查找用户
- print(f"🔍 查找用户: {login_data.username}")
- cursor.execute(
- "SELECT id, username, email, password_hash, is_active, is_superuser FROM users WHERE username = %s OR email = %s",
- (login_data.username, login_data.username)
- )
-
- user_data = cursor.fetchone()
- print(f"👤 用户查询结果: {user_data is not None}")
-
- cursor.close()
- conn.close()
-
- if not user_data:
- print("❌ 用户不存在")
- return ApiResponse(
- code=200001,
- message="用户名或密码错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id, username, email, password_hash, is_active, is_superuser = user_data
- print(f"✅ 找到用户: {username}, 激活状态: {is_active}")
-
- # 检查用户状态
- if not is_active:
- print("❌ 用户已被禁用")
- return ApiResponse(
- code=200002,
- message="用户已被禁用",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 验证密码
- print(f"🔑 验证密码,哈希格式: {password_hash[:20]}...")
- password_valid = verify_password_simple(login_data.password, password_hash)
- print(f"🔑 密码验证结果: {password_valid}")
-
- if not password_valid:
- print("❌ 密码验证失败")
- return ApiResponse(
- code=200001,
- message="用户名或密码错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 生成令牌
- print("🎫 生成访问令牌...")
- token_data = {
- "sub": user_id,
- "username": username,
- "email": email,
- "is_superuser": is_superuser
- }
-
- access_token = create_access_token(token_data)
- print(f"✅ 令牌生成成功: {access_token[:50]}...")
-
- token_response = TokenResponse(
- access_token=access_token,
- token_type="Bearer",
- expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
- scope="profile email"
- )
-
- print("🎉 登录成功")
- return ApiResponse(
- code=0,
- message="登录成功",
- data=token_response.model_dump(),
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"❌ 登录错误详情: {type(e).__name__}: {str(e)}")
- import traceback
- print(f"❌ 错误堆栈: {traceback.format_exc()}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.get("/api/v1/users/profile")
- async def get_user_profile(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """获取用户资料"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
- if not user_id:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 查找用户详细信息
- cursor.execute("""
- SELECT u.id, u.username, u.email, u.phone, u.avatar_url, u.is_active, u.is_superuser,
- u.last_login_at, u.created_at, u.updated_at,
- p.real_name, p.company, p.department, p.position
- FROM users u
- LEFT JOIN user_profiles p ON u.id = p.user_id
- WHERE u.id = %s
- """, (user_id,))
-
- user_data = cursor.fetchone()
-
- # 获取用户角色
- cursor.execute("""
- SELECT r.name
- FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND ur.is_active = 1
- """, (user_id,))
-
- roles = [row[0] for row in cursor.fetchall()]
-
- cursor.close()
- conn.close()
-
- if not user_data:
- return ApiResponse(
- code=200001,
- message="用户不存在",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 构建用户信息
- user_info = {
- "id": user_data[0],
- "username": user_data[1],
- "email": user_data[2],
- "phone": user_data[3],
- "avatar_url": user_data[4],
- "is_active": user_data[5],
- "is_superuser": user_data[6],
- "last_login_at": user_data[7].isoformat() if user_data[7] else None,
- "created_at": user_data[8].isoformat() if user_data[8] else None,
- "updated_at": user_data[9].isoformat() if user_data[9] else None,
- "real_name": user_data[10],
- "company": user_data[11],
- "department": user_data[12],
- "position": user_data[13],
- "roles": roles
- }
-
- return ApiResponse(
- code=0,
- message="获取用户资料成功",
- data=user_info,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取用户资料错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.put("/api/v1/users/profile")
- async def update_user_profile(
- request: Request,
- profile_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新用户资料"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 更新用户基本信息
- update_fields = []
- update_values = []
-
- if 'email' in profile_data:
- update_fields.append('email = %s')
- update_values.append(profile_data['email'])
-
- if 'phone' in profile_data:
- update_fields.append('phone = %s')
- update_values.append(profile_data['phone'])
-
- if update_fields:
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE users
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- # 更新或插入用户详情
- profile_fields = ['real_name', 'company', 'department', 'position']
- profile_updates = {k: v for k, v in profile_data.items() if k in profile_fields}
-
- if profile_updates:
- # 检查是否已有记录
- cursor.execute("SELECT id FROM user_profiles WHERE user_id = %s", (user_id,))
- profile_exists = cursor.fetchone()
-
- if profile_exists:
- # 更新现有记录
- update_fields = []
- update_values = []
- for field, value in profile_updates.items():
- update_fields.append(f'{field} = %s')
- update_values.append(value)
-
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE user_profiles
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE user_id = %s
- """, update_values)
- else:
- # 插入新记录
- fields = ['user_id'] + list(profile_updates.keys())
- values = [user_id] + list(profile_updates.values())
- placeholders = ', '.join(['%s'] * len(values))
-
- cursor.execute(f"""
- INSERT INTO user_profiles ({', '.join(fields)}, created_at, updated_at)
- VALUES ({placeholders}, NOW(), NOW())
- """, values)
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="用户资料更新成功",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"更新用户资料错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.put("/api/v1/users/password")
- async def change_user_password(
- request: Request,
- password_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """修改用户密码"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
- old_password = password_data.get('old_password')
- new_password = password_data.get('new_password')
-
- if not old_password or not new_password:
- return ApiResponse(
- code=100001,
- message="缺少必要参数",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 验证当前密码
- cursor.execute("SELECT password_hash FROM users WHERE id = %s", (user_id,))
- result = cursor.fetchone()
-
- if not result or not verify_password_simple(old_password, result[0]):
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="当前密码错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 生成新密码哈希
- new_password_hash = hash_password_simple(new_password)
-
- # 更新密码
- cursor.execute("""
- UPDATE users
- SET password_hash = %s, updated_at = NOW()
- WHERE id = %s
- """, (new_password_hash, user_id))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="密码修改成功",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"修改密码错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- def hash_password_simple(password):
- """简单的密码哈希"""
- import hashlib
- import secrets
-
- # 生成盐值
- salt = secrets.token_hex(16)
-
- # 使用SHA256哈希
- password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
-
- return f"sha256${salt}${password_hash}"
- @app.post("/api/v1/auth/logout")
- async def logout():
- """用户登出"""
- return ApiResponse(
- code=0,
- message="登出成功",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- # OAuth2 授权端点
- @app.get("/oauth/authorize")
- async def oauth_authorize(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """OAuth2授权端点"""
- try:
- print(f"🔐 OAuth授权请求: client_id={client_id}, redirect_uri={redirect_uri}, scope={scope}")
-
- # 验证必要参数
- if not response_type or not client_id or not redirect_uri:
- error_url = f"{redirect_uri}?error=invalid_request&error_description=Missing required parameters"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_request", "redirect_url": error_url}
-
- # 验证response_type
- if response_type != "code":
- error_url = f"{redirect_uri}?error=unsupported_response_type&error_description=Only authorization code flow is supported"
- if state:
- error_url += f"&state={state}"
- return {"error": "unsupported_response_type", "redirect_url": error_url}
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- error_url = f"{redirect_uri}?error=server_error&error_description=Database connection failed"
- if state:
- error_url += f"&state={state}"
- return {"error": "server_error", "redirect_url": error_url}
-
- cursor = conn.cursor()
-
- # 验证client_id和redirect_uri
- cursor.execute("""
- SELECT id, name, redirect_uris, scope, is_active, is_trusted
- FROM apps
- WHERE app_key = %s AND is_active = 1
- """, (client_id,))
-
- app_data = cursor.fetchone()
- cursor.close()
- conn.close()
-
- if not app_data:
- error_url = f"{redirect_uri}?error=invalid_client&error_description=Invalid client_id"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_client", "redirect_url": error_url}
-
- app_id, app_name, redirect_uris_json, app_scope_json, is_active, is_trusted = app_data
-
- # 验证redirect_uri
- redirect_uris = json.loads(redirect_uris_json) if redirect_uris_json else []
- if redirect_uri not in redirect_uris:
- error_url = f"{redirect_uri}?error=invalid_request&error_description=Invalid redirect_uri"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_request", "redirect_url": error_url}
-
- # 验证scope
- app_scopes = json.loads(app_scope_json) if app_scope_json else []
- requested_scopes = scope.split() if scope else []
- invalid_scopes = [s for s in requested_scopes if s not in app_scopes]
- if invalid_scopes:
- error_url = f"{redirect_uri}?error=invalid_scope&error_description=Invalid scope: {' '.join(invalid_scopes)}"
- if state:
- error_url += f"&state={state}"
- return {"error": "invalid_scope", "redirect_url": error_url}
-
- # TODO: 检查用户登录状态
- # 这里应该检查用户是否已登录(通过session或cookie)
- # 如果未登录,应该重定向到登录页面
-
- # 临时方案:返回登录页面,让用户先登录
- # 生产环境应该使用session管理
-
- # 构建登录页面URL,登录后返回授权页面
- login_page_url = f"/oauth/login?response_type={response_type}&client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}"
- if state:
- login_page_url += f"&state={state}"
-
- print(f"🔐 需要用户登录,重定向到登录页面: {login_page_url}")
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=login_page_url, status_code=302)
-
- # 非受信任应用需要用户授权确认
- # 这里返回授权页面HTML
- authorization_html = f"""
- <!DOCTYPE html>
- <html>
- <head>
- <title>授权确认 - SSO认证中心</title>
- <meta charset="utf-8">
- <meta name="viewport" content="width=device-width, initial-scale=1">
- <style>
- body {{
- font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- margin: 0;
- padding: 20px;
- min-height: 100vh;
- display: flex;
- align-items: center;
- justify-content: center;
- }}
- .auth-container {{
- background: white;
- border-radius: 10px;
- padding: 40px;
- box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
- max-width: 400px;
- width: 100%;
- }}
- .auth-header {{
- text-align: center;
- margin-bottom: 30px;
- }}
- .auth-header h1 {{
- color: #333;
- margin-bottom: 10px;
- }}
- .app-info {{
- background: #f8f9fa;
- padding: 20px;
- border-radius: 8px;
- margin-bottom: 20px;
- }}
- .scope-list {{
- list-style: none;
- padding: 0;
- margin: 10px 0;
- }}
- .scope-list li {{
- padding: 5px 0;
- color: #666;
- }}
- .scope-list li:before {{
- content: "✓ ";
- color: #28a745;
- font-weight: bold;
- }}
- .auth-buttons {{
- display: flex;
- gap: 10px;
- margin-top: 20px;
- }}
- .btn {{
- flex: 1;
- padding: 12px 20px;
- border: none;
- border-radius: 6px;
- font-size: 16px;
- cursor: pointer;
- text-decoration: none;
- text-align: center;
- display: inline-block;
- }}
- .btn-primary {{
- background: #007bff;
- color: white;
- }}
- .btn-secondary {{
- background: #6c757d;
- color: white;
- }}
- .btn:hover {{
- opacity: 0.9;
- }}
- </style>
- </head>
- <body>
- <div class="auth-container">
- <div class="auth-header">
- <h1>授权确认</h1>
- <p>应用请求访问您的账户</p>
- </div>
-
- <div class="app-info">
- <h3>{app_name}</h3>
- <p>该应用请求以下权限:</p>
- <ul class="scope-list">
- """
-
- # 添加权限列表
- scope_descriptions = {
- "profile": "访问您的基本信息(用户名、头像等)",
- "email": "访问您的邮箱地址",
- "phone": "访问您的手机号码",
- "roles": "访问您的角色和权限信息"
- }
-
- for scope_item in requested_scopes:
- description = scope_descriptions.get(scope_item, f"访问 {scope_item} 信息")
- authorization_html += f"<li>{description}</li>"
-
- authorization_html += f"""
- </ul>
- </div>
-
- <div class="auth-buttons">
- <a href="/oauth/authorize/deny?client_id={client_id}&redirect_uri={redirect_uri}&state={state or ''}" class="btn btn-secondary">拒绝</a>
- <a href="/oauth/authorize/approve?client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}&state={state or ''}" class="btn btn-primary">授权</a>
- </div>
- </div>
- </body>
- </html>
- """
-
- from fastapi.responses import HTMLResponse
- return HTMLResponse(content=authorization_html)
-
- except Exception as e:
- print(f"❌ OAuth授权错误: {e}")
- error_url = f"{redirect_uri}?error=server_error&error_description=Internal server error"
- if state:
- error_url += f"&state={state}"
- return {"error": "server_error", "redirect_url": error_url}
- @app.get("/oauth/login")
- async def oauth_login_page(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """OAuth2登录页面"""
- try:
- print(f"🔐 显示OAuth登录页面: client_id={client_id}")
-
- # 获取应用信息
- conn = get_db_connection()
- if not conn:
- return {"error": "server_error", "message": "数据库连接失败"}
-
- cursor = conn.cursor()
- cursor.execute("SELECT name FROM apps WHERE app_key = %s", (client_id,))
- app_data = cursor.fetchone()
- cursor.close()
- conn.close()
-
- app_name = app_data[0] if app_data else "未知应用"
-
- # 构建登录页面HTML
- login_html = f"""
- <!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>SSO登录 - {app_name}</title>
- <style>
- body {{
- font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- margin: 0;
- padding: 20px;
- min-height: 100vh;
- display: flex;
- align-items: center;
- justify-content: center;
- }}
- .login-container {{
- background: white;
- border-radius: 15px;
- padding: 40px;
- box-shadow: 0 20px 40px rgba(0, 0, 0, 0.1);
- max-width: 400px;
- width: 100%;
- }}
- .login-header {{
- text-align: center;
- margin-bottom: 30px;
- }}
- .login-header h1 {{
- color: #333;
- margin-bottom: 10px;
- }}
- .app-info {{
- background: #f8f9fa;
- padding: 15px;
- border-radius: 8px;
- margin-bottom: 20px;
- text-align: center;
- }}
- .form-group {{
- margin-bottom: 20px;
- }}
- .form-group label {{
- display: block;
- margin-bottom: 5px;
- font-weight: 500;
- color: #333;
- }}
- .form-group input {{
- width: 100%;
- padding: 12px;
- border: 1px solid #ddd;
- border-radius: 6px;
- font-size: 16px;
- box-sizing: border-box;
- }}
- .form-group input:focus {{
- outline: none;
- border-color: #007bff;
- box-shadow: 0 0 0 2px rgba(0, 123, 255, 0.25);
- }}
- .btn {{
- width: 100%;
- padding: 12px;
- background: #007bff;
- color: white;
- border: none;
- border-radius: 6px;
- font-size: 16px;
- font-weight: 500;
- cursor: pointer;
- transition: background 0.3s;
- }}
- .btn:hover {{
- background: #0056b3;
- }}
- .btn:disabled {{
- background: #6c757d;
- cursor: not-allowed;
- }}
- .error-message {{
- color: #dc3545;
- font-size: 14px;
- margin-top: 10px;
- text-align: center;
- }}
- .success-message {{
- color: #28a745;
- font-size: 14px;
- margin-top: 10px;
- text-align: center;
- }}
- </style>
- </head>
- <body>
- <div class="login-container">
- <div class="login-header">
- <h1>🔐 SSO登录</h1>
- <p>请登录以继续访问应用</p>
- </div>
-
- <div class="app-info">
- <strong>{app_name}</strong> 请求访问您的账户
- </div>
-
- <form id="loginForm" onsubmit="handleLogin(event)">
- <div class="form-group">
- <label for="username">用户名或邮箱</label>
- <input type="text" id="username" name="username" required>
- </div>
-
- <div class="form-group">
- <label for="password">密码</label>
- <input type="password" id="password" name="password" required>
- </div>
-
- <button type="submit" class="btn" id="loginBtn">登录</button>
-
- <div id="message"></div>
- </form>
-
- <div style="margin-top: 20px; text-align: center; font-size: 14px; color: #666;">
- <p>测试账号: admin / Admin123456</p>
- </div>
- </div>
-
- <script>
- async function handleLogin(event) {{
- event.preventDefault();
-
- const loginBtn = document.getElementById('loginBtn');
- const messageDiv = document.getElementById('message');
-
- loginBtn.disabled = true;
- loginBtn.textContent = '登录中...';
- messageDiv.innerHTML = '';
-
- const formData = new FormData(event.target);
- const loginData = {{
- username: formData.get('username'),
- password: formData.get('password'),
- remember_me: false
- }};
-
- try {{
- // 调用登录API
- const response = await fetch('/api/v1/auth/login', {{
- method: 'POST',
- headers: {{
- 'Content-Type': 'application/json'
- }},
- body: JSON.stringify(loginData)
- }});
-
- const result = await response.json();
-
- if (result.code === 0) {{
- messageDiv.innerHTML = '<div class="success-message">登录成功,正在跳转...</div>';
-
- // 登录成功后,重定向到授权页面
- const authUrl = `/oauth/authorize/authenticated?response_type={response_type}&client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}&state={state or ''}&access_token=${{result.data.access_token}}`;
-
- setTimeout(() => {{
- window.location.href = authUrl;
- }}, 1000);
- }} else {{
- messageDiv.innerHTML = `<div class="error-message">${{result.message}}</div>`;
- }}
- }} catch (error) {{
- messageDiv.innerHTML = '<div class="error-message">登录失败,请重试</div>';
- }} finally {{
- loginBtn.disabled = false;
- loginBtn.textContent = '登录';
- }}
- }}
- </script>
- </body>
- </html>
- """
-
- from fastapi.responses import HTMLResponse
- return HTMLResponse(content=login_html)
-
- except Exception as e:
- print(f"❌ OAuth登录页面错误: {e}")
- return {"error": "server_error", "message": "服务器内部错误"}
- @app.get("/oauth/authorize/authenticated")
- async def oauth_authorize_authenticated(
- response_type: str,
- client_id: str,
- redirect_uri: str,
- access_token: str,
- scope: str = "profile",
- state: str = None
- ):
- """用户已登录后的授权处理"""
- try:
- print(f"🔐 用户已登录,处理授权: client_id={client_id}")
-
- # 验证访问令牌
- payload = verify_token(access_token)
- if not payload:
- error_url = f"{redirect_uri}?error=invalid_token&error_description=Invalid access token"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
-
- user_id = payload.get("sub")
- username = payload.get("username", "")
-
- print(f"✅ 用户已验证: {username} ({user_id})")
-
- # 获取应用信息
- conn = get_db_connection()
- if not conn:
- error_url = f"{redirect_uri}?error=server_error&error_description=Database connection failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
-
- cursor = conn.cursor()
- cursor.execute("SELECT name, is_trusted FROM apps WHERE app_key = %s", (client_id,))
- app_data = cursor.fetchone()
- cursor.close()
- conn.close()
-
- if not app_data:
- error_url = f"{redirect_uri}?error=invalid_client&error_description=Invalid client"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
-
- app_name, is_trusted = app_data
-
- # 如果是受信任应用,直接授权
- if is_trusted:
- # 生成授权码
- auth_code = secrets.token_urlsafe(32)
-
- # TODO: 将授权码存储到数据库,关联用户和应用
- # 这里简化处理,实际应该存储到数据库
-
- # 重定向回应用
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- print(f"✅ 受信任应用自动授权: {callback_url}")
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=callback_url, status_code=302)
-
- # 非受信任应用,显示授权确认页面
- # 这里可以返回授权确认页面的HTML
- # 为简化,暂时也直接授权
- auth_code = secrets.token_urlsafe(32)
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- print(f"✅ 用户授权完成: {callback_url}")
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=callback_url, status_code=302)
-
- except Exception as e:
- print(f"❌ 授权处理错误: {e}")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
- async def oauth_approve(
- client_id: str,
- redirect_uri: str,
- scope: str = "profile",
- state: str = None
- ):
- """用户同意授权"""
- try:
- print(f"✅ 用户同意授权: client_id={client_id}")
-
- # 生成授权码
- auth_code = secrets.token_urlsafe(32)
-
- # TODO: 将授权码存储到数据库,关联用户和应用
- # 这里简化处理,实际应该:
- # 1. 验证用户登录状态
- # 2. 将授权码存储到数据库
- # 3. 设置过期时间(通常10分钟)
-
- # 构建回调URL
- callback_url = f"{redirect_uri}?code={auth_code}"
- if state:
- callback_url += f"&state={state}"
-
- print(f"🔄 重定向到: {callback_url}")
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=callback_url, status_code=302)
-
- except Exception as e:
- print(f"❌ 授权确认错误: {e}")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
- @app.get("/oauth/authorize/deny")
- async def oauth_deny(
- client_id: str,
- redirect_uri: str,
- state: str = None
- ):
- """用户拒绝授权"""
- try:
- print(f"❌ 用户拒绝授权: client_id={client_id}")
-
- # 构建错误回调URL
- error_url = f"{redirect_uri}?error=access_denied&error_description=User denied authorization"
- if state:
- error_url += f"&state={state}"
-
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
-
- except Exception as e:
- print(f"❌ 拒绝授权错误: {e}")
- error_url = f"{redirect_uri}?error=server_error&error_description=Authorization failed"
- if state:
- error_url += f"&state={state}"
- from fastapi.responses import RedirectResponse
- return RedirectResponse(url=error_url, status_code=302)
- @app.post("/oauth/token")
- async def oauth_token(request: Request):
- """OAuth2令牌端点"""
- try:
- # 获取请求数据
- form_data = await request.form()
-
- grant_type = form_data.get("grant_type")
- code = form_data.get("code")
- redirect_uri = form_data.get("redirect_uri")
- client_id = form_data.get("client_id")
- client_secret = form_data.get("client_secret")
-
- print(f"🎫 令牌请求: grant_type={grant_type}, client_id={client_id}")
-
- # 验证grant_type
- if grant_type != "authorization_code":
- return {
- "error": "unsupported_grant_type",
- "error_description": "Only authorization_code grant type is supported"
- }
-
- # 验证必要参数
- if not code or not redirect_uri or not client_id:
- return {
- "error": "invalid_request",
- "error_description": "Missing required parameters"
- }
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return {
- "error": "server_error",
- "error_description": "Database connection failed"
- }
-
- cursor = conn.cursor()
-
- # 验证客户端
- cursor.execute("""
- SELECT id, name, app_secret, redirect_uris, scope, is_active
- FROM apps
- WHERE app_key = %s AND is_active = 1
- """, (client_id,))
-
- app_data = cursor.fetchone()
-
- if not app_data:
- cursor.close()
- conn.close()
- return {
- "error": "invalid_client",
- "error_description": "Invalid client credentials"
- }
-
- app_id, app_name, stored_secret, redirect_uris_json, scope_json, is_active = app_data
-
- # 验证客户端密钥(如果提供了)
- if client_secret and client_secret != stored_secret:
- cursor.close()
- conn.close()
- return {
- "error": "invalid_client",
- "error_description": "Invalid client credentials"
- }
-
- # 验证redirect_uri
- redirect_uris = json.loads(redirect_uris_json) if redirect_uris_json else []
- if redirect_uri not in redirect_uris:
- cursor.close()
- conn.close()
- return {
- "error": "invalid_grant",
- "error_description": "Invalid redirect_uri"
- }
-
- # TODO: 验证授权码
- # 这里简化处理,实际应该:
- # 1. 从数据库查找授权码
- # 2. 验证授权码是否有效且未过期
- # 3. 验证授权码是否已被使用
- # 4. 获取关联的用户ID
-
- # 模拟用户ID(实际应该从授权码记录中获取)
- user_id = "ed6a79d3-0083-4d81-8b48-fc522f686f74" # admin用户ID
-
- # 生成访问令牌
- token_data = {
- "sub": user_id,
- "client_id": client_id,
- "scope": "profile email"
- }
-
- access_token = create_access_token(token_data)
- refresh_token = secrets.token_urlsafe(32)
-
- # TODO: 将令牌存储到数据库
-
- cursor.close()
- conn.close()
-
- # 返回令牌响应
- token_response = {
- "access_token": access_token,
- "token_type": "Bearer",
- "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
- "refresh_token": refresh_token,
- "scope": "profile email"
- }
-
- print(f"✅ 令牌生成成功: {access_token[:50]}...")
-
- return token_response
-
- except Exception as e:
- print(f"❌ 令牌生成错误: {e}")
- return {
- "error": "server_error",
- "error_description": "Internal server error"
- }
- @app.get("/oauth/userinfo")
- async def oauth_userinfo(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """OAuth2用户信息端点"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return {
- "error": "invalid_token",
- "error_description": "Invalid or expired access token"
- }
-
- user_id = payload.get("sub")
- client_id = payload.get("client_id")
- scope = payload.get("scope", "").split()
-
- print(f"👤 用户信息请求: user_id={user_id}, client_id={client_id}, scope={scope}")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return {
- "error": "server_error",
- "error_description": "Database connection failed"
- }
-
- cursor = conn.cursor()
-
- # 查找用户信息
- cursor.execute("""
- SELECT u.id, u.username, u.email, u.phone, u.avatar_url, u.is_active,
- p.real_name, p.company, p.department, p.position
- FROM users u
- LEFT JOIN user_profiles p ON u.id = p.user_id
- WHERE u.id = %s AND u.is_active = 1
- """, (user_id,))
-
- user_data = cursor.fetchone()
- cursor.close()
- conn.close()
-
- if not user_data:
- return {
- "error": "invalid_token",
- "error_description": "User not found or inactive"
- }
-
- # 构建用户信息响应(根据scope过滤)
- user_info = {"sub": user_data[0]}
-
- if "profile" in scope:
- user_info.update({
- "username": user_data[1],
- "avatar_url": user_data[4],
- "real_name": user_data[6],
- "company": user_data[7],
- "department": user_data[8],
- "position": user_data[9]
- })
-
- if "email" in scope:
- user_info["email"] = user_data[2]
-
- if "phone" in scope:
- user_info["phone"] = user_data[3]
-
- print(f"✅ 返回用户信息: {user_info}")
-
- return user_info
-
- except Exception as e:
- print(f"❌ 获取用户信息错误: {e}")
- return {
- "error": "server_error",
- "error_description": "Internal server error"
- }
- @app.get("/api/v1/apps")
- async def get_apps(
- page: int = 1,
- page_size: int = 20,
- keyword: str = "",
- status: str = "",
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取应用列表"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查用户角色,决定是否显示所有应用
- cursor.execute("""
- SELECT COUNT(*) FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND r.name IN ('super_admin', 'admin', 'app_manager') AND ur.is_active = 1
- """, (user_id,))
-
- is_app_manager = cursor.fetchone()[0] > 0
-
- # 构建查询条件
- where_conditions = []
- params = []
-
- # 如果不是应用管理员,只显示自己创建的应用
- if not is_app_manager:
- where_conditions.append("created_by = %s")
- params.append(user_id)
-
- if keyword:
- where_conditions.append("(name LIKE %s OR description LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
-
- if status == "active":
- where_conditions.append("is_active = 1")
- elif status == "inactive":
- where_conditions.append("is_active = 0")
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) FROM apps WHERE {where_clause}", params)
- total = cursor.fetchone()[0]
-
- # 查询应用列表
- offset = (page - 1) * page_size
- cursor.execute(f"""
- SELECT id, name, app_key, description, icon_url, redirect_uris, scope,
- is_active, is_trusted, access_token_expires, refresh_token_expires,
- created_at, updated_at
- FROM apps
- WHERE {where_clause}
- ORDER BY created_at DESC
- LIMIT %s OFFSET %s
- """, params + [page_size, offset])
-
- apps = []
- for row in cursor.fetchall():
- app = {
- "id": row[0],
- "name": row[1],
- "app_key": row[2],
- "description": row[3],
- "icon_url": row[4],
- "redirect_uris": json.loads(row[5]) if row[5] else [],
- "scope": json.loads(row[6]) if row[6] else [],
- "is_active": bool(row[7]),
- "is_trusted": bool(row[8]),
- "access_token_expires": row[9],
- "refresh_token_expires": row[10],
- "created_at": row[11].isoformat() if row[11] else None,
- "updated_at": row[12].isoformat() if row[12] else None,
- # 模拟统计数据
- "today_requests": secrets.randbelow(1000),
- "active_users": secrets.randbelow(100)
- }
- apps.append(app)
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取应用列表成功",
- data={
- "items": apps,
- "total": total,
- "page": page,
- "page_size": page_size
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取应用列表错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.get("/api/v1/apps/{app_id}")
- async def get_app_detail(
- app_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取应用详情(包含密钥)"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 查询应用详情(包含密钥)
- cursor.execute("""
- SELECT id, name, app_key, app_secret, description, icon_url,
- redirect_uris, scope, is_active, is_trusted,
- access_token_expires, refresh_token_expires,
- created_at, updated_at
- FROM apps
- WHERE id = %s AND created_by = %s
- """, (app_id, user_id))
-
- app_data = cursor.fetchone()
- cursor.close()
- conn.close()
-
- if not app_data:
- return ApiResponse(
- code=200001,
- message="应用不存在或无权限",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- app_detail = {
- "id": app_data[0],
- "name": app_data[1],
- "app_key": app_data[2],
- "app_secret": app_data[3],
- "description": app_data[4],
- "icon_url": app_data[5],
- "redirect_uris": json.loads(app_data[6]) if app_data[6] else [],
- "scope": json.loads(app_data[7]) if app_data[7] else [],
- "is_active": bool(app_data[8]),
- "is_trusted": bool(app_data[9]),
- "access_token_expires": app_data[10],
- "refresh_token_expires": app_data[11],
- "created_at": app_data[12].isoformat() if app_data[12] else None,
- "updated_at": app_data[13].isoformat() if app_data[13] else None
- }
-
- return ApiResponse(
- code=0,
- message="获取应用详情成功",
- data=app_detail,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取应用详情错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.post("/api/v1/apps")
- async def create_app(
- request: Request,
- app_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """创建应用"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 验证必要字段
- if not app_data.get('name') or not app_data.get('redirect_uris'):
- return ApiResponse(
- code=100001,
- message="缺少必要参数",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 生成应用ID和密钥
- app_id = str(uuid.uuid4())
- app_key = generate_random_string(32)
- app_secret = generate_random_string(64)
-
- # 插入应用记录
- cursor.execute("""
- INSERT INTO apps (
- id, name, app_key, app_secret, description, icon_url,
- redirect_uris, scope, is_active, is_trusted,
- access_token_expires, refresh_token_expires, created_by,
- created_at, updated_at
- ) VALUES (
- %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
- )
- """, (
- app_id,
- app_data['name'],
- app_key,
- app_secret,
- app_data.get('description', ''),
- app_data.get('icon_url', ''),
- json.dumps(app_data['redirect_uris']),
- json.dumps(app_data.get('scope', ['profile'])),
- True,
- app_data.get('is_trusted', False),
- app_data.get('access_token_expires', 7200),
- app_data.get('refresh_token_expires', 2592000),
- user_id
- ))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- # 返回创建的应用信息
- app_info = {
- "id": app_id,
- "name": app_data['name'],
- "app_key": app_key,
- "app_secret": app_secret,
- "description": app_data.get('description', ''),
- "redirect_uris": app_data['redirect_uris'],
- "scope": app_data.get('scope', ['profile']),
- "is_active": True,
- "is_trusted": app_data.get('is_trusted', False)
- }
-
- return ApiResponse(
- code=0,
- message="应用创建成功",
- data=app_info,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"创建应用错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.put("/api/v1/apps/{app_id}/status")
- async def toggle_app_status(
- app_id: str,
- status_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """切换应用状态"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
- is_active = status_data.get('is_active')
-
- if is_active is None:
- return ApiResponse(
- code=100001,
- message="缺少必要参数",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查应用是否存在且属于当前用户
- cursor.execute("""
- SELECT id, name FROM apps
- WHERE id = %s AND created_by = %s
- """, (app_id, user_id))
-
- app_data = cursor.fetchone()
-
- if not app_data:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="应用不存在或无权限",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 更新应用状态
- cursor.execute("""
- UPDATE apps
- SET is_active = %s, updated_at = NOW()
- WHERE id = %s
- """, (is_active, app_id))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- action = "启用" if is_active else "禁用"
- print(f"✅ 应用状态已更新: {app_data[1]} -> {action}")
-
- return ApiResponse(
- code=0,
- message=f"应用已{action}",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"切换应用状态错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.put("/api/v1/apps/{app_id}")
- async def update_app(
- app_id: str,
- app_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新应用信息"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 验证必要参数
- name = app_data.get('name', '').strip()
- if not name:
- return ApiResponse(
- code=100001,
- message="应用名称不能为空",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查应用是否存在且属于当前用户
- cursor.execute("""
- SELECT id, name FROM apps
- WHERE id = %s AND created_by = %s
- """, (app_id, user_id))
-
- existing_app = cursor.fetchone()
-
- if not existing_app:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="应用不存在或无权限",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 检查应用名称是否已被其他应用使用
- cursor.execute("""
- SELECT id FROM apps
- WHERE name = %s AND created_by = %s AND id != %s
- """, (name, user_id, app_id))
-
- if cursor.fetchone():
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="应用名称已存在",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 准备更新数据
- description = (app_data.get('description') or '').strip()
- icon_url = (app_data.get('icon_url') or '').strip()
- redirect_uris = app_data.get('redirect_uris', [])
- scope = app_data.get('scope', ['profile', 'email'])
- is_trusted = app_data.get('is_trusted', False)
- access_token_expires = app_data.get('access_token_expires', 7200)
- refresh_token_expires = app_data.get('refresh_token_expires', 2592000)
-
- # 验证回调URL
- if not redirect_uris or not isinstance(redirect_uris, list):
- cursor.close()
- conn.close()
- return ApiResponse(
- code=100001,
- message="至少需要一个回调URL",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 验证权限范围
- if not scope or not isinstance(scope, list):
- scope = ['profile', 'email']
-
- # 更新应用信息
- cursor.execute("""
- UPDATE apps
- SET name = %s, description = %s, icon_url = %s,
- redirect_uris = %s, scope = %s, is_trusted = %s,
- access_token_expires = %s, refresh_token_expires = %s,
- updated_at = NOW()
- WHERE id = %s
- """, (
- name, description, icon_url,
- json.dumps(redirect_uris), json.dumps(scope), is_trusted,
- access_token_expires, refresh_token_expires, app_id
- ))
-
- conn.commit()
-
- # 获取更新后的应用信息
- cursor.execute("""
- SELECT id, name, app_key, description, icon_url,
- redirect_uris, scope, is_active, is_trusted,
- access_token_expires, refresh_token_expires,
- created_at, updated_at
- FROM apps
- WHERE id = %s
- """, (app_id,))
-
- app_info = cursor.fetchone()
- cursor.close()
- conn.close()
-
- if app_info:
- app_result = {
- "id": app_info[0],
- "name": app_info[1],
- "app_key": app_info[2],
- "description": app_info[3],
- "icon_url": app_info[4],
- "redirect_uris": json.loads(app_info[5]) if app_info[5] else [],
- "scope": json.loads(app_info[6]) if app_info[6] else [],
- "is_active": bool(app_info[7]),
- "is_trusted": bool(app_info[8]),
- "access_token_expires": app_info[9],
- "refresh_token_expires": app_info[10],
- "created_at": app_info[11].isoformat() if app_info[11] else None,
- "updated_at": app_info[12].isoformat() if app_info[12] else None
- }
-
- print(f"✅ 应用已更新: {name}")
-
- return ApiResponse(
- code=0,
- message="应用更新成功",
- data=app_result,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- else:
- return ApiResponse(
- code=500001,
- message="获取更新后的应用信息失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"更新应用错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.delete("/api/v1/apps/{app_id}")
- async def delete_app(
- app_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """删除应用"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查应用是否存在且属于当前用户
- cursor.execute("""
- SELECT id, name FROM apps
- WHERE id = %s AND created_by = %s
- """, (app_id, user_id))
-
- app_data = cursor.fetchone()
-
- if not app_data:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="应用不存在或无权限",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 删除应用(级联删除相关数据)
- cursor.execute("DELETE FROM apps WHERE id = %s", (app_id,))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- print(f"✅ 应用已删除: {app_data[1]}")
-
- return ApiResponse(
- code=0,
- message="应用已删除",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"删除应用错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.post("/api/v1/apps/{app_id}/reset-secret")
- async def reset_app_secret(
- app_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """重置应用密钥"""
- try:
- # 验证令牌
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=200002,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- # 获取数据库连接
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500001,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查应用是否存在且属于当前用户
- cursor.execute("""
- SELECT id, name FROM apps
- WHERE id = %s AND created_by = %s
- """, (app_id, user_id))
-
- app_data = cursor.fetchone()
-
- if not app_data:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=200001,
- message="应用不存在或无权限",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 生成新的应用密钥
- new_secret = generate_random_string(64)
-
- # 更新应用密钥
- cursor.execute("""
- UPDATE apps
- SET app_secret = %s, updated_at = NOW()
- WHERE id = %s
- """, (new_secret, app_id))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- print(f"✅ 应用密钥已重置: {app_data[1]}")
-
- return ApiResponse(
- code=0,
- message="应用密钥已重置",
- data={"app_secret": new_secret},
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"重置应用密钥错误: {e}")
- return ApiResponse(
- code=500001,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- def generate_random_string(length=32):
- """生成随机字符串"""
- import secrets
- import string
- alphabet = string.ascii_letters + string.digits
- return ''.join(secrets.choice(alphabet) for _ in range(length))
- """获取验证码"""
- try:
- # 生成验证码
- captcha_text, captcha_image = generate_captcha()
-
- # 这里应该将验证码文本存储到缓存中(Redis或内存)
- # 为了简化,我们暂时返回固定的验证码
- captcha_id = secrets.token_hex(16)
-
- return ApiResponse(
- code=0,
- message="获取验证码成功",
- data={
- "captcha_id": captcha_id,
- "captcha_image": captcha_image,
- "captcha_text": captcha_text # 生产环境中不应该返回这个
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"生成验证码错误: {e}")
- return ApiResponse(
- code=500001,
- message="生成验证码失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- def generate_captcha():
- """生成验证码"""
- try:
- from PIL import Image, ImageDraw, ImageFont
- import io
- import base64
- import random
- import string
-
- # 生成随机验证码文本
- captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
-
- # 创建图片
- width, height = 120, 40
- image = Image.new('RGB', (width, height), color='white')
- draw = ImageDraw.Draw(image)
-
- # 尝试使用系统字体,如果失败则使用默认字体
- try:
- # Windows系统字体
- font = ImageFont.truetype("arial.ttf", 20)
- except:
- try:
- # 备用字体
- font = ImageFont.truetype("C:/Windows/Fonts/arial.ttf", 20)
- except:
- # 使用默认字体
- font = ImageFont.load_default()
-
- # 绘制验证码文本
- text_width = draw.textlength(captcha_text, font=font)
- text_height = 20
- x = (width - text_width) // 2
- y = (height - text_height) // 2
-
- # 添加一些随机颜色
- colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD']
- text_color = random.choice(colors)
-
- draw.text((x, y), captcha_text, fill=text_color, font=font)
-
- # 添加一些干扰线
- for _ in range(3):
- x1 = random.randint(0, width)
- y1 = random.randint(0, height)
- x2 = random.randint(0, width)
- y2 = random.randint(0, height)
- draw.line([(x1, y1), (x2, y2)], fill=random.choice(colors), width=1)
-
- # 添加一些干扰点
- for _ in range(20):
- x = random.randint(0, width)
- y = random.randint(0, height)
- draw.point((x, y), fill=random.choice(colors))
-
- # 转换为base64
- buffer = io.BytesIO()
- image.save(buffer, format='PNG')
- image_data = buffer.getvalue()
- image_base64 = base64.b64encode(image_data).decode('utf-8')
-
- return captcha_text, f"data:image/png;base64,{image_base64}"
-
- except ImportError:
- # 如果PIL不可用,返回简单的文本验证码
- captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
- # 创建一个简单的SVG验证码
- svg_captcha = f"""
- <svg width="120" height="40" xmlns="http://www.w3.org/2000/svg">
- <rect width="120" height="40" fill="#f0f0f0" stroke="#ccc"/>
- <text x="60" y="25" font-family="Arial" font-size="18" text-anchor="middle" fill="#333">{captcha_text}</text>
- </svg>
- """
- svg_base64 = base64.b64encode(svg_captcha.encode('utf-8')).decode('utf-8')
- return captcha_text, f"data:image/svg+xml;base64,{svg_base64}"
- except Exception as e:
- print(f"生成验证码图片失败: {e}")
- # 返回默认验证码
- return "1234", "data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTIwIiBoZWlnaHQ9IjQwIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciPjxyZWN0IHdpZHRoPSIxMjAiIGhlaWdodD0iNDAiIGZpbGw9IiNmMGYwZjAiIHN0cm9rZT0iI2NjYyIvPjx0ZXh0IHg9IjYwIiB5PSIyNSIgZm9udC1mYW1pbHk9IkFyaWFsIiBmb250LXNpemU9IjE4IiB0ZXh0LWFuY2hvcj0ibWlkZGxlIiBmaWxsPSIjMzMzIj4xMjM0PC90ZXh0Pjwvc3ZnPg=="
- # RBAC权限管理API
- @app.get("/api/v1/user/menus")
- async def api_get_user_menus(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """获取用户菜单"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查用户是否是超级管理员
- cursor.execute("""
- SELECT COUNT(*) FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
- """, (user_id,))
-
- is_super_admin = cursor.fetchone()[0] > 0
-
- if is_super_admin:
- # 超级管理员返回所有活跃菜单
- cursor.execute("""
- SELECT m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- FROM menus m
- WHERE m.is_active = 1
- ORDER BY m.sort_order
- """)
- else:
- # 普通用户根据角色权限获取菜单
- cursor.execute("""
- SELECT m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- FROM menus m
- JOIN role_menus rm ON m.id = rm.menu_id
- JOIN user_roles ur ON rm.role_id = ur.role_id
- WHERE ur.user_id = %s
- AND ur.is_active = 1
- AND m.is_active = 1
- GROUP BY m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- ORDER BY m.sort_order
- """, (user_id,))
-
- menus = []
- for row in cursor.fetchall():
- menu = {
- "id": row[0],
- "parent_id": row[1],
- "name": row[2],
- "title": row[3],
- "path": row[4],
- "component": row[5],
- "icon": row[6],
- "sort_order": row[7],
- "menu_type": row[8],
- "is_hidden": bool(row[9]),
- "is_active": bool(row[10]),
- "children": []
- }
- menus.append(menu)
-
- # 构建菜单树
- menu_tree = build_menu_tree(menus)
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取用户菜单成功",
- data=menu_tree,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取用户菜单错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- def build_menu_tree(menus):
- """构建菜单树结构"""
- menu_map = {menu["id"]: menu for menu in menus}
- tree = []
-
- for menu in menus:
- if menu["parent_id"] is None:
- tree.append(menu)
- else:
- parent = menu_map.get(menu["parent_id"])
- if parent:
- parent["children"].append(menu)
-
- return tree
- @app.get("/api/v1/admin/menus")
- async def api_get_all_menus(
- page: int = 1,
- page_size: int = 1000, # 增大默认页面大小,确保返回所有菜单
- keyword: Optional[str] = None,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取所有菜单(管理员)"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 简化权限检查 - 只检查是否为管理员
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(
- code=403,
- message="权限不足",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 构建查询条件
- where_conditions = []
- params = []
-
- if keyword:
- where_conditions.append("(m.title LIKE %s OR m.name LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) FROM menus m WHERE {where_clause}", params)
- total = cursor.fetchone()[0]
-
- # 查询菜单列表 - 修改排序逻辑以支持树形结构
- cursor.execute(f"""
- SELECT m.id, m.parent_id, m.name, m.title, m.path, m.component,
- m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active,
- m.description, m.created_at, m.updated_at,
- pm.title as parent_title
- FROM menus m
- LEFT JOIN menus pm ON m.parent_id = pm.id
- WHERE {where_clause}
- ORDER BY
- CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END,
- m.sort_order,
- CASE WHEN m.menu_type = 'menu' THEN 0 ELSE 1 END,
- m.created_at
- LIMIT %s OFFSET %s
- """, params + [page_size, (page - 1) * page_size])
-
- menus = []
- for row in cursor.fetchall():
- menu = {
- "id": row[0],
- "parent_id": row[1],
- "name": row[2],
- "title": row[3],
- "path": row[4],
- "component": row[5],
- "icon": row[6],
- "sort_order": row[7],
- "menu_type": row[8],
- "is_hidden": bool(row[9]),
- "is_active": bool(row[10]),
- "description": row[11],
- "created_at": row[12].isoformat() if row[12] else None,
- "updated_at": row[13].isoformat() if row[13] else None,
- "parent_title": row[14]
- }
- menus.append(menu)
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取菜单列表成功",
- data={
- "items": menus,
- "total": total,
- "page": page,
- "page_size": page_size
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取菜单列表错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.get("/api/v1/admin/roles")
- async def api_get_all_roles(
- page: int = 1,
- page_size: int = 20,
- keyword: Optional[str] = None,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取所有角色"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 简化权限检查 - 只检查是否为管理员
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(
- code=403,
- message="权限不足",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 构建查询条件
- where_conditions = []
- params = []
-
- if keyword:
- where_conditions.append("(r.display_name LIKE %s OR r.name LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) FROM roles r WHERE {where_clause}", params)
- total = cursor.fetchone()[0]
-
- # 查询角色列表
- offset = (page - 1) * page_size
- cursor.execute(f"""
- SELECT r.id, r.name, r.display_name, r.description, r.is_active,
- r.is_system, r.created_at, r.updated_at,
- COUNT(ur.user_id) as user_count
- FROM roles r
- LEFT JOIN user_roles ur ON r.id = ur.role_id AND ur.is_active = 1
- WHERE {where_clause}
- GROUP BY r.id
- ORDER BY r.is_system DESC, r.created_at
- LIMIT %s OFFSET %s
- """, params + [page_size, offset])
-
- roles = []
- for row in cursor.fetchall():
- role = {
- "id": row[0],
- "name": row[1],
- "display_name": row[2],
- "description": row[3],
- "is_active": bool(row[4]),
- "is_system": bool(row[5]),
- "created_at": row[6].isoformat() if row[6] else None,
- "updated_at": row[7].isoformat() if row[7] else None,
- "user_count": row[8]
- }
- roles.append(role)
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取角色列表成功",
- data={
- "items": roles,
- "total": total,
- "page": page,
- "page_size": page_size
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取角色列表错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.get("/api/v1/user/permissions")
- async def api_get_user_permissions(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """获取用户权限"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- user_id = payload.get("sub")
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 获取用户权限
- cursor.execute("""
- SELECT DISTINCT p.name, p.resource, p.action
- FROM permissions p
- JOIN role_permissions rp ON p.id = rp.permission_id
- JOIN user_roles ur ON rp.role_id = ur.role_id
- WHERE ur.user_id = %s
- AND ur.is_active = 1
- AND p.is_active = 1
- """, (user_id,))
-
- permissions = []
- for row in cursor.fetchall():
- permissions.append({
- "name": row[0],
- "resource": row[1],
- "action": row[2]
- })
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取用户权限成功",
- data=permissions,
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取用户权限错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- # 用户管理API
- @app.get("/api/v1/admin/users")
- async def get_users(
- page: int = 1,
- page_size: int = 20,
- keyword: Optional[str] = None,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取用户列表"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 构建查询条件
- where_conditions = []
- params = []
-
- if keyword:
- where_conditions.append("(u.username LIKE %s OR u.email LIKE %s OR up.real_name LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"])
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) FROM users u LEFT JOIN user_profiles up ON u.id = up.user_id WHERE {where_clause}", params)
- total = cursor.fetchone()[0]
-
- # 查询用户列表
- offset = (page - 1) * page_size
- cursor.execute(f"""
- SELECT u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
- u.last_login_at, u.created_at, up.real_name, up.company, up.department,
- GROUP_CONCAT(r.display_name) as roles
- FROM users u
- LEFT JOIN user_profiles up ON u.id = up.user_id
- LEFT JOIN user_roles ur ON u.id = ur.user_id AND ur.is_active = 1
- LEFT JOIN roles r ON ur.role_id = r.id
- WHERE {where_clause}
- GROUP BY u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
- u.last_login_at, u.created_at, up.real_name, up.company, up.department
- ORDER BY u.created_at DESC
- LIMIT %s OFFSET %s
- """, params + [page_size, offset])
-
- users = []
- for row in cursor.fetchall():
- users.append({
- "id": row[0],
- "username": row[1],
- "email": row[2],
- "phone": row[3],
- "is_active": bool(row[4]),
- "is_superuser": bool(row[5]),
- "last_login_at": row[6].isoformat() if row[6] else None,
- "created_at": row[7].isoformat() if row[7] else None,
- "real_name": row[8],
- "company": row[9],
- "department": row[10],
- "roles": row[11].split(',') if row[11] else []
- })
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="获取用户列表成功",
- data={"items": users, "total": total, "page": page, "page_size": page_size},
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取用户列表错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.post("/api/v1/admin/users")
- async def create_user(
- user_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """创建用户"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查用户名和邮箱是否已存在
- cursor.execute("SELECT id FROM users WHERE username = %s OR email = %s",
- (user_data['username'], user_data['email']))
- if cursor.fetchone():
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="用户名或邮箱已存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 生成用户ID
- user_id = str(uuid.uuid4())
-
- # 创建密码哈希
- password_hash = hash_password_simple(user_data['password'])
-
- # 插入用户
- cursor.execute("""
- INSERT INTO users (id, username, email, phone, password_hash, is_active, is_superuser, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (user_id, user_data['username'], user_data['email'], user_data.get('phone'),
- password_hash, user_data.get('is_active', True), user_data.get('is_superuser', False)))
-
- # 插入用户详情
- if any(key in user_data for key in ['real_name', 'company', 'department']):
- profile_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO user_profiles (id, user_id, real_name, company, department, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
- """, (profile_id, user_id, user_data.get('real_name'), user_data.get('company'), user_data.get('department')))
-
- # 分配角色
- if 'role_ids' in user_data and user_data['role_ids']:
- for role_id in user_data['role_ids']:
- role_assignment_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO user_roles (id, user_id, role_id, assigned_by, created_at)
- VALUES (%s, %s, %s, %s, NOW())
- """, (role_assignment_id, user_id, role_id, payload.get("sub")))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="用户创建成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"创建用户错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.put("/api/v1/admin/users/{user_id}")
- async def update_user(
- user_id: str,
- user_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新用户"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 更新用户基本信息
- update_fields = []
- update_values = []
-
- for field in ['email', 'phone', 'is_active', 'is_superuser']:
- if field in user_data:
- update_fields.append(f'{field} = %s')
- update_values.append(user_data[field])
-
- if update_fields:
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE users
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- # 更新用户详情
- profile_fields = ['real_name', 'company', 'department']
- profile_updates = {k: v for k, v in user_data.items() if k in profile_fields}
-
- if profile_updates:
- # 检查是否已有记录
- cursor.execute("SELECT id FROM user_profiles WHERE user_id = %s", (user_id,))
- profile_exists = cursor.fetchone()
-
- if profile_exists:
- update_fields = []
- update_values = []
- for field, value in profile_updates.items():
- update_fields.append(f'{field} = %s')
- update_values.append(value)
-
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE user_profiles
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE user_id = %s
- """, update_values)
- else:
- profile_id = str(uuid.uuid4())
- fields = ['id', 'user_id'] + list(profile_updates.keys())
- values = [profile_id, user_id] + list(profile_updates.values())
- placeholders = ', '.join(['%s'] * len(values))
-
- cursor.execute(f"""
- INSERT INTO user_profiles ({', '.join(fields)}, created_at, updated_at)
- VALUES ({placeholders}, NOW(), NOW())
- """, values)
-
- # 更新用户角色
- if 'role_ids' in user_data:
- # 删除现有角色
- cursor.execute("DELETE FROM user_roles WHERE user_id = %s", (user_id,))
-
- # 添加新角色
- for role_id in user_data['role_ids']:
- assignment_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO user_roles (id, user_id, role_id, assigned_by, created_at)
- VALUES (%s, %s, %s, %s, NOW())
- """, (assignment_id, user_id, role_id, payload.get("sub")))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="用户更新成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"更新用户错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.delete("/api/v1/admin/users/{user_id}")
- async def delete_user(
- user_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """删除用户"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 不能删除自己
- if user_id == payload.get("sub"):
- return ApiResponse(code=400, message="不能删除自己", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查是否为超级管理员
- cursor.execute("""
- SELECT COUNT(*) FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
- """, (user_id,))
-
- if cursor.fetchone()[0] > 0:
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="不能删除超级管理员", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 删除相关数据
- cursor.execute("DELETE FROM user_roles WHERE user_id = %s", (user_id,))
- cursor.execute("DELETE FROM user_profiles WHERE user_id = %s", (user_id,))
- cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="用户删除成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"删除用户错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- # 角色管理API
- @app.post("/api/v1/admin/roles")
- async def create_role(
- role_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """创建角色"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查角色名是否已存在
- cursor.execute("SELECT id FROM roles WHERE name = %s", (role_data['name'],))
- if cursor.fetchone():
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="角色名已存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 创建角色
- role_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO roles (id, name, display_name, description, is_active, is_system, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (role_id, role_data['name'], role_data['display_name'], role_data.get('description'),
- role_data.get('is_active', True), False))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="角色创建成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"创建角色错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.put("/api/v1/admin/roles/{role_id}")
- async def update_role(
- role_id: str,
- role_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新角色"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查是否为系统角色
- cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
- if not role:
- cursor.close()
- conn.close()
- return ApiResponse(code=404, message="角色不存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- if role[0]: # is_system
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="不能修改系统角色", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 更新角色
- update_fields = []
- update_values = []
-
- for field in ['display_name', 'description', 'is_active']:
- if field in role_data:
- update_fields.append(f'{field} = %s')
- update_values.append(role_data[field])
-
- if update_fields:
- update_values.append(role_id)
- cursor.execute(f"""
- UPDATE roles
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="角色更新成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"更新角色错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.delete("/api/v1/admin/roles/{role_id}")
- async def delete_role(
- role_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """删除角色"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查是否为系统角色
- cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
- if not role:
- cursor.close()
- conn.close()
- return ApiResponse(code=404, message="角色不存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- if role[0]: # is_system
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="不能删除系统角色", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 检查是否有用户使用此角色
- cursor.execute("SELECT COUNT(*) FROM user_roles WHERE role_id = %s", (role_id,))
- if cursor.fetchone()[0] > 0:
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="该角色正在被使用,无法删除", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 删除角色相关数据
- cursor.execute("DELETE FROM role_permissions WHERE role_id = %s", (role_id,))
- cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
- cursor.execute("DELETE FROM roles WHERE id = %s", (role_id,))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="角色删除成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"删除角色错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- # 角色菜单权限管理API
- @app.get("/api/v1/admin/roles/{role_id}/menus")
- async def get_role_menus(
- role_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """获取角色的菜单权限"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 检查管理员权限
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(
- code=403,
- message="权限不足",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查角色是否存在
- cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
-
- if not role:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=404,
- message="角色不存在",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 检查是否为超级管理员角色
- role_name = role[1]
- is_super_admin_role = role_name == "super_admin"
-
- if is_super_admin_role:
- # 超级管理员默认拥有所有菜单权限
- cursor.execute("""
- SELECT id, name, title, parent_id, menu_type
- FROM menus
- WHERE is_active = 1
- ORDER BY sort_order
- """)
- menu_permissions = cursor.fetchall()
- else:
- # 普通角色查询已分配的菜单权限
- cursor.execute("""
- SELECT m.id, m.name, m.title, m.parent_id, m.menu_type
- FROM role_menus rm
- JOIN menus m ON rm.menu_id = m.id
- WHERE rm.role_id = %s AND m.is_active = 1
- ORDER BY m.sort_order
- """, (role_id,))
- menu_permissions = cursor.fetchall()
-
- cursor.close()
- conn.close()
-
- # 构建返回数据
- menu_ids = [menu[0] for menu in menu_permissions]
- menu_details = []
-
- for menu in menu_permissions:
- menu_details.append({
- "id": menu[0],
- "name": menu[1],
- "title": menu[2],
- "parent_id": menu[3],
- "menu_type": menu[4]
- })
-
- return ApiResponse(
- code=0,
- message="获取角色菜单权限成功",
- data={
- "role_id": role_id,
- "role_name": role[1],
- "menu_ids": menu_ids,
- "menu_details": menu_details,
- "total": len(menu_ids)
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"获取角色菜单权限错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- @app.put("/api/v1/admin/roles/{role_id}/menus")
- async def update_role_menus(
- role_id: str,
- request: Request,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新角色的菜单权限"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(
- code=401,
- message="无效的访问令牌",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 检查管理员权限
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(
- code=403,
- message="权限不足",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 获取请求数据
- body = await request.json()
- menu_ids = body.get("menu_ids", [])
-
- if not isinstance(menu_ids, list):
- return ApiResponse(
- code=400,
- message="菜单ID列表格式错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(
- code=500,
- message="数据库连接失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- cursor = conn.cursor()
-
- # 检查角色是否存在
- cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
-
- if not role:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=404,
- message="角色不存在",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 检查是否为超级管理员角色
- role_name = role[1]
- is_super_admin_role = role_name == "super_admin"
-
- if is_super_admin_role:
- # 超级管理员角色不允许修改权限,始终拥有全部权限
- cursor.close()
- conn.close()
- return ApiResponse(
- code=400,
- message="超级管理员角色拥有全部权限,无需修改",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 验证菜单ID是否存在
- if menu_ids:
- placeholders = ','.join(['%s'] * len(menu_ids))
- cursor.execute(f"""
- SELECT id FROM menus
- WHERE id IN ({placeholders}) AND is_active = 1
- """, menu_ids)
-
- valid_menu_ids = [row[0] for row in cursor.fetchall()]
- invalid_menu_ids = set(menu_ids) - set(valid_menu_ids)
-
- if invalid_menu_ids:
- cursor.close()
- conn.close()
- return ApiResponse(
- code=400,
- message=f"无效的菜单ID: {', '.join(invalid_menu_ids)}",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- # 开始事务
- cursor.execute("START TRANSACTION")
-
- try:
- # 删除角色现有的菜单权限
- cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
-
- # 添加新的菜单权限
- if menu_ids:
- values = [(role_id, menu_id) for menu_id in menu_ids]
- cursor.executemany("""
- INSERT INTO role_menus (role_id, menu_id, created_at)
- VALUES (%s, %s, NOW())
- """, values)
-
- # 提交事务
- conn.commit()
-
- cursor.close()
- conn.close()
-
- return ApiResponse(
- code=0,
- message="角色菜单权限更新成功",
- data={
- "role_id": role_id,
- "role_name": role[1],
- "menu_ids": menu_ids,
- "updated_count": len(menu_ids)
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- # 回滚事务
- conn.rollback()
- cursor.close()
- conn.close()
- raise e
-
- except Exception as e:
- print(f"更新角色菜单权限错误: {e}")
- return ApiResponse(
- code=500,
- message="服务器内部错误",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- # 菜单管理API
- @app.post("/api/v1/admin/menus")
- async def create_menu(
- menu_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """创建菜单"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查菜单名是否已存在
- cursor.execute("SELECT id FROM menus WHERE name = %s", (menu_data['name'],))
- if cursor.fetchone():
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="菜单标识已存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 创建菜单
- menu_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO menus (id, parent_id, name, title, path, component, icon,
- sort_order, menu_type, is_hidden, is_active, description, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (
- menu_id, menu_data.get('parent_id'), menu_data['name'], menu_data['title'],
- menu_data.get('path'), menu_data.get('component'), menu_data.get('icon'),
- menu_data.get('sort_order', 0), menu_data.get('menu_type', 'menu'),
- menu_data.get('is_hidden', False), menu_data.get('is_active', True),
- menu_data.get('description')
- ))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="菜单创建成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"创建菜单错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.put("/api/v1/admin/menus/{menu_id}")
- async def update_menu(
- menu_id: str,
- menu_data: dict,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """更新菜单"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 更新菜单
- update_fields = []
- update_values = []
-
- for field in ['parent_id', 'title', 'path', 'component', 'icon', 'sort_order',
- 'menu_type', 'is_hidden', 'is_active', 'description']:
- if field in menu_data:
- update_fields.append(f'{field} = %s')
- update_values.append(menu_data[field])
-
- if update_fields:
- update_values.append(menu_id)
- cursor.execute(f"""
- UPDATE menus
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="菜单更新成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"更新菜单错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- @app.delete("/api/v1/admin/menus/{menu_id}")
- async def delete_menu(
- menu_id: str,
- credentials: HTTPAuthorizationCredentials = Depends(security)
- ):
- """删除菜单"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- is_superuser = payload.get("is_superuser", False)
- if not is_superuser:
- return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- # 检查是否有子菜单
- cursor.execute("SELECT COUNT(*) FROM menus WHERE parent_id = %s", (menu_id,))
- if cursor.fetchone()[0] > 0:
- cursor.close()
- conn.close()
- return ApiResponse(code=400, message="该菜单下有子菜单,无法删除", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- # 删除菜单相关数据
- cursor.execute("DELETE FROM role_menus WHERE menu_id = %s", (menu_id,))
- cursor.execute("DELETE FROM menus WHERE id = %s", (menu_id,))
-
- conn.commit()
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="菜单删除成功", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"删除菜单错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- # 获取所有角色(用于下拉选择)
- @app.get("/api/v1/roles/all")
- async def get_all_roles_simple(credentials: HTTPAuthorizationCredentials = Depends(security)):
- """获取所有角色(简化版,用于下拉选择)"""
- try:
- payload = verify_token(credentials.credentials)
- if not payload:
- return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- conn = get_db_connection()
- if not conn:
- return ApiResponse(code=500, message="数据库连接失败", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- cursor = conn.cursor()
-
- cursor.execute("""
- SELECT id, name, display_name, is_system, is_active
- FROM roles
- WHERE is_active = 1
- ORDER BY is_system DESC, display_name
- """)
-
- roles = []
- for row in cursor.fetchall():
- roles.append({
- "id": row[0],
- "name": row[1],
- "display_name": row[2],
- "is_system": bool(row[3]),
- "is_active": bool(row[4])
- })
-
- cursor.close()
- conn.close()
-
- return ApiResponse(code=0, message="获取角色列表成功", data=roles, timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
-
- except Exception as e:
- print(f"获取角色列表错误: {e}")
- return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
- if __name__ == "__main__":
- import uvicorn
-
- # 查找可用端口
- port = find_available_port()
-
- if port is None:
- print("❌ 无法找到可用端口 (8000-8010)")
- print("请手动停止占用这些端口的进程")
- sys.exit(1)
-
- print("=" * 60)
- print("🚀 SSO认证中心完整服务器")
- print("=" * 60)
- print(f"✅ 找到可用端口: {port}")
- print(f"🌐 访问地址: http://localhost:{port}")
- print(f"📚 API文档: http://localhost:{port}/docs")
- print(f"❤️ 健康检查: http://localhost:{port}/health")
- print(f"🔐 登录API: http://localhost:{port}/api/v1/auth/login")
- print("=" * 60)
- print("📝 前端配置:")
- print(f" VITE_API_BASE_URL=http://localhost:{port}")
- print("=" * 60)
- print("👤 测试账号:")
- print(" 用户名: admin")
- print(" 密码: Admin123456")
- print("=" * 60)
- print("按 Ctrl+C 停止服务器")
- print()
-
- try:
- uvicorn.run(
- app,
- host="0.0.0.0",
- port=port,
- log_level="info"
- )
- except KeyboardInterrupt:
- print("\n👋 服务器已停止")
- except Exception as e:
- print(f"❌ 启动失败: {e}")
- sys.exit(1)
- @app.get("/api/v1/auth/captcha")
- async def get_captcha():
- """获取验证码"""
- try:
- # 生成验证码
- captcha_text, captcha_image = generate_captcha()
-
- # 这里应该将验证码文本存储到缓存中(Redis或内存)
- # 为了简化,我们暂时返回固定的验证码
- captcha_id = secrets.token_hex(16)
-
- return ApiResponse(
- code=0,
- message="获取验证码成功",
- data={
- "captcha_id": captcha_id,
- "captcha_image": captcha_image,
- "captcha_text": captcha_text # 生产环境中不应该返回这个
- },
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
-
- except Exception as e:
- print(f"生成验证码错误: {e}")
- return ApiResponse(
- code=500001,
- message="生成验证码失败",
- timestamp=datetime.now(timezone.utc).isoformat()
- ).model_dump()
- def generate_captcha():
- """生成验证码"""
- try:
- import random
- import string
- import base64
-
- # 生成随机验证码文本
- captcha_text = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
-
- # 创建一个简单的SVG验证码
- svg_captcha = f"""
- <svg width="120" height="40" xmlns="http://www.w3.org/2000/svg">
- <rect width="120" height="40" fill="#f0f0f0" stroke="#ccc"/>
- <text x="60" y="25" font-family="Arial" font-size="18" text-anchor="middle" fill="#333">{captcha_text}</text>
- </svg>
- """
- svg_base64 = base64.b64encode(svg_captcha.encode('utf-8')).decode('utf-8')
- return captcha_text, f"data:image/svg+xml;base64,{svg_base64}"
-
- except Exception as e:
- print(f"生成验证码失败: {e}")
- # 返回默认验证码
- return "1234", "data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTIwIiBoZWlnaHQ9IjQwIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciPjxyZWN0IHdpZHRoPSIxMjAiIGhlaWdodD0iNDAiIGZpbGw9IiNmMGYwZjAiIHN0cm9rZT0iI2NjYyIvPjx0ZXh0IHg9IjYwIiB5PSIyNSIgZm9udC1mYW1pbHk9IkFyaWFsIiBmb250LXNpemU9IjE4IiB0ZXh0LWFuY2hvcj0ibWlkZGxlIiBmaWxsPSIjMzMzIj4xMjM0PC90ZXh0Pjwvc3ZnPg=="
|