| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
77737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205 |
- import base64
- import hashlib
- import json
- import os
- import re
- import shutil
- import threading
- import time
- import uuid
- from datetime import timedelta, timezone
- from decimal import Decimal
- from pathlib import Path
- from typing import Any, Iterable
- from urllib.parse import quote, quote_plus, urlencode, urlsplit, urlunsplit
- import requests
- from flask import Flask, Response, abort, has_request_context, jsonify, redirect, render_template, request, send_file, session, url_for
- from werkzeug.security import check_password_hash, generate_password_hash
- from werkzeug.utils import secure_filename
- from .audit import audit
- from .auth import current_admin, current_user, extend_vip, is_vip_active, require_admin, require_user
- from .context import get_config
- from .core import isoformat, parse_datetime, parse_int, utcnow
- from .db import IntegrityError, db_status, execute, fetch_all, fetch_one, get_active_backend, switch_database
- from .gogs import (
- GogsGitError,
- gogs_archive_get,
- gogs_branches,
- gogs_commits,
- gogs_contents,
- gogs_create_repo,
- gogs_delete_repo,
- gogs_git_archive_zip,
- gogs_git_archive_zip_commit,
- gogs_git_list_refs,
- gogs_git_delete_path,
- gogs_git_write_file,
- gogs_my_repos,
- gogs_repo_info,
- gogs_resolve_ref_commit,
- gogs_tags,
- gogs_user_repos,
- )
- from .settings import delete_setting_value, get_setting_value, set_setting_value
- def register_routes(app: Flask) -> None:
def create_user_message(
    user_id: int,
    title: str,
    content: str,
    *,
    sender_type: str = "SYSTEM",
    sender_id: int | None = None,
) -> int:
    """Insert one row into ``user_messages`` and return the new row id.

    The title is trimmed and cut to 120 characters, the content to 4000
    characters, and the sender type is upper-cased and cut to 16 characters
    (falling back to ``"SYSTEM"`` when empty). ``created_at`` is the current
    time as produced by ``isoformat(utcnow())``.
    """
    clean_title = (title or "").strip()[:120]
    clean_content = (content or "").strip()[:4000]
    created_at = isoformat(utcnow())
    clean_sender = (sender_type or "SYSTEM").strip().upper()[:16]
    cursor = execute(
        """
        INSERT INTO user_messages (user_id, title, content, created_at, sender_type, sender_id)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (user_id, clean_title, clean_content, created_at, clean_sender, sender_id),
    )
    return int(cursor.lastrowid)
def _gogs_base_url_and_token() -> tuple[str, str]:
    """Return ``(base_url, token)`` for the Gogs server.

    For both values the stored setting wins over the static config. A setting
    that is missing or blank falls back to the config value. The base URL has
    surrounding whitespace and trailing slashes removed; the token is returned
    stripped, or as ``""`` when neither source provides one.
    """
    config = get_config()
    base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url or "").strip().rstrip("/")
    stored_token = get_setting_value("GOGS_TOKEN")
    token = stored_token.strip() if stored_token is not None else ""
    if not token:
        # Setting absent or blank: use the configured token instead.
        token = (config.gogs_token or "").strip()
    return base_url, token
def _gogs_error_message(resp: requests.Response) -> str | None:
    """Extract a short (at most 300 characters) error message from a response.

    The JSON body is tried first: the first truthy value among the keys
    ``message``, ``error`` and ``error_description`` is returned. Otherwise the
    stripped response text is used. Returns ``None`` when nothing is available.
    Any failure while reading the response is swallowed.
    """
    try:
        payload = None if resp is None else resp.json()
    except Exception:
        payload = None
    if isinstance(payload, dict):
        for field in ("message", "error", "error_description"):
            candidate = payload.get(field)
            if candidate:
                return str(candidate)[:300]
    try:
        body = (resp.text or "").strip()
    except Exception:
        body = ""
    snippet = body[:300]
    return snippet if snippet else None
def _safe_upstream_url(resp: requests.Response) -> str | None:
    """Return the response URL with secrets masked, or ``None`` if it has none.

    A ``token=`` query parameter value and any ``user[:password]@`` userinfo
    section are replaced by ``***``. The result is cut to 500 characters.
    """
    try:
        raw_url = (getattr(resp, "url", None) or "").strip()
    except Exception:
        raw_url = ""
    if not raw_url:
        return None
    masked = raw_url
    for pattern, replacement in (
        (r"([?&])token=[^&]+", r"\1token=***"),
        (r"//[^/@]*@", "//***@"),
    ):
        masked = re.sub(pattern, replacement, masked)
    return masked[:500]
- def _looks_like_html(text: str | None) -> bool:
- s = (text or "").lstrip().lower()
- return s.startswith("<!doctype html") or s.startswith("<html") or s.startswith("<head")
- def _alipay_wrap_key(key: str | None, kind: str) -> str:
- s = (key or "").strip()
- if not s:
- return ""
- if "BEGIN " in s and "END " in s:
- return s
- body = re.sub(r"\s+", "", s)
- if not body:
- return ""
- header = "-----BEGIN PRIVATE KEY-----" if kind == "private" else "-----BEGIN PUBLIC KEY-----"
- footer = "-----END PRIVATE KEY-----" if kind == "private" else "-----END PUBLIC KEY-----"
- lines = [body[i : i + 64] for i in range(0, len(body), 64)]
- return "\n".join([header, *lines, footer, ""])
- def _alipay_sign_content(params: dict[str, Any]) -> str:
- items: list[tuple[str, str]] = []
- for k, v in (params or {}).items():
- if v is None:
- continue
- sv = str(v)
- if sv == "":
- continue
- items.append((str(k), sv))
- items.sort(key=lambda x: x[0])
- return "&".join([f"{k}={v}" for k, v in items])
def _alipay_rsa2_sign(sign_content: str, private_key: str) -> str:
    """Sign *sign_content* with PKCS#1 v1.5 over SHA-256 and return base64 text.

    Raises:
        RuntimeError: ``pycryptodome_required`` when the ``Crypto`` package
            cannot be imported.
    """
    try:
        from Crypto.Hash import SHA256
        from Crypto.PublicKey import RSA
        from Crypto.Signature import pkcs1_15
    except Exception as e:
        raise RuntimeError("pycryptodome_required") from e
    pem_text = _alipay_wrap_key(private_key, "private")
    rsa_key = RSA.import_key(pem_text)
    digest = SHA256.new((sign_content or "").encode("utf-8"))
    raw_signature = pkcs1_15.new(rsa_key).sign(digest)
    return base64.b64encode(raw_signature).decode("utf-8")
def _alipay_rsa2_verify(sign_content: str, signature_b64: str, public_key: str) -> bool:
    """Check a base64 PKCS#1 v1.5 / SHA-256 signature over *sign_content*.

    Returns ``True`` when verification succeeds and ``False`` on any failure
    while decoding or verifying the signature. Importing the key itself is not
    guarded, so a malformed key propagates its error.

    Raises:
        RuntimeError: ``pycryptodome_required`` when the ``Crypto`` package
            cannot be imported.
    """
    try:
        from Crypto.Hash import SHA256
        from Crypto.PublicKey import RSA
        from Crypto.Signature import pkcs1_15
    except Exception as e:
        raise RuntimeError("pycryptodome_required") from e
    pem_text = _alipay_wrap_key(public_key, "public")
    rsa_key = RSA.import_key(pem_text)
    digest = SHA256.new((sign_content or "").encode("utf-8"))
    try:
        raw_signature = base64.b64decode(signature_b64 or "")
        pkcs1_15.new(rsa_key).verify(digest, raw_signature)
    except Exception:
        return False
    return True
- def _parse_keywords(value: Any) -> list[str]:
- if isinstance(value, list):
- parts = [str(x).strip() for x in value]
- else:
- raw = str(value or "")
- parts = re.split(r"[,\n\r\t ]+", raw)
- items: list[str] = []
- seen: set[str] = set()
- for p in parts:
- p = (p or "").strip()
- if not p:
- continue
- if len(p) > 32:
- p = p[:32]
- k = p.lower()
- if k in seen:
- continue
- seen.add(k)
- items.append(p)
- if len(items) >= 20:
- break
- return items
- def _slugify_repo_name(title: str) -> str:
- s = (title or "").strip().lower()
- s = re.sub(r"[^a-z0-9]+", "-", s)
- s = s.strip("-")
- if not s:
- s = f"resource-{uuid.uuid4().hex[:8]}"
- if len(s) > 50:
- s = s[:50].rstrip("-")
- return s
def _uploads_dir() -> Path:
    """Return ``<project root>/static/uploads``, creating it if necessary.

    The project root is taken to be the parent of the directory that contains
    this file.
    """
    package_dir = Path(__file__).resolve().parent
    uploads = package_dir.parent / "static" / "uploads"
    uploads.mkdir(parents=True, exist_ok=True)
    return uploads
- def _extract_upload_names(value: Any) -> set[str]:
- s = str(value or "").strip()
- if not s:
- return set()
- names: set[str] = set()
- for m in re.finditer(r"(?i)(?:/static/uploads/|/uploads/|/)([0-9a-f]{32}(?:\.[a-z0-9]+)?)", s):
- names.add(m.group(1))
- if s.startswith("/static/uploads/") or s.startswith("static/uploads/") or s.startswith("uploads/"):
- name = os.path.basename(s)
- if re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", name or ""):
- names.add(name)
- return names
def _delete_upload_files(names: set[str]) -> None:
    """Best-effort removal of the named files from the local uploads directory.

    Only basenames that look like upload names (32 hex characters plus an
    optional extension) and that resolve directly inside the uploads directory
    are deleted. Failures while unlinking are ignored.
    """
    if not names:
        return
    root = _uploads_dir().resolve()
    upload_name = re.compile(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?")
    for raw_name in names:
        file_name = os.path.basename(str(raw_name or ""))
        if upload_name.fullmatch(file_name or "") is None:
            continue
        target = (root / file_name).resolve()
        # Guard against anything that resolves outside the uploads directory.
        if target.parent != root:
            continue
        try:
            target.unlink(missing_ok=True)
        except Exception:
            pass
- def _normalize_upload_prefix(prefix: Any) -> str:
- p = str(prefix or "").strip().replace("\\", "/")
- p = p.lstrip("/")
- if not p:
- p = "uploads/"
- if not p.endswith("/"):
- p = f"{p}/"
- return p
def _get_upload_storage_mode() -> str:
    """Read the ``STORAGE_PROVIDER`` setting as ``LOCAL``, ``OSS`` or ``AUTO``.

    Any missing or unrecognised value is treated as ``AUTO``.
    """
    mode = (get_setting_value("STORAGE_PROVIDER") or "").strip().upper()
    return mode if mode in {"LOCAL", "OSS", "AUTO"} else "AUTO"
def _get_oss_upload_config() -> dict[str, Any]:
    """Collect the OSS upload settings into one dict.

    ``ok`` is true only when endpoint, bucket, access key id and access key
    secret are all non-empty. The endpoint and public base URL have trailing
    slashes removed; the upload prefix is normalised and defaults to
    ``uploads/``.
    """

    def _setting(name: str) -> str:
        return str(get_setting_value(name) or "").strip()

    endpoint = _setting("OSS_ENDPOINT").rstrip("/")
    bucket = _setting("OSS_BUCKET")
    access_key_id = _setting("OSS_ACCESS_KEY_ID")
    access_key_secret = _setting("OSS_ACCESS_KEY_SECRET")
    upload_prefix = _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/")
    public_base_url = _setting("OSS_PUBLIC_BASE_URL").rstrip("/")

    config: dict[str, Any] = {
        "ok": all((endpoint, bucket, access_key_id, access_key_secret)),
        "endpoint": endpoint,
        "bucket": bucket,
        "accessKeyId": access_key_id,
        "accessKeySecret": access_key_secret,
        "uploadPrefix": upload_prefix,
        "publicBaseUrl": public_base_url,
    }
    return config
- def _build_oss_public_url(*, public_base_url: str, endpoint: str, bucket: str, key: str) -> str:
- key = str(key or "").lstrip("/")
- if public_base_url:
- return f"{public_base_url.rstrip('/')}/{key}"
- try:
- parts = urlsplit(endpoint)
- scheme = parts.scheme or "https"
- host = parts.netloc
- if not host:
- return f"/{key}"
- host = host.split("@", 1)[-1]
- if host.startswith(f"{bucket}."):
- full_host = host
- else:
- full_host = f"{bucket}.{host}"
- return urlunsplit((scheme, full_host, f"/{key}", "", ""))
- except Exception:
- return f"/{key}"
- def _guess_upload_kind(ext: str) -> str:
- e = (ext or "").lower()
- if e in {".png", ".jpg", ".jpeg", ".gif", ".webp"}:
- return "image"
- if e in {".mp4", ".webm", ".mov", ".m4v"}:
- return "video"
- return "file"
class _LocalUploadStorage:
    """Upload storage backed by the local ``static/uploads`` directory."""

    def save_upload(self, file_storage: Any, name: str) -> dict[str, Any]:
        """Save the uploaded file under *name* and return its name and URL."""
        destination = _uploads_dir() / name
        file_storage.save(destination)
        return {"name": name, "url": f"/static/uploads/{name}"}

    def delete_uploads(self, names: set[str]) -> None:
        """Delete the named uploads from the local directory (best effort)."""
        _delete_upload_files(names)

    def list_items(self) -> list[dict[str, Any]]:
        """Describe every regular file in the uploads directory named like an upload.

        Files whose name is not 32 hex characters plus an optional extension,
        and files that cannot be stat-ed, are skipped.
        """
        root = _uploads_dir().resolve()
        upload_name = re.compile(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?")
        listing: list[dict[str, Any]] = []
        for entry in root.iterdir():
            if not entry.is_file():
                continue
            file_name = entry.name
            if upload_name.fullmatch(file_name or "") is None:
                continue
            try:
                info = entry.stat()
            except Exception:
                continue
            suffix = entry.suffix.lower()
            record = {
                "name": file_name,
                "url": f"/static/uploads/{file_name}",
                "bytes": int(getattr(info, "st_size", 0) or 0),
                "mtime": int(getattr(info, "st_mtime", 0) or 0),
                "ext": suffix,
                "kind": _guess_upload_kind(suffix),
            }
            listing.append(record)
        return listing
- class _OssUploadStorage:
- def __init__(self, cfg: dict[str, Any]):
- try:
- import oss2 # type: ignore
- except Exception:
- raise RuntimeError("oss_client_missing")
- endpoint = str(cfg.get("endpoint") or "").strip().rstrip("/")
- bucket = str(cfg.get("bucket") or "").strip()
- access_key_id = str(cfg.get("accessKeyId") or "").strip()
- access_key_secret = str(cfg.get("accessKeySecret") or "").strip()
- self._upload_prefix = str(cfg.get("uploadPrefix") or "uploads/")
- self._public_base_url = str(cfg.get("publicBaseUrl") or "").strip().rstrip("/")
- if not endpoint or not bucket or not access_key_id or not access_key_secret:
- raise RuntimeError("oss_not_configured")
- auth = oss2.Auth(access_key_id, access_key_secret)
- self._bucket_name = bucket
- self._endpoint = endpoint
- self._bucket = oss2.Bucket(auth, endpoint, bucket)
- self._oss2 = oss2
- def _key_for_name(self, name: str) -> str:
- n = os.path.basename(str(name or ""))
- return f"{self._upload_prefix}{n}"
- def save_upload(self, file_storage: Any, name: str) -> dict[str, Any]:
- key = self._key_for_name(name)
- try:
- file_storage.stream.seek(0)
- except Exception:
- pass
- self._bucket.put_object(key, file_storage.stream)
- url = _build_oss_public_url(public_base_url=self._public_base_url, endpoint=self._endpoint, bucket=self._bucket_name, key=key)
- return {"name": os.path.basename(name), "url": url}
- def delete_uploads(self, names: set[str]) -> None:
- for name in names:
- n = os.path.basename(str(name or ""))
- if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", n or ""):
- continue
- key = self._key_for_name(n)
- try:
- self._bucket.delete_object(key)
- except Exception:
- pass
- def list_items(self) -> list[dict[str, Any]]:
- items: list[dict[str, Any]] = []
- for obj in self._oss2.ObjectIterator(self._bucket, prefix=self._upload_prefix):
- key = str(getattr(obj, "key", "") or "")
- if not key.startswith(self._upload_prefix):
- continue
- name = key[len(self._upload_prefix) :]
- if not name or "/" in name:
- continue
- if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", name or ""):
- continue
- ext = os.path.splitext(name)[1].lower()
- url = _build_oss_public_url(public_base_url=self._public_base_url, endpoint=self._endpoint, bucket=self._bucket_name, key=key)
- items.append(
- {
- "name": name,
- "url": url,
- "bytes": int(getattr(obj, "size", 0) or 0),
- "mtime": int(getattr(obj, "last_modified", 0) or 0),
- "ext": ext,
- "kind": _guess_upload_kind(ext),
- }
- )
- return items
def _get_upload_storage() -> Any:
    """Pick the upload storage backend from the current settings.

    ``LOCAL`` always uses local storage and ``OSS`` always uses OSS. Any other
    mode (``AUTO``) uses OSS when its configuration is complete and local
    storage otherwise.
    """
    mode = _get_upload_storage_mode()
    oss_cfg = _get_oss_upload_config()
    use_oss = mode == "OSS" or (mode != "LOCAL" and bool(oss_cfg.get("ok")))
    return _OssUploadStorage(oss_cfg) if use_oss else _LocalUploadStorage()
- def _guest_can_preview_repo_path(path: str) -> bool:
- p = (path or "").strip().replace("\\", "/").lstrip("/")
- if not p:
- return False
- base = os.path.basename(p).lower()
- if base in {
- ".env",
- ".env.local",
- ".env.development",
- ".env.production",
- ".env.test",
- "id_rsa",
- "id_dsa",
- "id_ed25519",
- "id_ecdsa",
- }:
- return False
- ext = os.path.splitext(base)[1].lower()
- if ext in {".key", ".pem", ".p12", ".pfx"}:
- return False
- if base.startswith(("readme", "license", "changelog")):
- return True
- return ext in {".md", ".txt", ".json", ".yml", ".yaml", ".toml", ".ini", ".conf"}
@app.get("/")
def page_index() -> str:
    """Render the site landing page."""
    template_name = "index.html"
    return render_template(template_name)
@app.get("/ui/resources")
def page_resources() -> str:
    """Render the resource list page."""
    template_name = "resources.html"
    return render_template(template_name)
@app.get("/ui/resources/<int:resource_id>")
def page_resource_detail(resource_id: int) -> str:
    """Render the detail page, passing *resource_id* to the template."""
    template_name = "resource_detail.html"
    return render_template(template_name, resource_id=resource_id)
@app.get("/ui/login")
def page_login() -> str:
    """Render the user login page."""
    template_name = "login.html"
    return render_template(template_name)
@app.get("/ui/register")
def page_register() -> str:
    """Render the user registration page."""
    template_name = "register.html"
    return render_template(template_name)
@app.get("/ui/me")
def page_me() -> str:
    """Render the "me" page from ``me.html``."""
    template_name = "me.html"
    return render_template(template_name)
@app.get("/ui/messages")
def page_messages() -> str:
    """Render the messages page."""
    template_name = "messages.html"
    return render_template(template_name)
@app.get("/ui/vip")
def page_vip() -> str:
    """Render the VIP page."""
    template_name = "vip.html"
    return render_template(template_name)
@app.get("/ui/admin")
def page_admin() -> Response:
    """Render the admin console, or redirect to the admin login page.

    The redirect happens when ``current_admin()`` returns ``None``.
    """
    if current_admin() is not None:
        return render_template("admin.html")
    return redirect(url_for("page_admin_login"))
@app.get("/ui/admin/login")
def page_admin_login() -> str:
    """Render the admin login page."""
    template_name = "admin_login.html"
    return render_template(template_name)
@app.get("/admin")
def page_admin_shortcut() -> Response:
    """Redirect the short ``/admin`` URL to the ``page_admin`` route."""
    target = url_for("page_admin")
    return redirect(target)
@app.get("/admin/login")
def page_admin_login_shortcut() -> Response:
    """Redirect the short ``/admin/login`` URL to the ``page_admin_login`` route."""
    target = url_for("page_admin_login")
    return redirect(target)
@app.post("/auth/register")
def api_register() -> Response:
    """Register a new user from a JSON body with ``phone`` and ``password``.

    Responses:
        400 ``phone_and_password_required`` when either field is missing,
            empty, or not a string (including a body that is not a JSON
            object);
        400 ``password_too_short`` when the password has fewer than 6
            characters;
        409 ``phone_exists`` when the insert raises ``IntegrityError``;
        otherwise the new user's id and phone, after storing the id in the
        session as ``user_id``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array or scalar is valid JSON but has no fields to read.
        payload = {}
    raw_phone = payload.get("phone")
    raw_password = payload.get("password")
    # Non-string values are treated as missing rather than crashing on
    # .strip() / len() further down.
    phone = raw_phone.strip() if isinstance(raw_phone, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""
    if not phone or not password:
        return jsonify({"error": "phone_and_password_required"}), 400
    if len(password) < 6:
        return jsonify({"error": "password_too_short"}), 400
    created_at = isoformat(utcnow())
    try:
        cur = execute(
            "INSERT INTO users (phone, password_hash, status, created_at) VALUES (?, ?, 'ACTIVE', ?)",
            (phone, generate_password_hash(password), created_at),
        )
    except IntegrityError:
        return jsonify({"error": "phone_exists"}), 409
    session["user_id"] = cur.lastrowid
    return jsonify({"id": cur.lastrowid, "phone": phone, "vipExpireAt": None})
@app.post("/auth/login")
def api_login() -> Response:
    """Log a user in from a JSON body with ``phone`` and ``password``.

    Responses:
        401 ``invalid_credentials`` when no user matches the phone or the
            password check fails (malformed bodies and non-string fields are
            treated as empty values and end up here);
        403 ``user_disabled`` when the user's status is not ``ACTIVE``;
        otherwise the user's id, phone and VIP expiry, after storing the id in
        the session as ``user_id``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array or scalar is valid JSON but has no fields to read.
        payload = {}
    raw_phone = payload.get("phone")
    raw_password = payload.get("password")
    # Non-string values would crash .strip() / check_password_hash; treat
    # them as empty so the request fails as ordinary bad credentials.
    phone = raw_phone.strip() if isinstance(raw_phone, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""
    user = fetch_one("SELECT * FROM users WHERE phone = ?", (phone,))
    if user is None or not check_password_hash(user["password_hash"], password):
        return jsonify({"error": "invalid_credentials"}), 401
    if user["status"] != "ACTIVE":
        return jsonify({"error": "user_disabled"}), 403
    session["user_id"] = user["id"]
    return jsonify({"id": user["id"], "phone": user["phone"], "vipExpireAt": user["vip_expire_at"]})
@app.post("/auth/logout")
def api_logout() -> Response:
    """Drop ``user_id`` from the session (if present) and confirm with ``{"ok": true}``."""
    session.pop("user_id", None)
    body = {"ok": True}
    return jsonify(body)
@app.get("/me")
def api_me() -> Response:
    """Return the current user's profile, or ``{"user": null}`` when there is none."""
    user = current_user()
    if user is None:
        return jsonify({"user": None})
    profile = {
        "id": user["id"],
        "phone": user["phone"],
        "vipExpireAt": user["vip_expire_at"],
        "vipActive": is_vip_active(user),
    }
    return jsonify({"user": profile})
@app.get("/plans")
def api_plans() -> Response:
    """List the enabled plans, ordered by ``sort`` and then ``id``, both descending."""
    rows = fetch_all("SELECT * FROM plans WHERE enabled = 1 ORDER BY sort DESC, id DESC")
    plans = []
    for row in rows:
        plans.append(
            {
                "id": row["id"],
                "name": row["name"],
                "durationDays": row["duration_days"],
                "priceCents": row["price_cents"],
            }
        )
    return jsonify(plans)
@app.get("/resources")
def api_resources() -> Response:
    """Return one page of ONLINE resources as JSON.

    Query parameters:
        q: optional text matched with ``LIKE`` against title and summary.
        type: optional filter; applied only when it is ``FREE`` or ``VIP``
            (compared upper-cased).
        sort: ``hot`` orders by view count; any other value orders by
            ``updated_at``.
        page: page number, floored at 1.
        pageSize: page size, clamped to the range 1..50.

    The response carries ``items``, ``page``, ``pageSize`` and ``total``.
    """
    args = request.args
    keyword = (args.get("q") or "").strip()
    wanted_type = (args.get("type") or "").strip().upper()
    sort_mode = (args.get("sort") or "latest").strip()
    page = max(parse_int(args.get("page"), 1), 1)
    page_size = min(max(parse_int(args.get("pageSize"), 12), 1), 50)

    conditions = ["status = 'ONLINE'"]
    params: list[Any] = []
    if keyword:
        like_pattern = f"%{keyword}%"
        conditions.append("(title LIKE ? OR summary LIKE ?)")
        params += [like_pattern, like_pattern]
    if wanted_type in {"FREE", "VIP"}:
        conditions.append("type = ?")
        params.append(wanted_type)
    order_by = "view_count DESC, id DESC" if sort_mode == "hot" else "updated_at DESC, id DESC"
    where_sql = " AND ".join(conditions)

    # Count first, then fetch the requested window with the same filter.
    count_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM resources WHERE {where_sql}", params)
    total = int(count_row["cnt"] if count_row is not None else 0)
    offset = (page - 1) * page_size
    rows = fetch_all(
        f"""
        SELECT * FROM resources
        WHERE {where_sql}
        ORDER BY {order_by}
        LIMIT ? OFFSET ?
        """,
        params + [page_size, offset],
    )

    items = []
    for row in rows:
        # An unparsable tags_json degrades to "no tags" rather than failing.
        try:
            tags = json.loads(row["tags_json"] or "[]")
        except Exception:
            tags = []
        cover = row["cover_url"]
        cover_url = str(cover).strip() if cover is not None else ""
        if not cover_url:
            cover_url = "/static/images/resources/default.png"
        items.append(
            {
                "id": row["id"],
                "title": row["title"],
                "summary": row["summary"],
                "type": row["type"],
                "coverUrl": cover_url,
                "tags": tags,
                "updatedAt": row["updated_at"],
                "viewCount": row["view_count"],
                "downloadCount": row["download_count"],
                "repo": {
                    "owner": row["repo_owner"],
                    "name": row["repo_name"],
                    "defaultRef": row["default_ref"],
                },
            }
        )
    return jsonify({"items": items, "page": page, "pageSize": page_size, "total": total})
@app.get("/resources/<int:resource_id>")
def api_resource_detail(resource_id: int) -> Response:
    """Return one ONLINE resource and bump its view counter.

    Aborts with 404 when no ONLINE resource has this id. The ``viewCount`` in
    the response is the value read before the update plus one.
    """
    resource = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,))
    if resource is None:
        abort(404)
    execute("UPDATE resources SET view_count = view_count + 1 WHERE id = ?", (resource_id,))

    # An unparsable tags_json degrades to "no tags" rather than failing.
    try:
        tags = json.loads(resource["tags_json"] or "[]")
    except Exception:
        tags = []
    cover = resource["cover_url"]
    cover_url = str(cover).strip() if cover is not None else ""
    if not cover_url:
        cover_url = "/static/images/resources/default.png"

    repo_info = {
        "owner": resource["repo_owner"],
        "name": resource["repo_name"],
        "htmlUrl": resource["repo_html_url"],
        "defaultRef": resource["default_ref"],
        "private": bool(resource["repo_private"]),
    }
    detail = {
        "id": resource["id"],
        "title": resource["title"],
        "summary": resource["summary"],
        "type": resource["type"],
        "coverUrl": cover_url,
        "tags": tags,
        "updatedAt": resource["updated_at"],
        "viewCount": resource["view_count"] + 1,
        "downloadCount": resource["download_count"],
        "repo": repo_info,
    }
    return jsonify(detail)
@app.get("/resources/<int:resource_id>/repo/refs")
def api_repo_refs(resource_id: int) -> Response:
    """List branch and tag names of an ONLINE resource's repository.

    Both ``gogs_branches`` and ``gogs_tags`` are queried. Outcomes:

    * both answers < 400: ``{"branches": [...], "tags": [...]}``, or 502
      ``gogs_invalid_response`` when a body is not valid JSON;
    * either answer is 401/403: 400 ``gogs_unauthorized``;
    * either answer is 599: 502 ``gogs_unreachable``;
    * anything else: fall back to ``gogs_git_list_refs``; a ``GogsGitError``
      is translated by ``_git_error_to_response``.

    Aborts with 404 when the resource does not exist or is not ONLINE.
    """
    row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,))
    if row is None:
        abort(404)
    owner, repo = row["repo_owner"], row["repo_name"]
    branches_resp = gogs_branches(owner, repo)
    tags_resp = gogs_tags(owner, repo)
    statuses = (branches_resp.status_code, tags_resp.status_code)

    def upstream_error(code: str, status: int, http_status: int):
        """Build the shared error payload, preferring details from the branches response."""
        msg = _gogs_error_message(branches_resp) or _gogs_error_message(tags_resp)
        upstream_url = _safe_upstream_url(branches_resp) or _safe_upstream_url(tags_resp)
        return jsonify({"error": code, "status": status, "message": msg, "url": upstream_url}), http_status

    if statuses[0] < 400 and statuses[1] < 400:
        try:
            branches = branches_resp.json()
            tags = tags_resp.json()
        except Exception:
            return upstream_error("gogs_invalid_response", 200, 502)
        return jsonify(
            {
                "branches": [{"name": b.get("name")} for b in (branches or [])],
                "tags": [{"name": t.get("name")} for t in (tags or [])],
            }
        )
    if statuses[0] in {401, 403} or statuses[1] in {401, 403}:
        # Report the first failing upstream status.
        failing = statuses[0] if statuses[0] >= 400 else statuses[1]
        return upstream_error("gogs_unauthorized", failing, 400)
    if 599 in statuses:
        return upstream_error("gogs_unreachable", 599, 502)
    # Any other upstream failure: try the git-based listing. Both arms return,
    # so the former trailing "gogs_failed" response was unreachable and has
    # been removed.
    try:
        return jsonify(gogs_git_list_refs(owner, repo))
    except GogsGitError as e:
        return _git_error_to_response(e)
@app.get("/resources/<int:resource_id>/repo/tree")
def api_repo_tree(resource_id: int) -> Response:
    """List a directory of an ONLINE resource's repository.

    Query parameters:
        ref: git ref; falls back to the resource's ``default_ref`` when empty.
        path: directory path; empty means the repository root. A non-empty
            path is passed through ``_normalize_repo_path`` (as the file and
            commits endpoints do) and rejected with 400 ``path_invalid`` when
            that returns nothing.

    Responses:
        404 (abort): resource missing or not ONLINE.
        404 ``path_not_found``: upstream answered 404.
        502 ``gogs_failed`` / ``gogs_invalid_response``: upstream error or a
            body that is not JSON.
        400 ``not_a_directory``: the JSON body is not a list.
        200: ``ref``, ``path`` and ``items`` sorted directories-first, then
            by name. ``guestAllowed`` is true for logged-in users, for
            directories, and otherwise whatever
            ``_guest_can_preview_repo_path`` returns for the entry's path.
    """
    ref = (request.args.get("ref") or "").strip()
    raw_path = (request.args.get("path") or "").strip()
    path = ""
    if raw_path:
        # Reject ".." and similar before the path is forwarded upstream.
        path = _normalize_repo_path(raw_path) or ""
        if not path:
            return jsonify({"error": "path_invalid"}), 400
    user = current_user()
    row = fetch_one(
        "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'",
        (resource_id,),
    )
    if row is None:
        abort(404)
    owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"]
    if not ref:
        ref = default_ref
    resp = gogs_contents(owner, repo, path, ref)
    if resp.status_code == 404:
        return jsonify({"error": "path_not_found"}), 404
    if resp.status_code >= 400:
        return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502
    try:
        data = resp.json()
    except Exception:
        msg = _gogs_error_message(resp)
        upstream_url = _safe_upstream_url(resp)
        return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502
    if not isinstance(data, list):
        return jsonify({"error": "not_a_directory"}), 400
    items = []
    for item in data:
        if not isinstance(item, dict):
            # Skip malformed entries instead of failing on ``.get``.
            continue
        item_path = item.get("path") or ""
        item_type = item.get("type")
        items.append(
            {
                "name": item.get("name"),
                "path": item_path,
                "type": item_type,
                "size": item.get("size"),
                "guestAllowed": user is not None or item_type == "dir" or _guest_can_preview_repo_path(item_path),
            }
        )
    items.sort(key=lambda x: (0 if x["type"] == "dir" else 1, x["name"] or ""))
    return jsonify({"ref": ref, "path": path, "items": items})
@app.get("/resources/<int:resource_id>/repo/file")
def api_repo_file(resource_id: int) -> Response:
    """Return the UTF-8 text of one repository file for preview.

    Query parameters:
        ref: git ref; falls back to the resource's ``default_ref`` when empty.
        path: required file path, normalised with ``_normalize_repo_path``.

    Responses:
        400 ``path_required`` / ``path_invalid``: missing or rejected path.
        401 ``login_required``: anonymous caller and
            ``_guest_can_preview_repo_path`` refuses the path.
        404: resource missing/not ONLINE (abort) or ``file_not_found``.
        502 ``gogs_failed`` / ``gogs_invalid_response``: upstream problems.
        400 ``not_a_file`` / ``unsupported_encoding`` / ``decode_failed``.
        413 ``file_too_large``: reported or decoded size exceeds
            ``config.max_preview_bytes``.
        415 ``binary_file_not_previewable``: content is not valid UTF-8.
        200: ``ref``, ``path`` and ``content``.
    """
    config = get_config()
    ref = (request.args.get("ref") or "").strip()
    raw_path = (request.args.get("path") or "").strip()
    if not raw_path:
        return jsonify({"error": "path_required"}), 400
    path = _normalize_repo_path(raw_path) or ""
    if not path:
        return jsonify({"error": "path_invalid"}), 400
    user = current_user()
    if user is None and not _guest_can_preview_repo_path(path):
        return jsonify({"error": "login_required", "path": path}), 401
    row = fetch_one(
        "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'",
        (resource_id,),
    )
    if row is None:
        abort(404)
    owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"]
    if not ref:
        ref = default_ref
    resp = gogs_contents(owner, repo, path, ref)
    if resp.status_code == 404:
        return jsonify({"error": "file_not_found"}), 404
    if resp.status_code >= 400:
        return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502
    try:
        data = resp.json()
    except Exception:
        msg = _gogs_error_message(resp)
        upstream_url = _safe_upstream_url(resp)
        return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502
    # A directory comes back as a list; any other non-object JSON is equally
    # "not a file" (and used to crash on ``.get``).
    if not isinstance(data, dict) or data.get("type") != "file":
        return jsonify({"error": "not_a_file"}), 400
    size = parse_int(data.get("size"), 0)
    if size > config.max_preview_bytes:
        return jsonify({"error": "file_too_large", "maxBytes": config.max_preview_bytes, "size": size}), 413
    encoding = data.get("encoding")
    content = data.get("content") or ""
    if encoding != "base64":
        return jsonify({"error": "unsupported_encoding", "encoding": encoding}), 400
    try:
        raw = base64.b64decode(content, validate=False)
    except (ValueError, TypeError):
        return jsonify({"error": "decode_failed"}), 400
    # Enforce the limit on the real payload too, in case the reported size
    # was missing or wrong.
    if len(raw) > config.max_preview_bytes:
        return jsonify({"error": "file_too_large", "maxBytes": config.max_preview_bytes, "size": len(raw)}), 413
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return jsonify({"error": "binary_file_not_previewable"}), 415
    return jsonify({"ref": ref, "path": path, "content": text})
- def _normalize_repo_path(raw: str) -> str | None:
- s = (raw or "").strip().replace("\\", "/").lstrip("/")
- if not s:
- return None
- parts = [p for p in s.split("/") if p]
- if any(p == ".." for p in parts):
- return None
- if ":" in parts[0]:
- return None
- return "/".join(parts)
def _git_error_to_response(e: GogsGitError) -> tuple[Response, int]:
    """Translate a ``GogsGitError`` into ``(JSON body, HTTP status)``.

    The body always carries ``e.code`` and ``e.message``; the status is chosen
    from the code: request problems map to 400, conflicts (``file_exists``,
    ``empty_repo``) to 409, missing objects to 404, ``git_not_found`` to 501
    and everything else to 502.
    """
    code = e.code
    if code in {"path_required", "ref_required", "gogs_token_required", "invalid_gogs_base_url"}:
        http_status = 400
    elif code in {"file_exists", "empty_repo"}:
        http_status = 409
    elif code in {"file_not_found", "path_not_found", "branch_not_found"}:
        http_status = 404
    elif code == "git_not_found":
        http_status = 501
    else:
        http_status = 502
    return jsonify({"error": e.code, "message": e.message}), http_status
@app.post("/resources/<int:resource_id>/repo/file")
def api_repo_file_create(resource_id: int) -> Response:
    """Create a file in the resource's repository (admin only).

    JSON body: ``ref`` (falls back to the resource's ``default_ref``), ``path``,
    ``content`` and ``message``. Returns 400 ``path_required`` when the path is
    empty or rejected by ``_normalize_repo_path``, aborts with 404 for an
    unknown resource, and maps a ``GogsGitError`` through
    ``_git_error_to_response``. On success the write result is merged into
    ``{"ok": true}``.
    """
    require_admin()
    body = request.get_json(silent=True) or {}
    requested_ref = (body.get("ref") or "").strip()
    path = _normalize_repo_path(body.get("path") or "")
    content = body.get("content") or ""
    message = body.get("message") or ""
    if not path:
        return jsonify({"error": "path_required"}), 400
    resource = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if resource is None:
        abort(404)
    target_ref = requested_ref or (resource["default_ref"] or "").strip()
    try:
        result = gogs_git_write_file(
            resource["repo_owner"],
            resource["repo_name"],
            target_ref,
            path,
            str(content),
            str(message),
            must_create=True,
        )
    except GogsGitError as err:
        return _git_error_to_response(err)
    return jsonify({"ok": True, **result})
@app.put("/resources/<int:resource_id>/repo/file")
def api_repo_file_update(resource_id: int) -> Response:
    """Write a file in the resource's repository with ``must_create=False`` (admin only).

    JSON body: ``ref`` (falls back to the resource's ``default_ref``), ``path``,
    ``content`` and ``message``. Returns 400 ``path_required`` when the path is
    empty or rejected by ``_normalize_repo_path``, aborts with 404 for an
    unknown resource, and maps a ``GogsGitError`` through
    ``_git_error_to_response``. On success the write result is merged into
    ``{"ok": true}``.
    """
    require_admin()
    body = request.get_json(silent=True) or {}
    requested_ref = (body.get("ref") or "").strip()
    path = _normalize_repo_path(body.get("path") or "")
    content = body.get("content") or ""
    message = body.get("message") or ""
    if not path:
        return jsonify({"error": "path_required"}), 400
    resource = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if resource is None:
        abort(404)
    target_ref = requested_ref or (resource["default_ref"] or "").strip()
    try:
        result = gogs_git_write_file(
            resource["repo_owner"],
            resource["repo_name"],
            target_ref,
            path,
            str(content),
            str(message),
            must_create=False,
        )
    except GogsGitError as err:
        return _git_error_to_response(err)
    return jsonify({"ok": True, **result})
@app.delete("/resources/<int:resource_id>/repo/file")
def api_repo_file_delete(resource_id: int) -> Response:
    """Delete a path from the resource's repository (admin only).

    JSON body: ``ref`` (falls back to the resource's ``default_ref``), ``path``
    and ``message``. Returns 400 ``path_required`` when the path is empty or
    rejected by ``_normalize_repo_path``, aborts with 404 for an unknown
    resource, and maps a ``GogsGitError`` through ``_git_error_to_response``.
    On success the delete result is merged into ``{"ok": true}``.
    """
    require_admin()
    body = request.get_json(silent=True) or {}
    requested_ref = (body.get("ref") or "").strip()
    path = _normalize_repo_path(body.get("path") or "")
    message = body.get("message") or ""
    if not path:
        return jsonify({"error": "path_required"}), 400
    resource = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if resource is None:
        abort(404)
    target_ref = requested_ref or (resource["default_ref"] or "").strip()
    try:
        result = gogs_git_delete_path(
            resource["repo_owner"],
            resource["repo_name"],
            target_ref,
            path,
            str(message),
        )
    except GogsGitError as err:
        return _git_error_to_response(err)
    return jsonify({"ok": True, **result})
@app.get("/resources/<int:resource_id>/repo/commits")
def api_repo_commits(resource_id: int) -> Response:
    """List recent commits of an ONLINE resource's repository.

    Query parameters:
        ref: git ref; falls back to the resource's ``default_ref`` when empty.
        path: optional path filter, normalised with ``_normalize_repo_path``;
            400 ``path_invalid`` when it is given but rejected.
        limit: number of commits, clamped to the range 1..50.

    Responses:
        404 (abort): resource missing or not ONLINE.
        502 ``gogs_failed`` / ``gogs_invalid_response``: upstream error, a
            body that is not JSON, or JSON that is not a list.
        200: ``ref``, ``path`` and ``items`` with ``sha``, ``authorName``,
            ``authorDate`` and ``subject`` (first message line, at most 300
            characters; empty when the commit has no message).
    """
    ref = (request.args.get("ref") or "").strip()
    raw_path = (request.args.get("path") or "").strip()
    limit = min(max(parse_int(request.args.get("limit"), 20), 1), 50)
    path = ""
    if raw_path:
        path = _normalize_repo_path(raw_path) or ""
        if not path:
            return jsonify({"error": "path_invalid"}), 400
    row = fetch_one(
        "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'",
        (resource_id,),
    )
    if row is None:
        abort(404)
    if not ref:
        ref = (row["default_ref"] or "").strip()
    resp = gogs_commits(row["repo_owner"], row["repo_name"], ref=ref, path=path, limit=limit)
    if resp.status_code >= 400:
        msg = _gogs_error_message(resp)
        upstream_url = _safe_upstream_url(resp)
        return jsonify({"error": "gogs_failed", "status": resp.status_code, "message": msg, "url": upstream_url}), 502
    try:
        data = resp.json()
    except Exception:
        msg = _gogs_error_message(resp)
        upstream_url = _safe_upstream_url(resp)
        return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502
    if not isinstance(data, list):
        return jsonify({"error": "gogs_invalid_response", "status": resp.status_code}), 502
    items = []
    for entry in data:
        if not isinstance(entry, dict):
            # Skip malformed entries instead of failing on ``.get``.
            continue
        commit = entry.get("commit")
        if not isinstance(commit, dict):
            commit = {}
        author = commit.get("author")
        if not isinstance(author, dict):
            author = {}
        # ``"".splitlines()`` is ``[]``, so indexing [0] directly raised
        # IndexError for commits with an empty or missing message.
        message_lines = str(commit.get("message") or "").splitlines()
        subject = message_lines[0][:300] if message_lines else ""
        items.append(
            {
                "sha": str(entry.get("sha") or "").strip(),
                "authorName": author.get("name") or "",
                "authorDate": author.get("date") or "",
                "subject": subject,
            }
        )
    return jsonify({"ref": ref, "path": path or "", "items": items})
@app.get("/resources/<int:resource_id>/repo/readme")
def api_repo_readme(resource_id: int) -> Response:
    """Return the repository README of an ONLINE resource, if one exists.

    The names ``README.md``, ``readme.md``, ``README.MD`` and ``Readme.md`` are
    tried in that order at ``ref`` (query parameter, falling back to the
    resource's ``default_ref``). Candidates that are missing (404) or that are
    not a file are skipped.

    Responses:
        404 (abort): resource missing or not ONLINE.
        502 ``gogs_failed`` / ``gogs_invalid_response``: upstream problems.
        413 ``readme_too_large``: reported size exceeds
            ``config.max_preview_bytes``.
        400 ``decode_failed``: the base64 content cannot be decoded.
        200: ``ref``, ``path`` and ``content``; ``path`` and ``content`` are
            ``null`` when no candidate matched. Invalid UTF-8 bytes are
            replaced rather than rejected.
    """
    config = get_config()
    ref = (request.args.get("ref") or "").strip()
    row = fetch_one(
        "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'",
        (resource_id,),
    )
    if row is None:
        abort(404)
    owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"]
    if not ref:
        ref = default_ref
    for name in ("README.md", "readme.md", "README.MD", "Readme.md"):
        resp = gogs_contents(owner, repo, name, ref)
        if resp.status_code == 404:
            continue
        if resp.status_code >= 400:
            return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502
        try:
            data = resp.json()
        except Exception:
            msg = _gogs_error_message(resp)
            upstream_url = _safe_upstream_url(resp)
            return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502
        # A directory listing (list) or any other non-object body is not a
        # README file; it used to crash on ``.get``.
        if not isinstance(data, dict) or data.get("type") != "file":
            continue
        size = parse_int(data.get("size"), 0)
        if size > config.max_preview_bytes:
            return jsonify({"error": "readme_too_large"}), 413
        try:
            raw = base64.b64decode(data.get("content") or "", validate=False)
        except (ValueError, TypeError):
            return jsonify({"error": "decode_failed"}), 400
        # errors="replace" yields the strict result whenever the bytes are
        # valid UTF-8, so a separate strict attempt is unnecessary.
        text = raw.decode("utf-8", errors="replace")
        return jsonify({"ref": ref, "path": name, "content": text})
    return jsonify({"ref": ref, "path": None, "content": None})
# Shared state for download ZIP builds.
# _download_lock is held around every access to _download_jobs, which maps a
# job key to a small status dict.
_download_lock = threading.Lock()
_download_jobs: dict[str, dict[str, Any]] = {}

# Limits read from the environment; an unset or empty variable falls back to
# the default shown.
_download_build_sema = threading.Semaphore(int(os.getenv("DOWNLOAD_BUILD_CONCURRENCY") or "2"))
_download_git_sema = threading.Semaphore(int(os.getenv("DOWNLOAD_GIT_FALLBACK_CONCURRENCY") or "1"))
_download_cache_ttl_seconds = int(os.getenv("DOWNLOAD_CACHE_TTL_SECONDS") or "900")
def _download_cache_dir() -> Path:
    """Return the ``download_cache`` directory next to the database file, creating it if needed."""
    cache_dir = get_config().database_path.parent / "download_cache"
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir
def _download_cache_path(*, resource_id: int, owner: str, repo: str, cache_key: str) -> Path:
    """Build the cache ZIP path for one resource/repository/cache-key combination.

    Owner and repo are reduced to ``[a-zA-Z0-9._-]`` (other runs become ``_``),
    cut to 50 characters, and replaced by ``owner`` / ``repo`` when empty. A
    cache key made of 7-80 hex digits is lower-cased and cut to 24 characters;
    any other key is replaced by the first 24 hex digits of its SHA-256.
    """
    unsafe_run = r"[^a-zA-Z0-9._-]+"
    safe_owner = re.sub(unsafe_run, "_", (owner or "").strip())[:50] or "owner"
    safe_repo = re.sub(unsafe_run, "_", (repo or "").strip())[:50] or "repo"
    key = (cache_key or "").strip()
    is_hex_key = re.fullmatch(r"[0-9a-fA-F]{7,80}", key) is not None
    key_tag = key.lower()[:24] if is_hex_key else hashlib.sha256(key.encode("utf-8")).hexdigest()[:24]
    rid = int(resource_id or 0)
    filename = f"res{rid}__{safe_owner}__{safe_repo}__{key_tag}.zip"
    return _download_cache_dir() / filename
def _download_cache_meta_path(zip_path: Path) -> Path:
    """Return the metadata sidecar path: *zip_path* with ``.meta.json`` appended to its suffix."""
    sidecar_suffix = f"{zip_path.suffix}.meta.json"
    return zip_path.with_suffix(sidecar_suffix)
def _download_cache_ready(path: Path) -> bool:
    """Tell whether *path* is a usable cached archive.

    The file must be stat-able and non-empty. Freshness is not checked when
    the app is in testing mode, when the current request comes from the
    Werkzeug test client, or when the TTL is zero or negative; otherwise the
    file's mtime must be no older than ``_download_cache_ttl_seconds``.
    """
    try:
        info = path.stat()
    except Exception:
        return False
    if info.st_size <= 0:
        return False
    if (
        app.testing
        or (has_request_context() and bool(getattr(request, "environ", {}).get("werkzeug.test")))
        or _download_cache_ttl_seconds <= 0
    ):
        return True
    age_seconds = time.time() - float(info.st_mtime)
    return age_seconds <= float(_download_cache_ttl_seconds)
def _read_download_cache_meta(zip_path: Path) -> dict[str, Any] | None:
    """Load the metadata sidecar of *zip_path*.

    Returns the parsed JSON object, or ``None`` when the sidecar cannot be
    read, is not valid JSON, or does not contain an object.
    """
    meta_path = _download_cache_meta_path(zip_path)
    try:
        parsed = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return None
    if isinstance(parsed, dict):
        return parsed
    return None
def _write_download_cache_meta(zip_path: Path, meta: dict[str, Any]) -> None:
    """Write *meta* as JSON to the sidecar of *zip_path*.

    The JSON is written to a ``.partial`` sibling and moved over the final
    sidecar with ``os.replace``. A leftover partial file is removed on the way
    out; failures during that cleanup are ignored.
    """
    final_path = _download_cache_meta_path(zip_path)
    scratch_path = final_path.with_suffix(final_path.suffix + ".partial")
    try:
        serialized = json.dumps(meta, ensure_ascii=False)
        scratch_path.write_text(serialized, encoding="utf-8")
        os.replace(scratch_path, final_path)
    finally:
        # Best-effort cleanup: after a successful replace nothing is left.
        try:
            if scratch_path.exists():
                scratch_path.unlink()
        except Exception:
            pass
def _looks_like_commit(s: str) -> bool:
    """Return True when *s*, stripped of surrounding whitespace, is 7 to 40 hex digits."""
    candidate = (s or "").strip()
    # The {7,40} quantifier already enforces the length bounds.
    return re.fullmatch(r"[0-9a-fA-F]{7,40}", candidate) is not None
def _resolve_download_commit(*, owner: str, repo: str, ref: str) -> dict[str, Any]:
    """Resolve *ref* to a commit id for download caching.

    An empty ref is treated as ``HEAD``. A ref that already looks like a commit
    is returned lower-cased with kind ``commit``. Otherwise
    ``gogs_resolve_ref_commit`` is consulted; any exception, a non-dict answer
    or a commit value that does not look like a commit yields ``ok: False``.

    Returns a dict with ``ok``, ``ref``, ``commit`` and ``kind``.
    """
    ref = (ref or "").strip() or "HEAD"

    def unresolved(kind: Any) -> dict[str, Any]:
        return {"ok": False, "ref": ref, "commit": None, "kind": kind}

    if _looks_like_commit(ref):
        return {"ok": True, "ref": ref, "commit": ref.lower(), "kind": "commit"}
    try:
        info = gogs_resolve_ref_commit(owner, repo, ref)
    except Exception:
        return unresolved("unknown")
    if not isinstance(info, dict):
        return unresolved("unknown")
    kind = info.get("kind") or "unknown"
    commit = (info.get("commit") or "").strip()
    if commit and _looks_like_commit(commit):
        return {"ok": True, "ref": ref, "commit": commit.lower(), "kind": kind}
    return unresolved(kind)
def _build_zip_to_cache(
    *, owner: str, repo: str, ref: str, commit: str | None, resolved_kind: str, out_path: Path
) -> None:
    """Build the repository ZIP for one ref/commit and publish it at *out_path*.

    Steps:
        1. Ask ``gogs_archive_get`` for the archive (by commit when known,
           else by ref). On a 404, retry once against
           ``<gogs_base_url>/<owner>/<repo>/archive/<ref>.zip`` when the base
           URL is plain http(s) without embedded credentials.
        2. On a 2xx answer, stream the body to a ``.partial`` file, move it
           into place and write the metadata sidecar
           (``method = "gogs_archive"``).
        3. Otherwise fall back to the git-based archive helpers, guarded by
           ``_download_git_sema`` (``method = "git_archive"``).

    Raises:
        RuntimeError: ``git_fallback_busy`` when the git fallback slot cannot
            be acquired without blocking.
        Exception: whatever the upstream / git / filesystem helpers raise.

    The partial file is always removed and the upstream response closed.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_suffix(out_path.suffix + ".partial")
    meta: dict[str, Any] = {
        "owner": owner,
        "repo": repo,
        "ref": ref,
        "commit": (commit or "").strip() or None,
        "refKind": resolved_kind or "unknown",
        "builtAt": isoformat(utcnow()),
    }

    def close_quietly(resp: Any) -> None:
        """Release a response's connection if it exposes ``close``; ignore failures."""
        close = getattr(resp, "close", None)
        if callable(close):
            try:
                close()
            except Exception:
                pass

    def publish(method: str, upstream_status: int) -> None:
        """Move the finished partial file into place and write its metadata sidecar."""
        os.replace(tmp_path, out_path)
        meta["method"] = method
        meta["upstreamStatus"] = upstream_status
        try:
            st = out_path.stat()
            meta["bytes"] = int(st.st_size)
            meta["mtime"] = int(st.st_mtime)
        except OSError:
            # Size/mtime are informational only.
            pass
        _write_download_cache_meta(out_path, meta)

    upstream = None
    try:
        upstream_ref = (commit or "").strip() or ref
        upstream = gogs_archive_get(owner, repo, upstream_ref)
        if upstream.status_code == 404:
            base_url = (get_config().gogs_base_url or "").strip().rstrip("/")
            parts = urlsplit(base_url)
            if parts.scheme in {"http", "https"} and parts.netloc and not parts.username and not parts.password:
                fallback = f"{base_url}/{quote(owner)}/{quote(repo)}/archive/{quote(upstream_ref)}.zip"
                close_quietly(upstream)
                upstream = requests.get(fallback, stream=True, timeout=60, allow_redirects=False)
        status = int(upstream.status_code)
        # Only 2xx carries an archive. The fallback request does not follow
        # redirects, so a 3xx body (e.g. a redirect page) must not be cached
        # as the ZIP, which the former ``< 400`` check allowed.
        if 200 <= status < 300:
            with open(tmp_path, "wb") as f:
                for chunk in upstream.iter_content(chunk_size=1024 * 256):
                    if chunk:
                        f.write(chunk)
            publish("gogs_archive", status)
            return
        if not _download_git_sema.acquire(blocking=False):
            raise RuntimeError("git_fallback_busy")
        zip_path = None
        try:
            if commit and _looks_like_commit(commit):
                zip_path = gogs_git_archive_zip_commit(owner, repo, commit)
            else:
                zip_path = gogs_git_archive_zip(owner, repo, ref)
            with open(zip_path, "rb") as src, open(tmp_path, "wb") as dst:
                shutil.copyfileobj(src, dst, length=1024 * 256)
            publish("git_archive", status)
        finally:
            # Remove the helper's archive file, then free the slot.
            try:
                if zip_path:
                    os.unlink(zip_path)
            except Exception:
                pass
            _download_git_sema.release()
    finally:
        if upstream is not None:
            close_quietly(upstream)
        try:
            if tmp_path.exists():
                os.unlink(tmp_path)
        except Exception:
            pass
def _ensure_download_ready(
    *,
    resource_id: int,
    owner: str,
    repo: str,
    ref: str,
    commit: str | None,
    resolved_kind: str,
    force: bool = False,
) -> dict[str, Any]:
    """Return the cached archive for a download, starting a build when needed.

    The cache key is the commit when known, otherwise the ref (``HEAD`` when
    empty). With ``force`` the cached ZIP and its metadata sidecar are deleted
    first (best effort). If a usable cache file exists the result has
    ``ready: True`` and ``path``. If a job for the same key is already in state
    ``building`` that is reported without starting another one. Otherwise a
    build is started: inline when the app is in testing mode or the request
    comes from the Werkzeug test client, else on a daemon thread.

    Returns a dict with ``ready``, ``ref``, ``commit`` and ``cacheKey``, plus
    ``path`` when ready or ``state`` (and possibly ``error``) when not.
    """
    ref = (ref or "").strip() or "HEAD"
    commit = (commit or "").strip() or None
    cache_key = commit or ref
    cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key)

    def ready_result() -> dict[str, Any]:
        return {"ready": True, "path": cache_path, "ref": ref, "commit": commit, "cacheKey": cache_key}

    if force:
        # Drop the archive, then its sidecar; each removal is best effort.
        for locate in (lambda: cache_path, lambda: _download_cache_meta_path(cache_path)):
            try:
                stale = locate()
                if stale.exists():
                    stale.unlink()
            except Exception:
                pass

    if _download_cache_ready(cache_path):
        return ready_result()

    key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}"
    with _download_lock:
        existing = _download_jobs.get(key)
        if existing and existing.get("state") == "building":
            return {"ready": False, "state": "building", "ref": ref, "commit": commit, "cacheKey": cache_key}
        _download_jobs[key] = {
            "state": "building",
            "updatedAt": time.time(),
            "error": None,
            "ref": ref,
            "commit": commit,
            "cacheKey": cache_key,
        }

    def record(state: str, error: str | None) -> None:
        with _download_lock:
            _download_jobs[key] = {"state": state, "updatedAt": time.time(), "error": error}

    def runner() -> None:
        with app.app_context():
            # Never queue behind other builds: report "build_busy" instead.
            if not _download_build_sema.acquire(blocking=False):
                record("error", "build_busy")
                return
            try:
                _build_zip_to_cache(
                    owner=owner,
                    repo=repo,
                    ref=ref,
                    commit=commit,
                    resolved_kind=resolved_kind,
                    out_path=cache_path,
                )
                record("ready", None)
            except Exception as e:
                code = str(e) or "build_failed"
                record("error", code[:120])
            finally:
                _download_build_sema.release()

    force_sync = bool(app.testing) or (has_request_context() and bool(request.environ.get("werkzeug.test")))
    if force_sync:
        runner()
    else:
        threading.Thread(target=runner, daemon=True).start()

    if _download_cache_ready(cache_path):
        return ready_result()
    with _download_lock:
        latest = _download_jobs.get(key) or {}
    return {
        "ready": False,
        "state": latest.get("state") or "building",
        "error": latest.get("error"),
        "ref": ref,
        "commit": commit,
        "cacheKey": cache_key,
    }
@app.post("/resources/<int:resource_id>/download")
def api_download_prepare(resource_id: int) -> Response:
    """Record a download request and make sure the archive is being prepared.

    Requires a logged-in user; VIP resources additionally require an active
    VIP (403 ``vip_required``). Aborts with 404 when the resource is missing
    or not ONLINE. The JSON body may carry ``ref`` (defaults to the resource's
    ``default_ref``) and ``force``.

    A row is inserted into ``download_logs`` (client IP cut to 64 characters,
    user agent to 256) and ``download_count`` is incremented before the ref is
    resolved and ``_ensure_download_ready`` is called. The response reports
    ``ready`` / ``state`` / ``error`` together with ``downloadUrl`` and
    ``statusUrl``.
    """
    user = require_user()
    resource = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,))
    if resource is None:
        abort(404)
    if resource["type"] == "VIP" and not is_vip_active(user):
        return jsonify({"error": "vip_required"}), 403

    body = request.get_json(silent=True) or {}
    ref = (body.get("ref") or "").strip() or resource["default_ref"]
    owner, repo = resource["repo_owner"], resource["repo_name"]

    # First X-Forwarded-For entry when present, else the socket peer address.
    forwarded_for = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip()
    ip = (forwarded_for or (request.remote_addr or ""))[:64]
    ua = (request.headers.get("User-Agent") or "").strip()[:256]

    execute(
        """
        INSERT INTO download_logs
        (user_id, resource_id, resource_title_snapshot, resource_type_snapshot, ref_snapshot, downloaded_at, ip, user_agent)
        VALUES
        (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (user["id"], resource["id"], resource["title"], resource["type"], ref, isoformat(utcnow()), ip, ua),
    )
    execute("UPDATE resources SET download_count = download_count + 1 WHERE id = ?", (resource_id,))

    resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref)
    commit = resolved.get("commit") if resolved.get("ok") else None
    status = _ensure_download_ready(
        resource_id=resource_id,
        owner=owner,
        repo=repo,
        ref=ref,
        commit=commit,
        resolved_kind=resolved.get("kind") or "unknown",
        force=bool(body.get("force")),
    )

    # The commit is echoed in the follow-up URLs only when it was resolved.
    qs = {"ref": ref}
    if commit:
        qs["commit"] = commit
    query = urlencode(qs)
    is_ready = bool(status.get("ready"))
    return jsonify(
        {
            "ok": True,
            "ready": is_ready,
            "state": status.get("state") or ("ready" if status.get("ready") else "building"),
            "error": status.get("error"),
            "ref": ref,
            "commit": commit,
            "cacheKey": status.get("cacheKey") or (commit or ref),
            "downloadUrl": f"/resources/{resource_id}/download?{query}",
            "statusUrl": f"/resources/{resource_id}/download/status?{query}",
        }
    )
@app.get("/resources/<int:resource_id>/download/status")
def api_download_status(resource_id: int) -> Response:
    """Report whether the archive for a download is cached, without starting a build.

    Requires a logged-in user; VIP resources additionally require an active
    VIP (403 ``vip_required``). Aborts with 404 when the resource is missing
    or not ONLINE. Query parameters: ``ref`` (defaults to the resource's
    ``default_ref``) and ``commit`` (used as-is, lower-cased, when it looks
    like a commit; otherwise the ref is resolved).

    When the cache file is usable the response has ``state: "ready"`` plus
    ``bytes``, ``mtime``, ``ttlRemainingSeconds`` and the stored ``meta``.
    Otherwise it reports the recorded job's ``state`` and ``error``; the state
    defaults to ``building`` when no job is recorded.
    """
    user = require_user()
    resource = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,))
    if resource is None:
        abort(404)
    if resource["type"] == "VIP" and not is_vip_active(user):
        return jsonify({"error": "vip_required"}), 403

    ref = (request.args.get("ref") or "").strip() or resource["default_ref"]
    commit_q = (request.args.get("commit") or "").strip() or None
    owner, repo = resource["repo_owner"], resource["repo_name"]

    commit = None
    if commit_q and _looks_like_commit(commit_q):
        commit = commit_q.lower()
    else:
        resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref)
        if resolved.get("ok"):
            commit = resolved.get("commit")

    cache_key = commit or ref
    cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key)

    if not _download_cache_ready(cache_path):
        job_key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}"
        with _download_lock:
            job = _download_jobs.get(job_key) or {}
        return jsonify(
            {
                "ok": True,
                "ready": False,
                "state": job.get("state") or "building",
                "error": job.get("error"),
                "ref": ref,
                "commit": commit,
                "cacheKey": cache_key,
            }
        )

    meta = _read_download_cache_meta(cache_path)
    size = mtime = ttl_remaining = None
    try:
        info = cache_path.stat()
        size = int(info.st_size)
        mtime = int(info.st_mtime)
        if _download_cache_ttl_seconds > 0:
            age = time.time() - float(info.st_mtime)
            ttl_remaining = max(0, int(_download_cache_ttl_seconds - age))
    except Exception:
        # The file may disappear between the readiness check and here;
        # the size/time fields then stay null.
        pass
    return jsonify(
        {
            "ok": True,
            "ready": True,
            "state": "ready",
            "error": None,
            "ref": ref,
            "commit": commit,
            "cacheKey": cache_key,
            "bytes": size,
            "mtime": mtime,
            "ttlRemainingSeconds": ttl_remaining,
            "meta": meta,
        }
    )
@app.get("/resources/<int:resource_id>/download")
def api_download_file(resource_id: int) -> Response:
    """Send the cached ZIP archive of a resource, building it first when needed.

    Requires a logged-in user; VIP resources additionally require an active
    VIP (403 ``vip_required``). Aborts with 404 when the resource is missing
    or not ONLINE. Query parameters: ``ref`` (defaults to the resource's
    ``default_ref``) and ``commit`` (used as-is, lower-cased, when it looks
    like a commit; otherwise the ref is resolved).

    Responses:
        202: the archive is not ready; the body carries ``error`` and ``state``.
        409 ``download_not_ready``: the cache file is missing or cannot be
            opened.
        200: the ZIP as an attachment named ``<owner>-<repo>-<ref>.zip`` with
            ``/`` replaced by ``-``.
    """
    user = require_user()
    row = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,))
    if row is None:
        abort(404)
    if row["type"] == "VIP" and not is_vip_active(user):
        return jsonify({"error": "vip_required"}), 403
    ref = (request.args.get("ref") or "").strip() or row["default_ref"]
    commit_q = (request.args.get("commit") or "").strip() or None
    owner, repo = row["repo_owner"], row["repo_name"]
    resolved_kind = "unknown"
    commit = None
    if commit_q and _looks_like_commit(commit_q):
        commit = commit_q.lower()
    else:
        resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref)
        if resolved.get("ok"):
            commit = resolved.get("commit")
        resolved_kind = resolved.get("kind") or "unknown"
    st = _ensure_download_ready(
        resource_id=resource_id,
        owner=owner,
        repo=repo,
        ref=ref,
        commit=commit,
        resolved_kind=resolved_kind,
    )
    if not st.get("ready"):
        return jsonify({"error": st.get("error") or "building", "state": st.get("state") or "building"}), 202
    cache_path = st.get("path")
    if not isinstance(cache_path, Path) or not cache_path.exists():
        return jsonify({"error": "download_not_ready"}), 409
    filename = f"{owner}-{repo}-{ref}.zip".replace("/", "-")
    try:
        f = open(cache_path, "rb")
    except OSError:
        # A forced rebuild can delete the cache file between the existence
        # check above and this open.
        return jsonify({"error": "download_not_ready"}), 409
    if app.testing or (has_request_context() and bool(request.environ.get("werkzeug.test"))):
        # Under test, return the bytes eagerly so no file handle stays open.
        with f:
            payload = f.read()
        return Response(
            payload,
            headers={
                "Content-Type": "application/zip",
                "Content-Disposition": f'attachment; filename="{filename}"',
            },
        )
    try:
        resp = send_file(
            f,
            mimetype="application/zip",
            as_attachment=True,
            download_name=filename,
            conditional=True,
            max_age=0,
        )
    except Exception:
        # Do not leak the handle when the response cannot be built.
        f.close()
        raise
    resp.call_on_close(f.close)
    resp.direct_passthrough = False
    return resp
@app.post("/orders")
def api_create_order() -> Response:
    """Create a PENDING order for the logged-in user and the enabled plan in ``planId``.

    Returns 404 ``plan_not_found`` when no enabled plan has that id. The order
    id is a random UUID hex string; the plan's name, duration and price are
    stored with the order as a JSON snapshot and echoed in the response.
    """
    user = require_user()
    body = request.get_json(silent=True) or {}
    plan_id = parse_int(body.get("planId"), 0)
    plan = fetch_one("SELECT * FROM plans WHERE id = ? AND enabled = 1", (plan_id,))
    if plan is None:
        return jsonify({"error": "plan_not_found"}), 404

    order_id = uuid.uuid4().hex
    price_cents = plan["price_cents"]
    snapshot = {
        "name": plan["name"],
        "durationDays": plan["duration_days"],
        "priceCents": price_cents,
    }
    order_values = (
        order_id,
        user["id"],
        plan["id"],
        price_cents,
        isoformat(utcnow()),
        json.dumps(snapshot),
    )
    execute(
        """
        INSERT INTO orders (id, user_id, plan_id, amount_cents, status, created_at, plan_snapshot_json)
        VALUES (?, ?, ?, ?, 'PENDING', ?, ?)
        """,
        order_values,
    )
    return jsonify({"id": order_id, "status": "PENDING", "amountCents": price_cents, "plan": snapshot})
@app.get("/orders")
def api_my_orders() -> Response:
    """List the logged-in user's orders, newest first.

    Each item carries ``id``, ``status``, ``amountCents``, ``createdAt``,
    ``paidAt`` and ``planSnapshot``. ``planSnapshot`` is ``null`` when the
    stored snapshot is missing or not valid JSON, so that one bad row does not
    fail the whole listing.
    """
    user = require_user()
    rows = fetch_all(
        "SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC",
        (user["id"],),
    )
    items = []
    for row in rows:
        try:
            snapshot = json.loads(row["plan_snapshot_json"] or "null")
        except (TypeError, ValueError):
            snapshot = None
        items.append(
            {
                "id": row["id"],
                "status": row["status"],
                "amountCents": row["amount_cents"],
                "createdAt": row["created_at"],
                "paidAt": row["paid_at"],
                "planSnapshot": snapshot,
            }
        )
    return jsonify({"items": items})
@app.get("/me/downloads")
def api_my_downloads() -> Response:
    """Return one page of the logged-in user's download history, newest first.

    Query parameters: ``page`` (floored at 1) and ``pageSize`` (clamped to
    1..50). Each item combines the values snapshotted in ``download_logs``
    with the resource's current state: ``DELETED`` when the resource row no
    longer exists, ``OFFLINE`` when its status is not ``ONLINE``, else
    ``ONLINE``.
    """
    user = require_user()
    page = max(parse_int(request.args.get("page"), 1), 1)
    page_size = min(max(parse_int(request.args.get("pageSize"), 20), 1), 50)
    offset = (page - 1) * page_size

    count_row = fetch_one("SELECT COUNT(1) AS cnt FROM download_logs WHERE user_id = ?", (user["id"],))
    total = int(count_row["cnt"] if count_row is not None else 0)

    # LEFT JOIN keeps log rows whose resource has been deleted.
    rows = fetch_all(
        """
        SELECT
        dl.id,
        dl.user_id,
        dl.resource_id,
        dl.resource_title_snapshot,
        dl.resource_type_snapshot,
        dl.ref_snapshot,
        dl.downloaded_at,
        r.id AS r_id,
        r.status AS r_status,
        r.type AS r_type
        FROM download_logs dl
        LEFT JOIN resources r ON r.id = dl.resource_id
        WHERE dl.user_id = ?
        ORDER BY dl.downloaded_at DESC, dl.id DESC
        LIMIT ? OFFSET ?
        """,
        (user["id"], page_size, offset),
    )

    items = []
    for row in rows:
        resource_exists = row["r_id"] is not None
        if not resource_exists:
            resource_state = "DELETED"
        elif row["r_status"] != "ONLINE":
            resource_state = "OFFLINE"
        else:
            resource_state = "ONLINE"
        items.append(
            {
                "id": row["id"],
                "resourceId": row["resource_id"],
                "resourceTitle": row["resource_title_snapshot"],
                "resourceType": row["resource_type_snapshot"],
                "currentResourceType": row["r_type"] if resource_exists else None,
                "ref": row["ref_snapshot"],
                "downloadedAt": row["downloaded_at"],
                "resourceState": resource_state,
            }
        )
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.get("/me/messages")
def api_my_messages() -> Response:
    """Return one page of the signed-in user's messages plus the unread count.

    Query parameters: ``unread`` (``1``/``true``/``yes``/``on`` restricts the
    list to unread messages), ``page`` (minimum 1) and ``pageSize`` (clamped
    to 1..50, default 20). ``total`` reflects the filtered list, while
    ``unreadCount`` always counts all unread messages of the user.
    """
    current_user = require_user()
    user_id = current_user["id"]

    unread_flag = (request.args.get("unread") or "").strip().lower()
    only_unread = unread_flag in {"1", "true", "yes", "on"}
    page = max(parse_int(request.args.get("page"), 1), 1)
    page_size = min(max(parse_int(request.args.get("pageSize"), 20), 1), 50)
    offset = (page - 1) * page_size

    # The WHERE clause is assembled from fixed fragments only; values are bound.
    conditions = ["user_id = ?"]
    filter_params: list[Any] = [user_id]
    if only_unread:
        conditions.append("read_at IS NULL")
    where_sql = "WHERE " + " AND ".join(conditions)

    count_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM user_messages {where_sql}", tuple(filter_params))
    total = int(0 if count_row is None else count_row["cnt"])

    unread_row = fetch_one(
        "SELECT COUNT(1) AS cnt FROM user_messages WHERE user_id = ? AND read_at IS NULL",
        (user_id,),
    )
    unread_count = int(0 if unread_row is None else unread_row["cnt"])

    message_rows = fetch_all(
        f"""
        SELECT id, title, content, created_at, read_at
        FROM user_messages
        {where_sql}
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
        """,
        tuple([*filter_params, page_size, offset]),
    )
    items = [
        {
            "id": message["id"],
            "title": message["title"],
            "content": message["content"],
            "createdAt": message["created_at"],
            "readAt": message["read_at"],
            "read": bool(message["read_at"]),
        }
        for message in message_rows
    ]
    return jsonify(
        {
            "items": items,
            "total": total,
            "unreadCount": unread_count,
            "page": page,
            "pageSize": page_size,
        }
    )
@app.put("/me/messages/<int:message_id>/read")
def api_my_message_read(message_id: int) -> Response:
    """Mark one of the signed-in user's messages as read.

    The UPDATE only touches a row that belongs to the user and is still
    unread; the response is ``{"ok": true}`` whether or not a row changed.
    """
    current_user = require_user()
    read_timestamp = isoformat(utcnow())
    execute(
        """
        UPDATE user_messages
        SET read_at = ?
        WHERE id = ? AND user_id = ? AND read_at IS NULL
        """,
        (read_timestamp, message_id, current_user["id"]),
    )
    return jsonify({"ok": True})
@app.post("/orders/<order_id>/pay")
def api_pay_order(order_id: str) -> Response:
    """Start payment for one of the signed-in user's PENDING orders.

    Responses:
      * 404 when the order does not exist or belongs to another user.
      * 409 ``order_not_pending`` when the order is no longer PENDING
        (including when a concurrent request paid it first in mock mode).
      * Mock mode: the order is marked PAID, ``extend_vip`` is called with the
        snapshot's ``durationDays`` and ``{"status": "PAID"}`` is returned.
      * Real mode: a payment is created through the payment middleware and
        its ``payment_url`` is returned as ``payUrl``; middleware failures
        are reported as 502.
    """
    import sys  # only needed for the stderr diagnostics below

    user = require_user()
    config = get_config()
    row = fetch_one("SELECT * FROM orders WHERE id = ? AND user_id = ?", (order_id, user["id"]))
    if row is None:
        abort(404)
    if row["status"] != "PENDING":
        return jsonify({"error": "order_not_pending"}), 409

    # Decide whether mock payment is enabled: the stored setting wins,
    # otherwise fall back to the static configuration.
    enable_mock_pay_raw = get_setting_value("ENABLE_MOCK_PAY")
    if enable_mock_pay_raw is None:
        enable_mock_pay = bool(config.enable_mock_pay)
    else:
        enable_mock_pay = enable_mock_pay_raw.strip().lower() in {"1", "true", "yes", "on"}

    snapshot = json.loads(row["plan_snapshot_json"])

    # Mock payment: mark the order as paid right away and grant the membership.
    if enable_mock_pay:
        # Guard on status = 'PENDING' and check the row count, like the other
        # payment handlers do, so two concurrent requests cannot both grant
        # the membership for the same order.
        cur = execute(
            "UPDATE orders SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ? "
            "WHERE id = ? AND status = 'PENDING'",
            (isoformat(utcnow()), "MOCK", None, order_id),
        )
        if getattr(cur, "rowcount", 0) != 1:
            return jsonify({"error": "order_not_pending"}), 409
        extend_vip(user["id"], int(snapshot["durationDays"]))
        return jsonify({"ok": True, "provider": "MOCK", "status": "PAID"})

    # Real payment: ask the middleware REST API to create a payment order.
    host_base = request.host_url.rstrip("/")
    # return_url: prefer the configured value, else this site's /pay/return.
    return_url = config.pay_return_url or f"{host_base}/pay/return"
    # callback_url: prefer the configured value, else this site's /pay/notify.
    callback_url = config.pay_callback_url or f"{host_base}/pay/notify"

    amount_cents = int(row["amount_cents"] or 0)
    # Convert cents to a two-decimal amount via Decimal before the float cast.
    total_amount = float((Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01")))
    subject = f"VIP {snapshot.get('name') or ''}".strip()[:120] or "VIP"
    pay_api_base = config.pay_api_base_url
    pay_payload = {
        "bill_no": order_id,
        "amount": total_amount,
        "subject": subject,
        "return_url": return_url,
        "callback_url": callback_url,
    }

    print(f"[PAY] 调用中间层: POST {pay_api_base}/api/alipay/pay", file=sys.stderr)
    print(f"[PAY] 请求体: {pay_payload}", file=sys.stderr)
    try:
        resp = requests.post(
            f"{pay_api_base}/api/alipay/pay",
            json=pay_payload,
            timeout=15,
        )
    except Exception as e:
        print(f"[PAY] 连接失败: {e}", file=sys.stderr)
        return jsonify({"error": "pay_api_unreachable", "detail": str(e)}), 502

    print(f"[PAY] 响应状态: {resp.status_code}", file=sys.stderr)
    print(f"[PAY] 响应体: {resp.text[:500]}", file=sys.stderr)

    if resp.status_code != 200:
        try:
            detail = resp.json()
        except Exception:
            detail = resp.text[:200]
        return jsonify({"error": "pay_api_error", "detail": detail}), 502

    # A 200 response with a non-JSON body used to raise here and surface as a
    # 500; report it as a middleware error instead.
    try:
        data = resp.json()
    except ValueError:
        return jsonify({"error": "pay_api_error", "detail": resp.text[:200]}), 502

    pay_url = (data.get("payment_url") or "") if isinstance(data, dict) else ""
    if not pay_url:
        return jsonify({"error": "pay_api_no_payment_url", "detail": data}), 502

    execute("UPDATE orders SET pay_channel = ? WHERE id = ? AND status = 'PENDING'", ("ALIPAY", order_id))
    return jsonify({"ok": True, "provider": "ALIPAY", "status": "PENDING", "payUrl": pay_url})
@app.post("/orders/<order_id>/query-and-activate")
def api_order_query_and_activate(order_id: str) -> Response:
    """Poll the payment middleware for an order and activate it once paid.

    Intended to be called repeatedly by the frontend. It races with the
    asynchronous payment notification; the ``status = 'PENDING'`` guard on the
    UPDATE plus the row-count check ensure ``extend_vip`` is called by at most
    one of them. Any middleware problem (unreachable, non-200, malformed
    body, amount mismatch) is reported as ``{"status": "PENDING"}`` with
    HTTP 200 so the caller can simply keep polling.
    """
    user = require_user()
    row = fetch_one("SELECT * FROM orders WHERE id = ? AND user_id = ?", (order_id, user["id"]))
    if row is None:
        abort(404)

    # Already paid: answer directly without asking the middleware again.
    if row["status"] == "PAID":
        return jsonify({"status": "PAID"})
    # Any other non-PENDING status is also returned as-is.
    if row["status"] != "PENDING":
        return jsonify({"status": row["status"]})

    config = get_config()
    pay_api_base = config.pay_api_base_url

    # Call the middleware query endpoint.
    try:
        resp = requests.get(
            f"{pay_api_base}/api/alipay/query",
            params={"bill_no": order_id},
            timeout=10,
        )
    except Exception as e:
        return jsonify({"status": "PENDING", "error": str(e)}), 200

    if resp.status_code != 200:
        return jsonify({"status": "PENDING"}), 200

    try:
        data = resp.json()
    except Exception:
        return jsonify({"status": "PENDING"}), 200
    # A JSON array/scalar body has no .get(); treat it like any other
    # malformed middleware response instead of raising.
    if not isinstance(data, dict):
        return jsonify({"status": "PENDING"}), 200

    # str() keeps non-string JSON values from breaking .strip().
    trade_status = str(data.get("trade_status") or "").strip().upper()
    if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}:
        # Expose the middleware status so the frontend can decide whether to keep polling.
        return jsonify({"status": "PENDING", "tradeStatus": trade_status}), 200

    # Payment confirmed: validate the amount, then activate the order.
    trade_no = str(data.get("trade_no") or "").strip()
    amount_raw = str(data.get("amount") or "").strip()
    try:
        amount = Decimal(amount_raw).quantize(Decimal("0.01"))
    except Exception:
        return jsonify({"status": "PENDING", "error": "invalid_amount"}), 200

    expect_amount = (Decimal(int(row["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01"))
    if amount != expect_amount:
        return jsonify({"status": "PENDING", "error": "amount_mismatch"}), 200

    snapshot = json.loads(row["plan_snapshot_json"])
    # Only a row that is still PENDING is updated, so repeated calls are harmless.
    cur = execute(
        """
        UPDATE orders
        SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ?
        WHERE id = ? AND status = 'PENDING'
        """,
        (isoformat(utcnow()), "ALIPAY", trade_no or None, order_id),
    )
    # Grant the membership only when this request performed the transition.
    if getattr(cur, "rowcount", 0) == 1:
        extend_vip(int(row["user_id"]), int(snapshot["durationDays"]))
    return jsonify({"status": "PAID"})
@app.post("/pay/callback")
def api_pay_callback() -> Response:
    """Legacy direct Alipay callback (form-encoded), kept as a fallback.

    Parameters are read from the form body, falling back to the query string.
    The RSA2 signature is verified over every parameter except ``sign`` and
    ``sign_type`` using the stored ``ALIPAY_PUBLIC_KEY``. The plain-text reply
    is ``success`` or ``fail``.
    """

    def _reply(text: str) -> Response:
        """Build the plain-text acknowledgement."""
        return Response(text, mimetype="text/plain")

    try:
        params: dict[str, Any] = {key: request.form.get(key) for key in request.form.keys()}
    except Exception:
        params = {}
    if not params:
        try:
            params = dict(request.args)
        except Exception:
            params = {}

    signature = (params.get("sign") or "").strip()
    signature_type = (params.get("sign_type") or "RSA2").strip().upper()
    if signature_type != "RSA2" or not signature:
        return _reply("fail")

    # The signature covers every parameter except the signature fields themselves.
    signed_fields = {key: value for key, value in params.items() if key not in ("sign", "sign_type")}
    sign_content = _alipay_sign_content(signed_fields)

    alipay_public_key = (get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip()
    alipay_app_id = (get_setting_value("ALIPAY_APP_ID") or "").strip()
    if not alipay_public_key:
        return _reply("fail")

    try:
        verified = _alipay_rsa2_verify(sign_content, signature, alipay_public_key)
    except RuntimeError:
        return _reply("fail")
    if not verified:
        return _reply("fail")

    out_trade_no = (params.get("out_trade_no") or "").strip()
    trade_no = (params.get("trade_no") or "").strip()
    trade_status = (params.get("trade_status") or "").strip().upper()
    total_amount_raw = (params.get("total_amount") or "").strip()
    app_id = (params.get("app_id") or "").strip()

    # Reject notifications for a different app only when both ids are known.
    if alipay_app_id and app_id and alipay_app_id != app_id:
        return _reply("fail")
    if not out_trade_no:
        return _reply("fail")
    if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}:
        return _reply("success")

    order = fetch_one("SELECT * FROM orders WHERE id = ?", (out_trade_no,))
    if order is None or order["status"] == "PAID":
        return _reply("success")

    try:
        paid_amount = Decimal(total_amount_raw).quantize(Decimal("0.01"))
    except Exception:
        return _reply("fail")
    expected_amount = (Decimal(int(order["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01"))
    if paid_amount != expected_amount:
        return _reply("fail")

    plan_snapshot = json.loads(order["plan_snapshot_json"])
    cursor = execute(
        """
        UPDATE orders
        SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ?
        WHERE id = ? AND status = 'PENDING'
        """,
        (isoformat(utcnow()), "ALIPAY", trade_no or None, out_trade_no),
    )
    # Only the request that actually flipped the row grants the membership.
    if getattr(cursor, "rowcount", 0) == 1:
        extend_vip(int(order["user_id"]), int(plan_snapshot["durationDays"]))
    return _reply("success")
@app.post("/pay/notify")
def api_pay_notify() -> Response:
    """Asynchronous payment notification from the middleware REST API (JSON).

    Fields read from the body: ``bill_no``, ``trade_no``, ``trade_status``
    and ``amount``. Non-success trade statuses, unknown orders and
    already-paid orders are acknowledged with HTTP 200. A missing
    ``bill_no`` or an invalid/mismatching amount yields HTTP 400.

    NOTE(review): nothing in this handler authenticates the caller (no
    signature or shared secret is checked), so anyone who knows an order id
    and its amount could mark it paid. Confirm that access is restricted
    elsewhere or add verification.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body has no .get(); treat it as an empty payload
    # so it ends in the bill_no_missing 400 instead of a 500.
    if not isinstance(data, dict):
        data = {}

    # str() keeps non-string JSON values (e.g. numbers) from breaking .strip().
    bill_no = str(data.get("bill_no") or "").strip()
    trade_no = str(data.get("trade_no") or "").strip()
    trade_status = str(data.get("trade_status") or "").strip().upper()
    amount_raw = str(data.get("amount") or "").strip()

    if not bill_no:
        return jsonify({"error": "bill_no_missing"}), 400

    # Only successful payments are processed.
    if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}:
        return jsonify({"ok": True, "msg": "ignored"}), 200

    row = fetch_one("SELECT * FROM orders WHERE id = ?", (bill_no,))
    if row is None:
        # Unknown order: answer 200 so the middleware does not retry.
        return jsonify({"ok": True, "msg": "order_not_found"}), 200

    if row["status"] == "PAID":
        # Already processed: acknowledge again.
        return jsonify({"ok": True, "msg": "already_paid"}), 200

    # Validate the paid amount against the order amount.
    try:
        amount = Decimal(amount_raw).quantize(Decimal("0.01"))
    except Exception:
        return jsonify({"error": "invalid_amount"}), 400

    expect_amount = (Decimal(int(row["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01"))
    if amount != expect_amount:
        return jsonify({"error": "amount_mismatch"}), 400

    snapshot = json.loads(row["plan_snapshot_json"])
    cur = execute(
        """
        UPDATE orders
        SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ?
        WHERE id = ? AND status = 'PENDING'
        """,
        (isoformat(utcnow()), "ALIPAY", trade_no or None, bill_no),
    )
    # Grant the membership only when this request performed the transition.
    if getattr(cur, "rowcount", 0) == 1:
        extend_vip(int(row["user_id"]), int(snapshot["durationDays"]))
    return jsonify({"ok": True}), 200
@app.get("/pay/return")
def api_pay_return() -> Response:
    """Synchronous landing page shown after the payment flow returns the browser.

    Looks up the order named by ``bill_no`` (or ``out_trade_no``) and renders
    a small result page that refreshes to ``/ui/me`` after three seconds.
    Only the PAID / not-PAID distinction is rendered.
    """
    bill_no = (request.args.get("bill_no") or request.args.get("out_trade_no") or "").strip()
    order_row = fetch_one("SELECT status FROM orders WHERE id = ?", (bill_no,)) if bill_no else None
    status = order_row["status"] if order_row else "unknown"

    # Choose the three status-dependent fragments up front.
    if status == "PAID":
        icon = "✅"
        heading = "支付成功"
        message = "会员权益已生效,正在跳转到个人中心…"
    else:
        icon = "⏳"
        heading = "支付处理中"
        message = "订单处理中,请稍候,正在跳转…"

    # Simple redirect page: jumps to the personal centre after 3 seconds.
    html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="3;url=/ui/me">
<title>支付结果</title>
<style>
body {{ font-family: sans-serif; display: flex; justify-content: center;
align-items: center; height: 100vh; margin: 0; background: #f5f5f5; }}
.box {{ text-align: center; background: #fff; padding: 48px 64px;
border-radius: 16px; box-shadow: 0 4px 24px rgba(0,0,0,.08); }}
.icon {{ font-size: 3rem; margin-bottom: 16px; }}
h2 {{ margin: 0 0 8px; color: #333; }}
p {{ color: #888; margin: 0 0 24px; }}
a {{ color: #0ea5e9; text-decoration: none; }}
</style>
</head>
<body>
<div class="box">
<div class="icon">{icon}</div>
<h2>{heading}</h2>
<p>{message}</p>
<a href="/ui/me">立即前往个人中心</a>
</div>
</body>
</html>"""
    return Response(html, mimetype="text/html")
@app.post("/admin/auth/login")
def api_admin_login() -> Response:
    """Authenticate an admin by username/password and start an admin session.

    Responds 401 ``invalid_credentials`` for an unknown user or wrong
    password, 403 ``admin_disabled`` when the account is not ACTIVE, and
    otherwise stores the admin id in the session and records the login time.
    """
    credentials = request.get_json(silent=True) or {}
    username = (credentials.get("username") or "").strip()
    password = credentials.get("password") or ""

    account = fetch_one("SELECT * FROM admin_users WHERE username = ?", (username,))
    if account is None:
        return jsonify({"error": "invalid_credentials"}), 401
    if not check_password_hash(account["password_hash"], password):
        return jsonify({"error": "invalid_credentials"}), 401
    if account["status"] != "ACTIVE":
        return jsonify({"error": "admin_disabled"}), 403

    admin_id = account["id"]
    session["admin_user_id"] = admin_id
    execute("UPDATE admin_users SET last_login_at = ? WHERE id = ?", (isoformat(utcnow()), admin_id))
    return jsonify({"ok": True})
@app.post("/admin/auth/logout")
def api_admin_logout() -> Response:
    """End the admin session by dropping ``admin_user_id`` from the session."""
    if "admin_user_id" in session:
        del session["admin_user_id"]
    return jsonify({"ok": True})
@app.get("/admin/stats")
def api_admin_stats() -> Response:
    """Return dashboard counters for users, resources, orders, revenue,
    downloads and messages, plus the active database backend.

    The "last 24h" figures use a cutoff of 24 hours before ``utcnow()``.
    """
    _ = require_admin()
    from datetime import timedelta

    now = utcnow()
    now_s = isoformat(now)
    since_24h = isoformat(now - timedelta(hours=24))

    def _scalar(column: str, sql: str, params: tuple = ()) -> int:
        """Run *sql* and return the named column of its single row as an int (0 on any problem)."""
        record = fetch_one(sql, params)
        if record is None:
            return 0
        raw = record.get(column) if isinstance(record, dict) else record[column]
        try:
            return int(raw or 0)
        except Exception:
            return 0

    def _count(sql: str, params: tuple = ()) -> int:
        """Read the ``c`` column of a COUNT query."""
        return _scalar("c", sql, params)

    def _sum(sql: str, params: tuple = ()) -> int:
        """Read the ``s`` column of a SUM query."""
        return _scalar("s", sql, params)

    users = {
        "total": _count("SELECT COUNT(1) AS c FROM users"),
        "active": _count("SELECT COUNT(1) AS c FROM users WHERE status = ?", ("ACTIVE",)),
        "vipActive": _count(
            "SELECT COUNT(1) AS c FROM users WHERE vip_expire_at IS NOT NULL AND vip_expire_at > ?",
            (now_s,),
        ),
    }
    resources = {
        "total": _count("SELECT COUNT(1) AS c FROM resources"),
        "online": _count("SELECT COUNT(1) AS c FROM resources WHERE status = ?", ("ONLINE",)),
    }
    orders = {
        "total": _count("SELECT COUNT(1) AS c FROM orders"),
        "paid": _count("SELECT COUNT(1) AS c FROM orders WHERE status = ?", ("PAID",)),
        "pending": _count("SELECT COUNT(1) AS c FROM orders WHERE status = ?", ("PENDING",)),
    }
    revenue = {
        "totalCents": _sum(
            "SELECT COALESCE(SUM(amount_cents), 0) AS s FROM orders WHERE status = ?",
            ("PAID",),
        ),
        "last24hCents": _sum(
            "SELECT COALESCE(SUM(amount_cents), 0) AS s FROM orders WHERE status = ? AND paid_at IS NOT NULL AND paid_at >= ?",
            ("PAID", since_24h),
        ),
    }
    downloads = {
        "total": _count("SELECT COUNT(1) AS c FROM download_logs"),
        "last24h": _count("SELECT COUNT(1) AS c FROM download_logs WHERE downloaded_at >= ?", (since_24h,)),
    }
    messages = {
        "total": _count("SELECT COUNT(1) AS c FROM user_messages"),
        "last24h": _count("SELECT COUNT(1) AS c FROM user_messages WHERE created_at >= ?", (since_24h,)),
    }

    return jsonify(
        {
            "now": now_s,
            "users": users,
            "resources": resources,
            "orders": orders,
            "revenue": revenue,
            "downloads": downloads,
            "messages": messages,
            "backend": get_active_backend(),
        }
    )
@app.post("/admin/uploads")
def api_admin_upload() -> Response:
    """Store an uploaded image or video under a random name (admin only).

    Responses:
      * 400 ``file_required`` when the ``file`` form part is missing.
      * 413 ``file_too_large`` when the request's Content-Length exceeds 50 MiB.
      * 400 ``unsupported_file_type`` when the sanitised filename's extension
        is not in the allow-list, including when there is no extension.
      * 500 with the storage error text (``RuntimeError``) or ``upload_failed``.

    On success returns the ``url`` and ``name`` reported by the storage backend.
    """
    _ = require_admin()
    f = request.files.get("file")
    if f is None:
        return jsonify({"error": "file_required"}), 400

    max_bytes = 50 * 1024 * 1024
    # Content-Length covers the whole request, not just the file part.
    if request.content_length is not None and int(request.content_length) > max_bytes:
        return jsonify({"error": "file_too_large"}), 413

    original = secure_filename(f.filename or "")
    ext = os.path.splitext(original)[1].lower()
    allowed = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".mp4", ".webm", ".mov", ".m4v"}
    # Previously an empty extension skipped this check, so extension-less
    # files bypassed the allow-list; they are now rejected too.
    if ext not in allowed:
        return jsonify({"error": "unsupported_file_type"}), 400

    # Store under a random name; only the validated extension is kept.
    name = f"{uuid.uuid4().hex}{ext}"
    try:
        storage = _get_upload_storage()
        resp = storage.save_upload(f, name)
        return jsonify({"url": resp.get("url"), "name": resp.get("name")})
    except RuntimeError as e:
        return jsonify({"error": str(e) or "upload_failed"}), 500
    except Exception:
        return jsonify({"error": "upload_failed"}), 500
@app.delete("/admin/uploads/<name>")
def api_admin_delete_upload(name: str) -> Response:
    """Delete one stored upload by name (admin only).

    Storage failures are reported as HTTP 500: the ``RuntimeError`` text when
    present, otherwise ``delete_failed``.
    """
    _ = require_admin()
    try:
        backend = _get_upload_storage()
        backend.delete_uploads({name})
    except RuntimeError as err:
        failure = str(err) or "delete_failed"
        return jsonify({"error": failure}), 500
    except Exception:
        return jsonify({"error": "delete_failed"}), 500
    else:
        return jsonify({"ok": True})
@app.get("/admin/uploads")
def api_admin_uploads_list() -> Response:
    """List stored uploads with usage flags and aggregate statistics (admin only).

    An upload counts as used when its name (case-insensitive) is referenced
    from any resource's ``cover_url`` or ``summary``. ``stats`` always covers
    every stored item; the ``q`` (name substring) and ``used``
    (``used``/``unused``) query parameters only narrow ``items``.
    """
    _ = require_admin()
    name_query = (request.args.get("q") or "").strip().lower()
    usage_filter = (request.args.get("used") or "").strip().lower()

    # Collect every upload name referenced by a resource.
    referenced: set[str] = set()
    for resource in fetch_all("SELECT cover_url, summary FROM resources", ()):
        referenced |= _extract_upload_names(resource["cover_url"])
        referenced |= _extract_upload_names(resource["summary"])
    referenced_lower = {ref.lower() for ref in referenced}

    try:
        backend = _get_upload_storage()
        all_items = backend.list_items() or []
    except RuntimeError as err:
        return jsonify({"error": str(err) or "list_failed"}), 500
    except Exception:
        return jsonify({"error": "list_failed"}), 500

    for entry in all_items:
        entry_name = str(entry.get("name") or "")
        entry["used"] = bool(entry_name.lower() in referenced_lower)
    all_items.sort(key=lambda entry: entry["mtime"], reverse=True)

    def _size(entry: Any) -> int:
        """Byte size of one item, treating a missing/empty value as 0."""
        return int(entry.get("bytes") or 0)

    used_items = [entry for entry in all_items if entry.get("used")]
    stats = {
        "totalCount": len(all_items),
        "totalBytes": sum(_size(entry) for entry in all_items),
        "usedCount": len(used_items),
        "usedBytes": sum(_size(entry) for entry in used_items),
    }
    stats["unusedCount"] = int(stats["totalCount"] - stats["usedCount"])
    stats["unusedBytes"] = int(stats["totalBytes"] - stats["usedBytes"])

    items = all_items
    if name_query:
        items = [entry for entry in items if name_query in str(entry.get("name") or "").lower()]
    if usage_filter == "used":
        items = [entry for entry in items if entry.get("used")]
    elif usage_filter == "unused":
        items = [entry for entry in items if not entry.get("used")]
    return jsonify({"items": items, "stats": stats})
@app.post("/admin/uploads/cleanup-unused")
def api_admin_uploads_cleanup_unused() -> Response:
    """Delete every stored upload that no resource references (admin only).

    A name is considered referenced when it appears (case-insensitive) in any
    resource's ``cover_url`` or ``summary``. Returns the number of names
    passed to the storage backend for deletion.
    """
    _ = require_admin()

    referenced: set[str] = set()
    for resource in fetch_all("SELECT cover_url, summary FROM resources", ()):
        for field in ("cover_url", "summary"):
            referenced |= _extract_upload_names(resource[field])
    referenced_lower = {ref.lower() for ref in referenced}

    try:
        backend = _get_upload_storage()
        stored_items = backend.list_items() or []
        # Skip items without a usable name.
        stored_names = {str(entry.get("name") or "") for entry in stored_items}
        stored_names.discard("")
        orphaned = {stored for stored in stored_names if stored.lower() not in referenced_lower}
        backend.delete_uploads(orphaned)
        return jsonify({"ok": True, "deletedCount": len(orphaned)})
    except RuntimeError as err:
        return jsonify({"error": str(err) or "cleanup_failed"}), 500
    except Exception:
        return jsonify({"error": "cleanup_failed"}), 500
@app.get("/admin/settings")
def api_admin_settings_get() -> Response:
    """Return the current admin-editable settings (admin only).

    Secrets (tokens, API keys, private/public keys, access-key secret) are
    never returned; only ``has...`` booleans indicate whether they are set.
    The Redis URL is returned with its credentials stripped.
    """
    _ = require_admin()
    config = get_config()

    def _text(key: str, default: str = "") -> str:
        """Stored value for *key* (or *default* when unset/empty), stripped."""
        return (get_setting_value(key) or default).strip()

    def _secret(key: str) -> Any:
        """Stored value for *key*, stripped; ``None`` when unset or blank."""
        raw = get_setting_value(key)
        if raw is None:
            return None
        return raw.strip() or None

    gogs_base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/")
    gogs_token = _secret("GOGS_TOKEN")
    if gogs_token is None:
        # No stored token: fall back to the static configuration.
        gogs_token = config.gogs_token

    pay_provider = _text("PAY_PROVIDER", "MOCK").upper()
    pay_api_key = _secret("PAY_API_KEY")
    alipay_app_id = _text("ALIPAY_APP_ID")
    alipay_gateway = _text("ALIPAY_GATEWAY", "https://openapi.alipay.com/gateway.do")
    alipay_notify_url = _text("ALIPAY_NOTIFY_URL")
    alipay_return_url = _text("ALIPAY_RETURN_URL")
    alipay_private_key = _secret("ALIPAY_PRIVATE_KEY")
    alipay_public_key = _secret("ALIPAY_PUBLIC_KEY")

    llm_provider = _text("LLM_PROVIDER")
    llm_base_url = _text("LLM_BASE_URL")
    llm_model = _text("LLM_MODEL")
    llm_api_key = _secret("LLM_API_KEY")

    redis_url = _secret("REDIS_URL")
    redis_url_safe = ""
    if redis_url:
        try:
            parts = urlsplit(redis_url)
            if parts.scheme in {"redis", "rediss"} and parts.netloc:
                # Drop the userinfo. Split on the LAST "@": a password may
                # itself contain "@", and splitting on the first one would
                # leak the remainder of the password.
                netloc = parts.netloc.rsplit("@", 1)[-1]
                redis_url_safe = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
            else:
                # NOTE(review): a value with another scheme is echoed verbatim.
                redis_url_safe = redis_url
        except Exception:
            redis_url_safe = ""

    # The stored setting wins; otherwise fall back to the static configuration.
    enable_mock_pay_raw = get_setting_value("ENABLE_MOCK_PAY")
    if enable_mock_pay_raw is None:
        enable_mock_pay = bool(config.enable_mock_pay)
    else:
        enable_mock_pay = enable_mock_pay_raw.strip().lower() in {"1", "true", "yes", "on"}

    storage_provider = _text("STORAGE_PROVIDER", "AUTO").upper()
    if storage_provider not in {"AUTO", "LOCAL", "OSS"}:
        storage_provider = "AUTO"
    oss_endpoint = _text("OSS_ENDPOINT").rstrip("/")
    oss_bucket = _text("OSS_BUCKET")
    oss_access_key_id = _text("OSS_ACCESS_KEY_ID")
    oss_access_key_secret = _secret("OSS_ACCESS_KEY_SECRET")
    oss_upload_prefix = _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/")
    oss_public_base_url = _text("OSS_PUBLIC_BASE_URL").rstrip("/")

    return jsonify(
        {
            "gogsBaseUrl": gogs_base_url,
            "hasGogsToken": bool(gogs_token),
            "payment": {
                "provider": pay_provider,
                "hasApiKey": bool(pay_api_key),
                "enableMockPay": enable_mock_pay,
                "alipay": {
                    "appId": alipay_app_id,
                    "gateway": alipay_gateway,
                    "notifyUrl": alipay_notify_url,
                    "returnUrl": alipay_return_url,
                    "hasPrivateKey": bool(alipay_private_key),
                    "hasPublicKey": bool(alipay_public_key),
                },
            },
            "llm": {
                "provider": llm_provider,
                "baseUrl": llm_base_url,
                "model": llm_model,
                "hasApiKey": bool(llm_api_key),
            },
            "cache": {"hasRedisUrl": bool(redis_url), "redisUrl": redis_url_safe},
            "storage": {
                "provider": storage_provider,
                "oss": {
                    "endpoint": oss_endpoint,
                    "bucket": oss_bucket,
                    "accessKeyId": oss_access_key_id,
                    "hasAccessKeySecret": bool(oss_access_key_secret),
                    "uploadPrefix": oss_upload_prefix,
                    "publicBaseUrl": oss_public_base_url,
                },
            },
            "db": db_status(),
        }
    )
- @app.put("/admin/settings")
- def api_admin_settings_put() -> Response:
- admin = require_admin()
- payload = request.get_json(silent=True) or {}
- config = get_config()
- gogs_base_url = payload.get("gogsBaseUrl")
- gogs_token = payload.get("gogsToken")
- clear_token = bool(payload.get("clearGogsToken"))
- pay = payload.get("payment") or {}
- llm = payload.get("llm") or {}
- cache = payload.get("cache") or {}
- storage = payload.get("storage") or {}
- mysql = payload.get("mysql") or {}
- before = {
- "gogsBaseUrl": (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/"),
- "hasGogsToken": bool((get_setting_value("GOGS_TOKEN") or "").strip() or config.gogs_token),
- "payment": {
- "provider": (get_setting_value("PAY_PROVIDER") or "MOCK").strip().upper(),
- "hasApiKey": bool((get_setting_value("PAY_API_KEY") or "").strip()),
- "enableMockPay": (get_setting_value("ENABLE_MOCK_PAY") or "").strip().lower() in {"1", "true", "yes", "on"},
- "alipay": {
- "appId": (get_setting_value("ALIPAY_APP_ID") or "").strip(),
- "gateway": (get_setting_value("ALIPAY_GATEWAY") or "").strip(),
- "notifyUrl": (get_setting_value("ALIPAY_NOTIFY_URL") or "").strip(),
- "returnUrl": (get_setting_value("ALIPAY_RETURN_URL") or "").strip(),
- "hasPrivateKey": bool((get_setting_value("ALIPAY_PRIVATE_KEY") or "").strip()),
- "hasPublicKey": bool((get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip()),
- },
- },
- "llm": {
- "provider": (get_setting_value("LLM_PROVIDER") or "").strip(),
- "baseUrl": (get_setting_value("LLM_BASE_URL") or "").strip(),
- "model": (get_setting_value("LLM_MODEL") or "").strip(),
- "hasApiKey": bool((get_setting_value("LLM_API_KEY") or "").strip()),
- },
- "cache": {"hasRedisUrl": bool((get_setting_value("REDIS_URL") or "").strip())},
- "storage": {
- "provider": (get_setting_value("STORAGE_PROVIDER") or "AUTO").strip().upper(),
- "oss": {
- "endpoint": (get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/"),
- "bucket": (get_setting_value("OSS_BUCKET") or "").strip(),
- "accessKeyId": (get_setting_value("OSS_ACCESS_KEY_ID") or "").strip(),
- "hasAccessKeySecret": bool((get_setting_value("OSS_ACCESS_KEY_SECRET") or "").strip()),
- "uploadPrefix": _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/"),
- "publicBaseUrl": (get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/"),
- },
- },
- "db": db_status(),
- }
- if gogs_base_url is not None:
- gogs_base_url = str(gogs_base_url).strip().rstrip("/")
- if gogs_base_url and not (gogs_base_url.startswith("http://") or gogs_base_url.startswith("https://")):
- return jsonify({"error": "invalid_gogs_base_url"}), 400
- if gogs_base_url:
- set_setting_value("GOGS_BASE_URL", gogs_base_url, category="GOGS")
- else:
- delete_setting_value("GOGS_BASE_URL")
- if clear_token:
- delete_setting_value("GOGS_TOKEN")
- elif gogs_token is not None:
- gogs_token = str(gogs_token).strip()
- if gogs_token:
- set_setting_value("GOGS_TOKEN", gogs_token, category="GOGS")
- pay_provider = pay.get("provider")
- pay_api_key = pay.get("apiKey")
- clear_pay_api_key = bool(pay.get("clearApiKey"))
- enable_mock_pay = pay.get("enableMockPay")
- alipay = pay.get("alipay") or {}
- if pay_provider is not None:
- pay_provider = str(pay_provider).strip().upper()
- if pay_provider:
- set_setting_value("PAY_PROVIDER", pay_provider, category="PAYMENT")
- if enable_mock_pay is not None:
- set_setting_value("ENABLE_MOCK_PAY", "1" if bool(enable_mock_pay) else "0", category="PAYMENT")
- if clear_pay_api_key:
- delete_setting_value("PAY_API_KEY")
- elif pay_api_key is not None:
- pay_api_key = str(pay_api_key).strip()
- if pay_api_key:
- set_setting_value("PAY_API_KEY", pay_api_key, category="PAYMENT")
- alipay_app_id = alipay.get("appId")
- alipay_gateway = alipay.get("gateway")
- alipay_notify_url = alipay.get("notifyUrl")
- alipay_return_url = alipay.get("returnUrl")
- alipay_private_key = alipay.get("privateKey")
- clear_alipay_private_key = bool(alipay.get("clearPrivateKey"))
- alipay_public_key = alipay.get("publicKey")
- clear_alipay_public_key = bool(alipay.get("clearPublicKey"))
- if alipay_app_id is not None:
- alipay_app_id = str(alipay_app_id).strip()
- if alipay_app_id:
- set_setting_value("ALIPAY_APP_ID", alipay_app_id, category="PAYMENT")
- else:
- delete_setting_value("ALIPAY_APP_ID")
- if alipay_gateway is not None:
- alipay_gateway = str(alipay_gateway).strip().rstrip("/")
- if alipay_gateway:
- if not (alipay_gateway.startswith("http://") or alipay_gateway.startswith("https://")):
- return jsonify({"error": "invalid_alipay_gateway"}), 400
- set_setting_value("ALIPAY_GATEWAY", alipay_gateway, category="PAYMENT")
- else:
- delete_setting_value("ALIPAY_GATEWAY")
- if alipay_notify_url is not None:
- alipay_notify_url = str(alipay_notify_url).strip()
- if alipay_notify_url:
- if not (alipay_notify_url.startswith("http://") or alipay_notify_url.startswith("https://")):
- return jsonify({"error": "invalid_alipay_notify_url"}), 400
- set_setting_value("ALIPAY_NOTIFY_URL", alipay_notify_url, category="PAYMENT")
- else:
- delete_setting_value("ALIPAY_NOTIFY_URL")
- if alipay_return_url is not None:
- alipay_return_url = str(alipay_return_url).strip()
- if alipay_return_url:
- if not (alipay_return_url.startswith("http://") or alipay_return_url.startswith("https://")):
- return jsonify({"error": "invalid_alipay_return_url"}), 400
- set_setting_value("ALIPAY_RETURN_URL", alipay_return_url, category="PAYMENT")
- else:
- delete_setting_value("ALIPAY_RETURN_URL")
- if clear_alipay_private_key:
- delete_setting_value("ALIPAY_PRIVATE_KEY")
- elif alipay_private_key is not None:
- alipay_private_key = str(alipay_private_key).strip()
- if alipay_private_key:
- set_setting_value("ALIPAY_PRIVATE_KEY", alipay_private_key, category="PAYMENT")
- if clear_alipay_public_key:
- delete_setting_value("ALIPAY_PUBLIC_KEY")
- elif alipay_public_key is not None:
- alipay_public_key = str(alipay_public_key).strip()
- if alipay_public_key:
- set_setting_value("ALIPAY_PUBLIC_KEY", alipay_public_key, category="PAYMENT")
- llm_provider = llm.get("provider")
- llm_base_url = llm.get("baseUrl")
- llm_model = llm.get("model")
- llm_api_key = llm.get("apiKey")
- clear_llm_api_key = bool(llm.get("clearApiKey"))
- if llm_provider is not None:
- llm_provider = str(llm_provider).strip()
- if llm_provider:
- set_setting_value("LLM_PROVIDER", llm_provider, category="LLM")
- if llm_base_url is not None:
- llm_base_url = str(llm_base_url).strip().rstrip("/")
- if llm_base_url:
- set_setting_value("LLM_BASE_URL", llm_base_url, category="LLM")
- if llm_model is not None:
- llm_model = str(llm_model).strip()
- if llm_model:
- set_setting_value("LLM_MODEL", llm_model, category="LLM")
- if clear_llm_api_key:
- delete_setting_value("LLM_API_KEY")
- elif llm_api_key is not None:
- llm_api_key = str(llm_api_key).strip()
- if llm_api_key:
- set_setting_value("LLM_API_KEY", llm_api_key, category="LLM")
- redis_url = cache.get("redisUrl")
- clear_redis_url = bool(cache.get("clearRedisUrl"))
- if clear_redis_url:
- delete_setting_value("REDIS_URL")
- elif redis_url is not None:
- redis_url = str(redis_url).strip()
- if redis_url:
- if not (redis_url.startswith("redis://") or redis_url.startswith("rediss://")):
- return jsonify({"error": "invalid_redis_url"}), 400
- set_setting_value("REDIS_URL", redis_url, category="CACHE")
- storage_provider = storage.get("provider")
- oss = storage.get("oss") or {}
- if storage_provider is not None:
- storage_provider = str(storage_provider).strip().upper()
- if storage_provider and storage_provider not in {"AUTO", "LOCAL", "OSS"}:
- return jsonify({"error": "invalid_storage_provider"}), 400
- if storage_provider:
- set_setting_value("STORAGE_PROVIDER", storage_provider, category="STORAGE")
- else:
- delete_setting_value("STORAGE_PROVIDER")
- oss_endpoint = oss.get("endpoint")
- oss_bucket = oss.get("bucket")
- oss_access_key_id = oss.get("accessKeyId")
- oss_access_key_secret = oss.get("accessKeySecret")
- oss_upload_prefix = oss.get("uploadPrefix")
- oss_public_base_url = oss.get("publicBaseUrl")
- clear_oss_access_key_secret = bool(oss.get("clearAccessKeySecret"))
- if oss_endpoint is not None:
- oss_endpoint = str(oss_endpoint).strip().rstrip("/")
- if oss_endpoint:
- if not (oss_endpoint.startswith("http://") or oss_endpoint.startswith("https://")):
- return jsonify({"error": "invalid_oss_endpoint"}), 400
- set_setting_value("OSS_ENDPOINT", oss_endpoint, category="STORAGE")
- else:
- delete_setting_value("OSS_ENDPOINT")
- if oss_bucket is not None:
- oss_bucket = str(oss_bucket).strip()
- if oss_bucket:
- set_setting_value("OSS_BUCKET", oss_bucket, category="STORAGE")
- else:
- delete_setting_value("OSS_BUCKET")
- if oss_access_key_id is not None:
- oss_access_key_id = str(oss_access_key_id).strip()
- if oss_access_key_id:
- set_setting_value("OSS_ACCESS_KEY_ID", oss_access_key_id, category="STORAGE")
- else:
- delete_setting_value("OSS_ACCESS_KEY_ID")
- if clear_oss_access_key_secret:
- delete_setting_value("OSS_ACCESS_KEY_SECRET")
- elif oss_access_key_secret is not None:
- oss_access_key_secret = str(oss_access_key_secret).strip()
- if oss_access_key_secret:
- set_setting_value("OSS_ACCESS_KEY_SECRET", oss_access_key_secret, category="STORAGE")
- if oss_upload_prefix is not None:
- oss_upload_prefix = _normalize_upload_prefix(oss_upload_prefix)
- if oss_upload_prefix:
- set_setting_value("OSS_UPLOAD_PREFIX", oss_upload_prefix, category="STORAGE")
- else:
- delete_setting_value("OSS_UPLOAD_PREFIX")
- if oss_public_base_url is not None:
- oss_public_base_url = str(oss_public_base_url).strip().rstrip("/")
- if oss_public_base_url:
- if not (oss_public_base_url.startswith("http://") or oss_public_base_url.startswith("https://")):
- return jsonify({"error": "invalid_oss_public_base_url"}), 400
- set_setting_value("OSS_PUBLIC_BASE_URL", oss_public_base_url, category="STORAGE")
- else:
- delete_setting_value("OSS_PUBLIC_BASE_URL")
- mysql_host = mysql.get("host")
- mysql_port = mysql.get("port")
- mysql_user = mysql.get("user")
- mysql_database = mysql.get("database")
- mysql_password = mysql.get("password")
- clear_mysql_password = bool(mysql.get("clearPassword"))
- if mysql_host is not None:
- mysql_host = str(mysql_host).strip()
- if mysql_host:
- set_setting_value("MYSQL_HOST", mysql_host, category="DB")
- else:
- delete_setting_value("MYSQL_HOST")
- if mysql_port is not None:
- mysql_port = str(mysql_port).strip()
- if mysql_port:
- p = parse_int(mysql_port, 0)
- if p <= 0 or p > 65535:
- return jsonify({"error": "invalid_mysql_port"}), 400
- set_setting_value("MYSQL_PORT", str(p), category="DB")
- else:
- delete_setting_value("MYSQL_PORT")
- if mysql_user is not None:
- mysql_user = str(mysql_user).strip()
- if mysql_user:
- set_setting_value("MYSQL_USER", mysql_user, category="DB")
- else:
- delete_setting_value("MYSQL_USER")
- if mysql_database is not None:
- mysql_database = str(mysql_database).strip()
- if mysql_database:
- set_setting_value("MYSQL_DATABASE", mysql_database, category="DB")
- else:
- delete_setting_value("MYSQL_DATABASE")
- if clear_mysql_password:
- delete_setting_value("MYSQL_PASSWORD")
- elif mysql_password is not None:
- mysql_password = str(mysql_password).strip()
- if mysql_password:
- set_setting_value("MYSQL_PASSWORD", mysql_password, category="DB")
- after = {
- "gogsBaseUrl": (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/"),
- "hasGogsToken": bool((get_setting_value("GOGS_TOKEN") or "").strip() or config.gogs_token),
- "payment": {
- "provider": (get_setting_value("PAY_PROVIDER") or "MOCK").strip().upper(),
- "hasApiKey": bool((get_setting_value("PAY_API_KEY") or "").strip()),
- "enableMockPay": (get_setting_value("ENABLE_MOCK_PAY") or "").strip().lower() in {"1", "true", "yes", "on"},
- "alipay": {
- "appId": (get_setting_value("ALIPAY_APP_ID") or "").strip(),
- "gateway": (get_setting_value("ALIPAY_GATEWAY") or "").strip(),
- "notifyUrl": (get_setting_value("ALIPAY_NOTIFY_URL") or "").strip(),
- "returnUrl": (get_setting_value("ALIPAY_RETURN_URL") or "").strip(),
- "hasPrivateKey": bool((get_setting_value("ALIPAY_PRIVATE_KEY") or "").strip()),
- "hasPublicKey": bool((get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip()),
- },
- },
- "llm": {
- "provider": (get_setting_value("LLM_PROVIDER") or "").strip(),
- "baseUrl": (get_setting_value("LLM_BASE_URL") or "").strip(),
- "model": (get_setting_value("LLM_MODEL") or "").strip(),
- "hasApiKey": bool((get_setting_value("LLM_API_KEY") or "").strip()),
- },
- "cache": {"hasRedisUrl": bool((get_setting_value("REDIS_URL") or "").strip())},
- "storage": {
- "provider": (get_setting_value("STORAGE_PROVIDER") or "AUTO").strip().upper(),
- "oss": {
- "endpoint": (get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/"),
- "bucket": (get_setting_value("OSS_BUCKET") or "").strip(),
- "accessKeyId": (get_setting_value("OSS_ACCESS_KEY_ID") or "").strip(),
- "hasAccessKeySecret": bool((get_setting_value("OSS_ACCESS_KEY_SECRET") or "").strip()),
- "uploadPrefix": _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/"),
- "publicBaseUrl": (get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/"),
- },
- },
- "db": db_status(),
- }
- audit("ADMIN", admin["id"], "SETTINGS_UPDATE", "AppSettings", "-", before, after)
- return jsonify({"ok": True})
@app.post("/admin/redis/test")
def api_admin_redis_test() -> Response:
    """Ping a Redis server and report whether it answered.

    The URL is taken from the JSON body (``url``), then the ``REDIS_URL``
    setting, then the ``REDIS_URL`` environment variable.

    Responses:
        200 ``{"ok": bool}``                  -- result of ``PING``
        400 ``{"error": "redis_not_configured"}`` -- no URL available
        500 ``{"error": "redis_client_missing"}`` -- ``redis`` package not importable
        502 ``{"error": "connect_failed"}``       -- any error while connecting/pinging
    """
    _ = require_admin()
    payload = request.get_json(silent=True) or {}
    url = (payload.get("url") or get_setting_value("REDIS_URL") or os.environ.get("REDIS_URL") or "").strip()
    if not url:
        return jsonify({"error": "redis_not_configured"}), 400
    try:
        import redis  # type: ignore
    except ImportError:
        return jsonify({"error": "redis_client_missing"}), 500

    client = None
    try:
        # Short timeouts keep the admin request from hanging on a dead host.
        client = redis.Redis.from_url(url, socket_connect_timeout=1, socket_timeout=2, decode_responses=False)
        reachable = bool(client.ping())
    except Exception:
        return jsonify({"error": "connect_failed"}), 502
    finally:
        # The client is a throwaway probe: release its pooled sockets instead
        # of leaving them for the garbage collector.
        if client is not None:
            try:
                client.connection_pool.disconnect()
            except Exception:
                pass  # best-effort cleanup; the probe result is already known
    return jsonify({"ok": reachable})
@app.get("/admin/db/status")
def api_admin_db_status() -> Response:
    """Return ``db_status()`` together with a live connectivity probe.

    The probe runs ``SELECT 1`` through ``fetch_one`` and asks
    ``get_active_backend()`` for the effective backend; failures of either
    step are reported inside the probe instead of failing the request.
    """
    _ = require_admin()

    connect_ok = True
    failure = None
    try:
        fetch_one("SELECT 1 AS one", ())
    except Exception as exc:
        connect_ok = False
        failure = str(exc) or "connect_failed"

    try:
        backend = get_active_backend()
    except Exception:
        backend = None

    probe = {"connectOk": connect_ok, "effective": backend, "error": failure}
    return jsonify({"ok": True, "db": db_status(), "probe": probe})
@app.post("/admin/db/switch")
def api_admin_db_switch() -> Response:
    """Optionally store MySQL connection settings, then switch the database.

    JSON body:
        target: backend name handed to ``switch_database`` (lower-cased).
        force:  truthy to pass ``force=True`` to ``switch_database``.
        mysql:  optional dict with ``host``, ``port``, ``user``, ``database``,
                ``password`` and ``clearPassword``. A key that is present with
                an empty value deletes the stored setting; an absent key (or
                ``null``) leaves it untouched. An empty ``password`` is ignored
                unless ``clearPassword`` is truthy.

    Responses:
        200 -- the result of ``switch_database`` (also written to the audit log)
        400 -- ``invalid_mysql_port``, ``invalid_target``, ``mysql_not_configured``
        409 -- ``target_not_empty``, ``migration_running``
        502 -- ``connect_failed``, ``db_create_failed``, ``verify_failed``
        500 -- ``pymysql_required``, ``switch_failed`` or any other error code
    """
    admin = require_admin()
    payload = request.get_json(silent=True) or {}
    target = (payload.get("target") or "").strip().lower()
    force = bool(payload.get("force"))
    mysql = payload.get("mysql") or {}

    if isinstance(mysql, dict) and mysql:
        # Validate the port BEFORE writing anything, so a rejected request
        # does not leave a half-applied set of MySQL settings behind.
        port_value = mysql.get("port")
        if port_value is not None:
            port_value = str(port_value).strip()
            if port_value:
                port_number = parse_int(port_value, 0)
                if port_number <= 0 or port_number > 65535:
                    return jsonify({"error": "invalid_mysql_port"}), 400
                port_value = str(port_number)

        text_fields = (
            ("MYSQL_HOST", mysql.get("host")),
            ("MYSQL_PORT", port_value),
            ("MYSQL_USER", mysql.get("user")),
            ("MYSQL_DATABASE", mysql.get("database")),
        )
        for setting_key, raw_value in text_fields:
            if raw_value is None:
                continue  # key not supplied: keep the stored value
            cleaned = str(raw_value).strip()
            if cleaned:
                set_setting_value(setting_key, cleaned, category="DB")
            else:
                delete_setting_value(setting_key)

        # The password is only removed on explicit request; a blank value is
        # treated as "unchanged".
        if mysql.get("clearPassword"):
            delete_setting_value("MYSQL_PASSWORD")
        elif mysql.get("password") is not None:
            secret = str(mysql.get("password")).strip()
            if secret:
                set_setting_value("MYSQL_PASSWORD", secret, category="DB")

    # HTTP status for each RuntimeError code; anything unlisted is a 500.
    status_by_code = {
        "invalid_target": 400,
        "mysql_not_configured": 400,
        "connect_failed": 502,
        "db_create_failed": 502,
        "verify_failed": 502,
        "target_not_empty": 409,
        "migration_running": 409,
        "pymysql_required": 500,
    }
    try:
        result = switch_database(target=target, force=force)
    except RuntimeError as exc:
        code = str(exc) or "switch_failed"
        return jsonify({"error": code}), status_by_code.get(code, 500)
    except Exception:
        return jsonify({"error": "switch_failed"}), 500

    audit("ADMIN", admin["id"], "DB_SWITCH", "Database", "-", {"target": target, "force": force}, result)
    return jsonify(result)
@app.post("/admin/mysql/test")
def api_admin_mysql_test() -> Response:
    """Try to connect to MySQL and run ``SELECT 1``.

    Each connection parameter is taken from the JSON body first, then from the
    stored setting, then from ``get_config()``. If the server answers with
    error 1049 for the requested database, the database is created
    (``CREATE DATABASE IF NOT EXISTS``) and the connection is retried -- note
    that this "test" endpoint can therefore create a database.

    Responses:
        200 ``{"ok": true, "createdDatabase": bool}``
        400 ``mysql_params_required`` / ``invalid_mysql_port``
        500 ``pymysql_required``
        502 ``{"ok": false, "error": "connect_failed", "errno": int, "message": str}``
    """
    _ = require_admin()
    payload = request.get_json(silent=True) or {}
    config = get_config()
    host = (payload.get("host") or get_setting_value("MYSQL_HOST") or config.mysql_host or "").strip()
    port = parse_int(payload.get("port") or get_setting_value("MYSQL_PORT") or "", config.mysql_port or 3306)
    user = (payload.get("user") or get_setting_value("MYSQL_USER") or config.mysql_user or "").strip()
    database = (payload.get("database") or get_setting_value("MYSQL_DATABASE") or config.mysql_database or "").strip()

    # Unlike the other fields, an explicit empty password in the body is
    # honoured; only a missing/null value falls through to the stored one.
    password = payload.get("password")
    if password is None:
        password = get_setting_value("MYSQL_PASSWORD")
    if password is None:
        password = config.mysql_password
    password = (str(password) if password is not None else "").strip()

    if not host or not user or not database:
        return jsonify({"error": "mysql_params_required"}), 400
    if port <= 0 or port > 65535:
        return jsonify({"error": "invalid_mysql_port"}), 400
    try:
        import pymysql
    except Exception:
        return jsonify({"error": "pymysql_required"}), 500

    def _errno_of(exc: BaseException) -> int:
        """Return the numeric error code in ``exc.args[0]``, or 0 if there is none.

        Not every exception carries an integer first argument (a plain
        ``RuntimeError("...")`` does not), so the conversion must not raise.
        """
        args = getattr(exc, "args", None) or ()
        if not args:
            return 0
        try:
            return int(args[0] or 0)
        except (TypeError, ValueError):
            return 0

    def _connect(with_database: bool):
        """Open a connection, optionally selecting the target database."""
        params = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "charset": "utf8mb4",
            "connect_timeout": 3,
            "read_timeout": 3,
            "write_timeout": 3,
            "autocommit": True,
        }
        if with_database:
            params["database"] = database
        return pymysql.connect(**params)

    created_db = False
    try:
        try:
            conn = _connect(with_database=True)
        except Exception as exc:
            # 1049 is MySQL's "Unknown database"; anything else is a real failure.
            if _errno_of(exc) != 1049:
                raise
            server_conn = _connect(with_database=False)
            try:
                # Identifiers cannot be bound as parameters; escape backticks.
                esc = database.replace("`", "``")
                cur = server_conn.cursor()
                cur.execute(
                    f"CREATE DATABASE IF NOT EXISTS `{esc}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
                )
                created_db = True
            finally:
                server_conn.close()
            conn = _connect(with_database=True)
        try:
            cur = conn.cursor()
            cur.execute("SELECT 1")
            _ = cur.fetchone()
        finally:
            conn.close()
    except Exception as exc:
        errno = _errno_of(exc)
        args = getattr(exc, "args", None) or ()
        # Prefer the driver's message (second arg) over the full repr-like str.
        msg = str(args[1]) if len(args) > 1 else str(exc)
        msg = (msg or "").strip()[:200]
        return jsonify({"ok": False, "error": "connect_failed", "errno": errno, "message": msg}), 502
    return jsonify({"ok": True, "createdDatabase": bool(created_db)})
@app.get("/admin/gogs/repo")
def api_admin_gogs_repo() -> Response:
    """Return metadata for one Gogs repository.

    Query parameters: ``owner`` and ``repo`` (both required).

    Responses:
        200 -- selected repository fields in camelCase
        400 ``owner_repo_required``
        404 ``repo_not_found``
        502 ``gogs_failed`` (with the upstream ``status``; the upstream ``url``
            is added when the status is 599)
    """
    _ = require_admin()
    owner = (request.args.get("owner") or "").strip()
    repo = (request.args.get("repo") or "").strip()
    if not owner or not repo:
        return jsonify({"error": "owner_repo_required"}), 400

    resp = gogs_repo_info(owner, repo)
    if resp.status_code == 404:
        return jsonify({"error": "repo_not_found"}), 404
    if resp.status_code >= 400:
        failure = {"error": "gogs_failed", "status": resp.status_code}
        # NOTE(review): 599 is reported as "gogs_unreachable" by the resource
        # creation handler, so it is presumably a synthetic status from the
        # gogs_* helpers -- confirm. The URL helps diagnose that case.
        if resp.status_code == 599:
            failure["url"] = resp.url
        return jsonify(failure), 502

    # A null JSON body must not crash the handler; treat it as "no fields".
    data = resp.json() or {}
    return jsonify(
        {
            "owner": owner,
            "repo": repo,
            "fullName": data.get("full_name"),
            "description": data.get("description"),
            "private": bool(data.get("private")),
            "defaultBranch": data.get("default_branch"),
            "htmlUrl": data.get("html_url"),
            "cloneUrl": data.get("clone_url"),
            "sshUrl": data.get("ssh_url"),
            "updatedAt": data.get("updated_at") or data.get("updated_at_unix"),
        }
    )
@app.get("/admin/gogs/user-repos")
def api_admin_gogs_user_repos() -> Response:
    """List the repositories of a Gogs user, optionally filtered by ``q``.

    ``q`` is matched case-insensitively as a substring of the repository
    name or full name. Returns ``{"items": [...]}``.
    """
    _ = require_admin()
    owner = (request.args.get("owner") or "").strip()
    needle = (request.args.get("q") or "").strip().lower()
    if not owner:
        return jsonify({"error": "owner_required"}), 400

    resp = gogs_user_repos(owner)
    upstream_status = resp.status_code
    if upstream_status == 404:
        return jsonify({"error": "user_not_found"}), 404
    if upstream_status >= 400:
        if upstream_status == 599:
            return jsonify({"error": "gogs_failed", "status": upstream_status, "url": resp.url}), 502
        return jsonify({"error": "gogs_failed", "status": upstream_status}), 502

    repos = []
    for entry in resp.json() or []:
        name = (entry.get("name") or "").strip()
        full_name = (entry.get("full_name") or "").strip()
        matches = not needle or needle in name.lower() or needle in full_name.lower()
        if not matches:
            continue
        repos.append(
            {
                "id": entry.get("id"),
                "name": name,
                "fullName": full_name,
                "private": bool(entry.get("private")),
                "defaultBranch": entry.get("default_branch"),
                "description": entry.get("description"),
                "htmlUrl": entry.get("html_url"),
                "updatedAt": entry.get("updated_at") or entry.get("updated_at_unix"),
            }
        )
    return jsonify({"items": repos})
@app.get("/admin/gogs/branches")
def api_admin_gogs_branches() -> Response:
    """List a repository's branches as ``{"items": [{"name", "commit"}]}``, sorted by name.

    Entries without a name are dropped; ``commit`` is the ``id`` of the
    branch's ``commit`` object when present.
    """
    _ = require_admin()
    owner = (request.args.get("owner") or "").strip()
    repo = (request.args.get("repo") or "").strip()
    if not (owner and repo):
        return jsonify({"error": "owner_repo_required"}), 400

    resp = gogs_branches(owner, repo)
    upstream_status = resp.status_code
    if upstream_status == 404:
        return jsonify({"error": "repo_not_found"}), 404
    if upstream_status >= 400:
        if upstream_status == 599:
            return jsonify({"error": "gogs_failed", "status": upstream_status, "url": resp.url}), 502
        return jsonify({"error": "gogs_failed", "status": upstream_status}), 502

    named = (((branch.get("name") or "").strip(), branch) for branch in resp.json() or [])
    items = sorted(
        (
            {"name": name, "commit": (branch.get("commit") or {}).get("id")}
            for name, branch in named
            if name
        ),
        key=lambda entry: entry["name"],
    )
    return jsonify({"items": items})
@app.get("/admin/gogs/tags")
def api_admin_gogs_tags() -> Response:
    """List a repository's tags as ``{"items": [{"name"}]}``, sorted by name.

    Entries without a name are dropped.
    """
    _ = require_admin()
    owner = (request.args.get("owner") or "").strip()
    repo = (request.args.get("repo") or "").strip()
    if not (owner and repo):
        return jsonify({"error": "owner_repo_required"}), 400

    resp = gogs_tags(owner, repo)
    upstream_status = resp.status_code
    if upstream_status == 404:
        return jsonify({"error": "repo_not_found"}), 404
    if upstream_status >= 400:
        if upstream_status == 599:
            return jsonify({"error": "gogs_failed", "status": upstream_status, "url": resp.url}), 502
        return jsonify({"error": "gogs_failed", "status": upstream_status}), 502

    tag_names = ((tag.get("name") or "").strip() for tag in resp.json() or [])
    items = sorted(
        ({"name": name} for name in tag_names if name),
        key=lambda entry: entry["name"],
    )
    return jsonify({"items": items})
@app.get("/admin/gogs/file-text")
def api_admin_gogs_file_text() -> Response:
    """Fetch a file from a Gogs repository and return it as text.

    Query parameters:
        owner, repo: required.
        ref:  branch/tag; ``AUTO`` (default) resolves to the repository's
              default branch, falling back to ``master``.
        path: file path, leading slashes removed; defaults to ``README.md``.

    The upstream content must be a base64-encoded file of at most 200,000
    decoded bytes. Bytes that are not valid UTF-8 are replaced rather than
    rejected.
    """
    _ = require_admin()
    args = request.args
    owner = (args.get("owner") or "").strip()
    repo = (args.get("repo") or "").strip()
    ref = (args.get("ref") or "").strip() or "AUTO"
    path = (args.get("path") or "").strip() or "README.md"
    if not (owner and repo):
        return jsonify({"error": "owner_repo_required"}), 400
    path = path.lstrip("/")
    if not path:
        return jsonify({"error": "path_required"}), 400

    if ref.upper() == "AUTO":
        repo_resp = gogs_repo_info(owner, repo)
        repo_status = repo_resp.status_code
        if repo_status == 404:
            return jsonify({"error": "repo_not_found"}), 404
        if repo_status >= 400:
            if repo_status == 599:
                return jsonify({"error": "gogs_failed", "status": repo_status, "url": repo_resp.url}), 502
            return jsonify({"error": "gogs_failed", "status": repo_status}), 502
        repo_info = repo_resp.json() or {}
        ref = (repo_info.get("default_branch") or "master").strip() or "master"

    resp = gogs_contents(owner, repo, path, ref)
    file_status = resp.status_code
    if file_status == 404:
        return jsonify({"error": "file_not_found"}), 404
    if file_status >= 400:
        if file_status == 599:
            return jsonify({"error": "gogs_failed", "status": file_status, "url": resp.url}), 502
        return jsonify({"error": "gogs_failed", "status": file_status}), 502

    data = resp.json() or {}
    entry_type = (data.get("type") or "").strip().lower()
    if entry_type != "file":
        return jsonify({"error": "not_a_file"}), 400
    if (data.get("encoding") or "").strip().lower() != "base64":
        return jsonify({"error": "unsupported_encoding"}), 502
    content_b64 = (data.get("content") or "").strip()
    if not content_b64:
        return jsonify({"error": "empty_content"}), 404

    try:
        raw = base64.b64decode(content_b64, validate=False)
    except Exception:
        return jsonify({"error": "decode_failed"}), 502
    if len(raw) > 200_000:
        return jsonify({"error": "file_too_large"}), 413

    # "replace" only kicks in for invalid sequences, so valid UTF-8 decodes
    # exactly as a strict decode would.
    text = raw.decode("utf-8", errors="replace")
    return jsonify({"owner": owner, "repo": repo, "ref": ref, "path": path, "text": text, "sha": data.get("sha")})
@app.get("/admin/gogs/repos")
def api_admin_gogs_repos() -> Response:
    """List repositories of ``owner``, or of the token's own account when no owner is given.

    Listing without an owner requires a Gogs token (stored setting, else the
    value from ``get_config()``). ``q`` filters case-insensitively on the
    repository name or full name. Returns ``{"items": [...]}``.
    """
    _ = require_admin()
    owner = (request.args.get("owner") or "").strip()
    needle = (request.args.get("q") or "").strip().lower()
    config = get_config()

    # A stored token that is blank counts as "not set" and falls back to config.
    stored_token = get_setting_value("GOGS_TOKEN")
    gogs_token = (stored_token.strip() or None) if stored_token is not None else None
    if gogs_token is None:
        gogs_token = config.gogs_token
    if not (owner or gogs_token):
        return jsonify({"error": "gogs_token_required"}), 400

    resp = gogs_user_repos(owner) if owner else gogs_my_repos()
    upstream_status = resp.status_code
    if owner and upstream_status == 404:
        return jsonify({"error": "user_not_found"}), 404
    if upstream_status >= 400:
        if upstream_status == 599:
            return jsonify({"error": "gogs_failed", "status": upstream_status, "url": resp.url}), 502
        return jsonify({"error": "gogs_failed", "status": upstream_status}), 502

    repos = []
    for entry in resp.json() or []:
        name = (entry.get("name") or "").strip()
        full_name = (entry.get("full_name") or "").strip()
        matches = not needle or needle in name.lower() or needle in full_name.lower()
        if not matches:
            continue
        # Prefer the explicit owner object; otherwise derive it from "owner/name".
        repo_owner = ((entry.get("owner") or {}).get("username") or "").strip()
        if not repo_owner and "/" in full_name:
            repo_owner = full_name.split("/", 1)[0].strip()
        repos.append(
            {
                "id": entry.get("id"),
                "owner": repo_owner,
                "name": name,
                "fullName": full_name,
                "private": bool(entry.get("private")),
                "defaultBranch": entry.get("default_branch"),
                "description": entry.get("description"),
                "htmlUrl": entry.get("html_url"),
                "updatedAt": entry.get("updated_at") or entry.get("updated_at_unix"),
            }
        )
    return jsonify({"items": repos})
@app.get("/admin/plans")
def api_admin_plans() -> Response:
    """Return every plan as a JSON array, ordered by ``sort`` then ``id``, both descending."""
    _ = require_admin()
    plans = []
    for row in fetch_all("SELECT * FROM plans ORDER BY sort DESC, id DESC"):
        plans.append(
            {
                "id": row["id"],
                "name": row["name"],
                "durationDays": row["duration_days"],
                "priceCents": row["price_cents"],
                "enabled": bool(row["enabled"]),
                "sort": row["sort"],
            }
        )
    return jsonify(plans)
@app.post("/admin/plans")
def api_admin_create_plan() -> Response:
    """Insert a new plan and return ``{"id": <new row id>}``.

    ``name`` must be non-empty and ``durationDays`` / ``priceCents`` must be
    positive, otherwise the reply is 400 ``invalid_payload``. ``enabled``
    defaults to true and ``sort`` to 0. The raw payload is written to the
    audit log.
    """
    admin = require_admin()
    payload = request.get_json(silent=True) or {}

    name = (payload.get("name") or "").strip()
    duration_days = parse_int(payload.get("durationDays"), 0)
    price_cents = parse_int(payload.get("priceCents"), 0)
    enabled = int(bool(payload.get("enabled", True)))
    sort = parse_int(payload.get("sort"), 0)

    is_valid = bool(name) and duration_days > 0 and price_cents > 0
    if not is_valid:
        return jsonify({"error": "invalid_payload"}), 400

    cur = execute(
        "INSERT INTO plans (name, duration_days, price_cents, enabled, sort) VALUES (?, ?, ?, ?, ?)",
        (name, duration_days, price_cents, enabled, sort),
    )
    new_id = cur.lastrowid
    audit("ADMIN", admin["id"], "PLAN_CREATE", "Plan", str(new_id), None, payload)
    return jsonify({"id": new_id})
@app.put("/admin/plans/<int:plan_id>")
def api_admin_update_plan(plan_id: int) -> Response:
    """Update a plan; fields missing from the payload keep their stored value.

    Aborts with 404 when the plan does not exist and replies 400
    ``invalid_payload`` when the merged values are not valid. The rows before
    and after the update are written to the audit log.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM plans WHERE id = ?", (plan_id,))
    if before_row is None:
        abort(404)
    payload = request.get_json(silent=True) or {}

    old_days = before_row["duration_days"]
    old_price = before_row["price_cents"]
    old_sort = before_row["sort"]

    name = (payload.get("name") or before_row["name"]).strip()
    duration_days = parse_int(payload.get("durationDays", old_days), old_days)
    price_cents = parse_int(payload.get("priceCents", old_price), old_price)
    enabled = int(bool(payload.get("enabled", bool(before_row["enabled"]))))
    sort = parse_int(payload.get("sort", old_sort), old_sort)

    is_valid = bool(name) and duration_days > 0 and price_cents > 0
    if not is_valid:
        return jsonify({"error": "invalid_payload"}), 400

    execute(
        "UPDATE plans SET name = ?, duration_days = ?, price_cents = ?, enabled = ?, sort = ? WHERE id = ?",
        (name, duration_days, price_cents, enabled, sort, plan_id),
    )
    after_row = fetch_one("SELECT * FROM plans WHERE id = ?", (plan_id,))
    after_snapshot = None if after_row is None else dict(after_row)
    audit("ADMIN", admin["id"], "PLAN_UPDATE", "Plan", str(plan_id), dict(before_row), after_snapshot)
    return jsonify({"ok": True})
@app.delete("/admin/plans/<int:plan_id>")
def api_admin_delete_plan(plan_id: int) -> Response:
    """Delete a plan, recording the removed row in the audit log; 404 if it does not exist."""
    admin = require_admin()
    lookup = (plan_id,)
    before_row = fetch_one("SELECT * FROM plans WHERE id = ?", lookup)
    if before_row is None:
        abort(404)
    removed = dict(before_row)
    execute("DELETE FROM plans WHERE id = ?", lookup)
    audit("ADMIN", admin["id"], "PLAN_DELETE", "Plan", str(plan_id), removed, None)
    return jsonify({"ok": True})
@app.get("/admin/resources")
def api_admin_resources() -> Response:
    """Return one page of resources, newest update first.

    Query parameters:
        q:        substring matched against title, summary, repo owner and repo name.
        type:     ``FREE`` or ``VIP``; other values are ignored.
        status:   ``DRAFT``, ``ONLINE`` or ``OFFLINE``; other values are ignored.
        page:     1-based page number (minimum 1).
        pageSize: clamped to 1..100, default 20.

    Returns ``{"items", "total", "page", "pageSize"}``.
    """
    _ = require_admin()
    args = request.args
    q = (args.get("q") or "").strip()
    resource_type = (args.get("type") or "").strip().upper()
    status = (args.get("status") or "").strip().upper()
    page = max(1, parse_int(args.get("page"), 1))
    page_size = min(100, max(1, parse_int(args.get("pageSize"), 20)))

    # Clauses are fixed strings; every user-supplied value is bound as a parameter.
    clauses: list[str] = []
    bound: list[Any] = []
    if q:
        pattern = f"%{q}%"
        clauses.append("(title LIKE ? OR summary LIKE ? OR repo_owner LIKE ? OR repo_name LIKE ?)")
        bound += [pattern, pattern, pattern, pattern]
    if resource_type in {"FREE", "VIP"}:
        clauses.append("type = ?")
        bound.append(resource_type)
    if status in {"DRAFT", "ONLINE", "OFFLINE"}:
        clauses.append("status = ?")
        bound.append(status)
    where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ""

    total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM resources {where_sql}", tuple(bound))
    total = 0 if total_row is None else int(total_row["cnt"])

    offset = (page - 1) * page_size
    rows = fetch_all(
        f"""
        SELECT *
        FROM resources
        {where_sql}
        ORDER BY updated_at DESC, id DESC
        LIMIT ? OFFSET ?
        """,
        tuple(bound + [page_size, offset]),
    )

    def _tags_of(row) -> Any:
        """Decode the row's ``tags_json``; anything unparsable yields an empty list."""
        try:
            return json.loads(row["tags_json"] or "[]")
        except Exception:
            return []

    items = [
        {
            "id": row["id"],
            "title": row["title"],
            "summary": row["summary"],
            "type": row["type"],
            "status": row["status"],
            "coverUrl": row["cover_url"],
            "tags": _tags_of(row),
            "repoOwner": row["repo_owner"],
            "repoName": row["repo_name"],
            "repoPrivate": bool(row["repo_private"]),
            "repoHtmlUrl": row["repo_html_url"],
            "defaultRef": row["default_ref"],
            "updatedAt": row["updated_at"],
        }
        for row in rows
    ]
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.post("/admin/resources")
def api_admin_create_resource() -> Response:
    """Create a resource row bound to a Gogs repository.

    JSON body:
        title (required), summary, type (``FREE``/``VIP``, required),
        status (``DRAFT``/``ONLINE``/``OFFLINE``; anything else becomes ``DRAFT``),
        coverUrl, keywords or tags, repoOwner, repoName, repoPrivate, defaultRef,
        createRepo (default true), syncReadme (default true).

    With ``createRepo`` the repository is created in Gogs (up to five name
    attempts; a random suffix is appended after a 409 unless ``repoName`` was
    given explicitly) and its default branch becomes the stored ref.
    Otherwise the existing repository is looked up and an explicit
    ``defaultRef`` other than ``AUTO`` must match a branch or tag.
    With ``syncReadme`` a ``README.md`` built from title and summary is
    written to the repository.

    Returns ``{"id": <new row id>}`` on success; validation errors are 400,
    an explicit name clash is 409, upstream failures are 502 and a
    ``git_not_found`` README error is 501.
    """
    admin = require_admin()
    payload = request.get_json(silent=True) or {}

    def _upstream_error(upstream):
        """Map a failed Gogs response (status >= 400) onto this API's error reply."""
        code = upstream.status_code
        body = {
            "error": "gogs_failed",
            "status": code,
            "message": _gogs_error_message(upstream),
            "url": _safe_upstream_url(upstream),
        }
        if code in {401, 403}:
            body["error"] = "gogs_unauthorized"
            return jsonify(body), 400
        if code == 599:
            body["error"] = "gogs_unreachable"
        return jsonify(body), 502

    def _readme_error(err):
        """Map a GogsGitError raised while writing the README onto an error reply."""
        if err.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}:
            return jsonify({"error": err.code, "message": err.message}), 400
        if err.code == "git_not_found":
            return jsonify({"error": err.code, "message": err.message}), 501
        return jsonify({"error": "readme_sync_failed", "message": err.message}), 502

    title = (payload.get("title") or "").strip()
    summary = (payload.get("summary") or "").strip()
    resource_type = (payload.get("type") or "").strip().upper()
    status = (payload.get("status") or "").strip().upper()
    cover_url = (payload.get("coverUrl") or "").strip() or None
    tags = _parse_keywords(payload.get("keywords") if "keywords" in payload else payload.get("tags"))

    # Both switches default to on; only an explicit falsy value disables them.
    sync_readme = payload.get("syncReadme")
    if sync_readme is None:
        sync_readme = True
    create_repo = payload.get("createRepo")
    if create_repo is None:
        create_repo = True

    repo_owner = (payload.get("repoOwner") or "").strip()
    repo_name = (payload.get("repoName") or "").strip()
    repo_private = 1 if payload.get("repoPrivate") else 0
    repo_html_url: str | None = None
    default_ref = (payload.get("defaultRef") or "").strip()
    requested_ref = default_ref

    if not title or resource_type not in {"FREE", "VIP"}:
        return jsonify({"error": "invalid_payload"}), 400
    if status not in {"DRAFT", "ONLINE", "OFFLINE"}:
        status = "DRAFT"

    base_url, token = _gogs_base_url_and_token()
    if not base_url:
        return jsonify({"error": "gogs_base_url_required"}), 400
    if not token:
        return jsonify({"error": "gogs_token_required"}), 400

    readme = f"# {title}\n\n{summary}\n" if summary else f"# {title}\n"

    if create_repo:
        desired_name = repo_name or _slugify_repo_name(title)
        desired_owner = repo_owner
        repo_data: dict[str, Any] | None = None
        for attempt in range(5):
            try_name = desired_name if attempt == 0 else f"{desired_name}-{uuid.uuid4().hex[:6]}"
            resp = gogs_create_repo(desired_owner, try_name, title, bool(repo_private))
            # Creating under the requested owner was refused or the owner is
            # unknown: retry with an empty owner.
            if desired_owner and resp.status_code in {403, 404}:
                resp = gogs_create_repo("", try_name, title, bool(repo_private))
            if resp.status_code == 409:
                # An explicitly requested name is never silently changed.
                if repo_name:
                    return jsonify({"error": "repo_exists"}), 409
                continue
            if resp.status_code >= 400:
                return _upstream_error(resp)
            repo_data = resp.json() or {}
            break
        if repo_data is None:
            return jsonify({"error": "repo_create_failed"}), 502

        # Trust what Gogs reports about the repository over what was requested.
        full_name = (repo_data.get("full_name") or "").strip()
        if "/" in full_name:
            owner_part, repo_part = full_name.split("/", 1)
            repo_owner = owner_part.strip()
            repo_name = repo_part.strip()
        else:
            repo_owner = (repo_data.get("owner", {}) or {}).get("username") or repo_owner
            repo_name = (repo_data.get("name") or repo_name).strip()
        repo_html_url = (repo_data.get("html_url") or "").strip() or None
        if repo_data.get("private") is not None:
            repo_private = 1 if repo_data.get("private") else 0
        default_ref = (repo_data.get("default_branch") or "master").strip()

        if sync_readme:
            try:
                gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "init README", must_create=True)
            except GogsGitError as err:
                # An already existing README in the new repository is not an error.
                if err.code != "file_exists":
                    return _readme_error(err)
    else:
        if not repo_owner or not repo_name:
            return jsonify({"error": "repo_required"}), 400
        repo_resp = gogs_repo_info(repo_owner, repo_name)
        if repo_resp.status_code == 404:
            return jsonify({"error": "repo_not_found"}), 400
        if repo_resp.status_code >= 400:
            return _upstream_error(repo_resp)
        # Guard against a null JSON body so the lookups below cannot crash.
        repo_data = repo_resp.json() or {}
        repo_html_url = (repo_data.get("html_url") or "").strip() or None
        if repo_data.get("private") is not None:
            repo_private = 1 if repo_data.get("private") else 0
        if not default_ref or default_ref.upper() == "AUTO":
            default_ref = (repo_data.get("default_branch") or "master").strip()

        if requested_ref and requested_ref.upper() != "AUTO":
            # An explicit ref has to exist as a branch or as a tag.
            branches_resp = gogs_branches(repo_owner, repo_name)
            tags_resp = gogs_tags(repo_owner, repo_name)
            if branches_resp.status_code >= 400:
                return _upstream_error(branches_resp)
            if tags_resp.status_code >= 400:
                return _upstream_error(tags_resp)
            ref_exists = any(
                (branch.get("name") or "").strip() == requested_ref for branch in branches_resp.json() or []
            ) or any(
                (tag.get("name") or "").strip() == requested_ref for tag in tags_resp.json() or []
            )
            if not ref_exists:
                return jsonify({"error": "invalid_ref"}), 400

        if sync_readme:
            try:
                gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=False)
            except GogsGitError as err:
                if err.code != "file_not_found":
                    return _readme_error(err)
                # No README yet: create it instead of updating.
                try:
                    gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=True)
                except GogsGitError as retry_err:
                    return _readme_error(retry_err)

    now = isoformat(utcnow())
    cur = execute(
        """
        INSERT INTO resources
        (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
        VALUES
        (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            title,
            summary,
            resource_type,
            status,
            cover_url,
            json.dumps(tags, ensure_ascii=False),
            repo_owner,
            repo_name,
            repo_private,
            repo_html_url,
            default_ref,
            now,
            now,
        ),
    )
    new_id = cur.lastrowid
    audit("ADMIN", admin["id"], "RESOURCE_CREATE", "Resource", str(new_id), None, payload)
    return jsonify({"id": new_id})
@app.put("/admin/resources/<int:resource_id>")
def api_admin_update_resource(resource_id: int) -> Response:
    """Update a resource row after re-validating its repository against Gogs.

    Text fields that are missing or empty in the JSON body fall back to the
    stored values; ``coverUrl`` and ``keywords``/``tags`` are replaced only
    when the key is present. The repository is looked up via
    ``gogs_repo_info``; when an explicit ``defaultRef`` other than ``AUTO`` is
    sent, it must match a branch or tag name. README.md is rewritten from the
    title/summary when ``syncReadme`` is truthy or, if that flag is absent,
    when ``title`` or ``summary`` appears in the body. The change is recorded
    through ``audit`` with before/after snapshots.

    Returns ``{"ok": True}`` on success or a JSON error with status 400, 501
    or 502; aborts with 404 when the resource does not exist.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,))
    if before_row is None:
        abort(404)
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields to apply; treat it as empty
        # instead of failing on payload.get().
        payload = {}

    def _text(key: str, column: str) -> str:
        """Return the payload value for *key*, else the stored column, stripped."""
        # str() and the trailing `or ""` keep a NULL column or a non-string
        # JSON value from raising AttributeError on .strip().
        return str(payload.get(key) or before_row[column] or "").strip()

    def _gogs_failure(resp: Any) -> tuple[Response, int]:
        """Map a failed Gogs response (status >= 400) to the API error reply."""
        msg = _gogs_error_message(resp)
        code = resp.status_code
        if code in {401, 403}:
            return jsonify({"error": "gogs_unauthorized", "status": code, "message": msg}), 400
        if code == 599:
            return jsonify({"error": "gogs_unreachable", "status": code, "message": msg}), 502
        return jsonify({"error": "gogs_failed", "status": code, "message": msg}), 502

    def _readme_failure(err: Any) -> tuple[Response, int]:
        """Map a GogsGitError raised while writing README.md to an error reply."""
        if err.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}:
            return jsonify({"error": err.code, "message": err.message}), 400
        if err.code == "git_not_found":
            return jsonify({"error": err.code, "message": err.message}), 501
        return jsonify({"error": "readme_sync_failed", "message": err.message}), 502

    title = _text("title", "title")
    summary = _text("summary", "summary")
    resource_type = _text("type", "type").upper()
    status = _text("status", "status").upper()
    cover_url = (payload.get("coverUrl") if "coverUrl" in payload else before_row["cover_url"]) or None
    if cover_url is not None:
        cover_url = str(cover_url).strip() or None
    if "keywords" in payload or "tags" in payload:
        # "keywords" takes precedence over "tags" when both are sent.
        tags = _parse_keywords(payload.get("keywords") if "keywords" in payload else payload.get("tags"))
        tags_json = json.dumps(tags, ensure_ascii=False)
    else:
        tags_json = before_row["tags_json"]
    repo_owner = _text("repoOwner", "repo_owner")
    repo_name = _text("repoName", "repo_name")
    default_ref = _text("defaultRef", "default_ref")
    # Only a ref explicitly sent by the client is verified against Gogs below.
    requested_ref = str(payload.get("defaultRef") or "").strip()

    if not title or resource_type not in {"FREE", "VIP"}:
        return jsonify({"error": "invalid_payload"}), 400
    if status not in {"DRAFT", "ONLINE", "OFFLINE"}:
        return jsonify({"error": "invalid_status"}), 400
    if not repo_owner or not repo_name:
        return jsonify({"error": "repo_required"}), 400

    base_url, token = _gogs_base_url_and_token()
    if not base_url:
        return jsonify({"error": "gogs_base_url_required"}), 400
    if not token:
        return jsonify({"error": "gogs_token_required"}), 400

    repo_resp = gogs_repo_info(repo_owner, repo_name)
    if repo_resp.status_code == 404:
        return jsonify({"error": "repo_not_found"}), 400
    if repo_resp.status_code >= 400:
        return _gogs_failure(repo_resp)
    repo_data = repo_resp.json()
    repo_html_url = (repo_data.get("html_url") or "").strip() or None
    repo_private = 1 if repo_data.get("private") else 0
    if not default_ref or default_ref.upper() == "AUTO":
        default_ref = (repo_data.get("default_branch") or "master").strip()

    if requested_ref and requested_ref.upper() != "AUTO":
        branches_resp = gogs_branches(repo_owner, repo_name)
        tags_resp = gogs_tags(repo_owner, repo_name)
        if branches_resp.status_code >= 400:
            return _gogs_failure(branches_resp)
        if tags_resp.status_code >= 400:
            return _gogs_failure(tags_resp)

        def _lists_ref(resp: Any) -> bool:
            return any((item.get("name") or "").strip() == requested_ref for item in resp.json() or [])

        # Branches are checked first; tags are only parsed when no branch matches.
        if not (_lists_ref(branches_resp) or _lists_ref(tags_resp)):
            return jsonify({"error": "invalid_ref"}), 400

    sync_readme = payload.get("syncReadme")
    if sync_readme is None:
        sync_readme = ("title" in payload) or ("summary" in payload)
    if sync_readme:
        readme = f"# {title}\n\n{summary}\n" if summary else f"# {title}\n"
        try:
            gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=False)
        except GogsGitError as e:
            if e.code != "file_not_found":
                return _readme_failure(e)
            # README.md does not exist yet: retry as a file creation.
            try:
                gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=True)
            except GogsGitError as e2:
                return _readme_failure(e2)

    execute(
        """
        UPDATE resources
        SET title = ?, summary = ?, type = ?, status = ?, cover_url = ?, tags_json = ?, repo_owner = ?, repo_name = ?, repo_private = ?, repo_html_url = ?, default_ref = ?, updated_at = ?
        WHERE id = ?
        """,
        (
            title,
            summary,
            resource_type,
            status,
            cover_url,
            tags_json,
            repo_owner,
            repo_name,
            repo_private,
            repo_html_url,
            default_ref,
            isoformat(utcnow()),
            resource_id,
        ),
    )
    after_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,))
    audit(
        "ADMIN",
        admin["id"],
        "RESOURCE_UPDATE",
        "Resource",
        str(resource_id),
        dict(before_row),
        dict(after_row) if after_row is not None else None,
    )
    return jsonify({"ok": True})
@app.delete("/admin/resources/<int:resource_id>")
def api_admin_delete_resource(resource_id: int) -> Response:
    """Delete a resource, its Gogs repository and its uploaded files.

    The repository is removed first through ``gogs_delete_repo``; a Gogs
    failure other than 404 stops the operation before the database row is
    touched. The row is then deleted, the deletion is audited, and upload
    files referenced by the cover URL or summary are removed.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,))
    if before_row is None:
        abort(404)

    owner = (before_row["repo_owner"] or "").strip()
    name = (before_row["repo_name"] or "").strip()
    if owner and name:
        resp = gogs_delete_repo(owner, name)
        # 404 means the repository is already gone, which is fine here.
        if resp.status_code not in {204, 404} and resp.status_code >= 400:
            msg = _gogs_error_message(resp)
            upstream_url = _safe_upstream_url(resp)
            if resp.status_code in {401, 403}:
                error, http_status = "gogs_unauthorized", 400
            elif resp.status_code == 599:
                error, http_status = "gogs_unreachable", 502
            else:
                error, http_status = "gogs_failed", 502
            return jsonify({"error": error, "status": resp.status_code, "message": msg, "url": upstream_url}), http_status

    # Collect referenced uploads before the row disappears.
    upload_names: set[str] = set()
    for column in ("cover_url", "summary"):
        upload_names |= _extract_upload_names(before_row[column])

    execute("DELETE FROM resources WHERE id = ?", (resource_id,))
    audit("ADMIN", admin["id"], "RESOURCE_DELETE", "Resource", str(resource_id), dict(before_row), None)
    _delete_upload_files(upload_names)
    return jsonify({"ok": True})
- def _download_cache_glob_prefix(*, resource_id: int, owner: str, repo: str) -> str:
- safe_owner = re.sub(r"[^a-zA-Z0-9._-]+", "_", (owner or "").strip())[:50] or "owner"
- safe_repo = re.sub(r"[^a-zA-Z0-9._-]+", "_", (repo or "").strip())[:50] or "repo"
- rid = int(resource_id or 0)
- return f"res{rid}__{safe_owner}__{safe_repo}__"
def _admin_cache_entry_from_path(p: Path) -> dict[str, Any]:
    """Describe one cached archive file for the admin API.

    Combines ``p.stat()`` (size, mtime) with the metadata returned by
    ``_read_download_cache_meta``. When the file cannot be stat'ed, the size,
    mtime and TTL fields are ``None`` and ``ready`` is ``False``.
    """
    try:
        info = p.stat()
    except Exception:
        info = None
    meta = _read_download_cache_meta(p)

    # Remaining lifetime is only reported when a positive TTL is configured.
    ttl_remaining = None
    if info is not None and _download_cache_ttl_seconds > 0:
        age = time.time() - float(info.st_mtime)
        ttl_remaining = max(0, int(_download_cache_ttl_seconds - age))

    commit = ref = None
    if isinstance(meta, dict):
        commit = (meta.get("commit") or "").strip() or None
        ref = (meta.get("ref") or "").strip() or None

    if info is None:
        size, mtime, ready = None, None, False
    else:
        size, mtime, ready = int(info.st_size), int(info.st_mtime), bool(info.st_size > 0)

    return {
        "fileName": p.name,
        "path": str(p),
        "bytes": size,
        "mtime": mtime,
        "ttlRemainingSeconds": ttl_remaining,
        "ref": ref,
        "commit": commit,
        "meta": meta,
        "ready": ready,
    }
def _admin_cache_building_for(*, resource_id: int, owner: str, repo: str) -> list[dict[str, Any]]:
    """Return up to 20 download jobs of one resource, most recently updated first.

    Jobs are read from ``_download_jobs`` under ``_download_lock``; only
    string keys starting with ``res<id>:<owner>/<repo>@`` whose value is a
    dict are included.
    """
    key_prefix = f"res{int(resource_id or 0)}:{owner}/{repo}@"
    copied_fields = ("state", "updatedAt", "error", "ref", "commit", "cacheKey")
    with _download_lock:
        jobs: list[dict[str, Any]] = [
            {"key": key, **{name: job.get(name) for name in copied_fields}}
            for key, job in _download_jobs.items()
            if isinstance(key, str) and key.startswith(key_prefix) and isinstance(job, dict)
        ]
    jobs.sort(key=lambda entry: float(entry.get("updatedAt") or 0), reverse=True)
    return jobs[:20]
@app.get("/admin/resources/<int:resource_id>/download-cache/summary")
def api_admin_resource_download_cache_summary(resource_id: int) -> Response:
    """Summarise the download cache of a resource.

    Reports how many ``*.zip`` cache files match the resource's prefix, the
    entry for the newest non-empty one (by mtime), and the known download
    jobs for the resource.
    """
    require_admin()
    row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    owner = row["repo_owner"]
    repo = row["repo_name"]
    default_ref = row["default_ref"]

    cache_dir = _download_cache_dir()
    prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo)
    files = list(cache_dir.glob(prefix + "*.zip"))

    newest = None
    newest_mtime = -1.0
    for candidate in files:
        try:
            info = candidate.stat()
        except Exception:
            # Files that cannot be stat'ed are ignored.
            continue
        # Empty files are not usable archives and never count as "latest".
        if info.st_size > 0 and float(info.st_mtime) > newest_mtime:
            newest, newest_mtime = candidate, float(info.st_mtime)

    latest_entry = None if newest is None else _admin_cache_entry_from_path(newest)
    building = _admin_cache_building_for(resource_id=resource_id, owner=owner, repo=repo)
    summary = {
        "ok": True,
        "resourceId": int(resource_id),
        "owner": owner,
        "repo": repo,
        "defaultRef": default_ref,
        "count": len(files),
        "latest": latest_entry,
        "jobs": building,
    }
    return jsonify(summary)
@app.get("/admin/resources/<int:resource_id>/download-cache/list")
def api_admin_resource_download_cache_list(resource_id: int) -> Response:
    """List cached archives of a resource, newest first.

    ``items`` holds at most 50 entries; ``total`` is the number of matching
    cache files.
    """
    require_admin()
    row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    owner = row["repo_owner"]
    repo = row["repo_name"]
    cache_dir = _download_cache_dir()
    prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo)
    entries: list[dict[str, Any]] = [
        _admin_cache_entry_from_path(path) for path in cache_dir.glob(prefix + "*.zip")
    ]
    # Entries without an mtime sort as 0, i.e. last.
    entries.sort(key=lambda entry: float(entry.get("mtime") or 0), reverse=True)
    return jsonify({"ok": True, "items": entries[:50], "total": len(entries)})
@app.get("/admin/resources/<int:resource_id>/download-cache/status")
def api_admin_resource_download_cache_status(resource_id: int) -> Response:
    """Report whether the cached archive for a ref is ready.

    The ``ref`` query parameter defaults to the resource's ``default_ref``.
    The cache key is the resolved commit when resolution succeeds, otherwise
    the ref itself. When the archive is not ready, the state and error of the
    matching entry in ``_download_jobs`` are returned ("building" if no state
    is recorded).
    """
    require_admin()
    row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    owner = row["repo_owner"]
    repo = row["repo_name"]
    ref = (request.args.get("ref") or "").strip() or row["default_ref"]

    resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref)
    commit = resolved.get("commit") if resolved.get("ok") else None
    resolved_kind = resolved.get("kind") or "unknown"
    cache_key = commit or ref

    cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key)
    if _download_cache_ready(cache_path):
        return jsonify({"ok": True, "ready": True, "state": "ready", "ref": ref, "commit": commit, "cacheKey": cache_key})

    job_key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}"
    with _download_lock:
        job = _download_jobs.get(job_key) or {}
    pending = {
        "ok": True,
        "ready": False,
        "state": job.get("state") or "building",
        "error": job.get("error"),
        "ref": ref,
        "commit": commit,
        "cacheKey": cache_key,
        "refKind": resolved_kind,
    }
    return jsonify(pending)
@app.post("/admin/resources/<int:resource_id>/download-cache/refresh")
def api_admin_resource_download_cache_refresh(resource_id: int) -> Response:
    """Force a rebuild of the cached archive for a ref and audit the request.

    The ref comes from the JSON body (``ref``) and defaults to the resource's
    ``default_ref``. The status returned by ``_ensure_download_ready`` (called
    with ``force=True``) is echoed back.
    """
    admin = require_admin()
    row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    payload = request.get_json(silent=True) or {}
    owner = row["repo_owner"]
    repo = row["repo_name"]
    ref = (payload.get("ref") or "").strip() or row["default_ref"]

    resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref)
    commit = resolved.get("commit") if resolved.get("ok") else None
    resolved_kind = resolved.get("kind") or "unknown"

    outcome = _ensure_download_ready(
        resource_id=resource_id,
        owner=owner,
        repo=repo,
        ref=ref,
        commit=commit,
        resolved_kind=resolved_kind,
        force=True,
    )
    audit("ADMIN", admin["id"], "DOWNLOAD_CACHE_REFRESH", "Resource", str(resource_id), None, {"ref": ref, "commit": commit})

    ready = bool(outcome.get("ready"))
    body = {
        "ok": True,
        "ready": ready,
        "state": outcome.get("state") or ("ready" if ready else "building"),
        "error": outcome.get("error"),
        "ref": ref,
        "commit": commit,
        "cacheKey": outcome.get("cacheKey") or (commit or ref),
    }
    return jsonify(body)
@app.delete("/admin/resources/<int:resource_id>/download-cache")
def api_admin_resource_download_cache_clear(resource_id: int) -> Response:
    """Remove cached archives of a resource and audit the removal.

    With ``commit=<sha>`` (and ``all`` not set) only archives whose metadata
    records that commit are removed, compared case-insensitively. With
    ``all`` set to ``1``/``true``/``True``, or with neither parameter, every
    matching archive is removed. Deletion is best-effort: files that cannot
    be unlinked are skipped and not counted in ``removed``.
    """
    admin = require_admin()
    row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    owner = row["repo_owner"]
    repo = row["repo_name"]
    commit = (request.args.get("commit") or "").strip() or None
    clear_all = (request.args.get("all") or "").strip() in {"1", "true", "True"}

    cache_dir = _download_cache_dir()
    prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo)
    only_commit = bool(commit) and not clear_all

    removed = 0
    for archive in list(cache_dir.glob(prefix + "*.zip")):
        if only_commit:
            meta = _read_download_cache_meta(archive) or {}
            if (meta.get("commit") or "").strip().lower() != commit.lower():
                continue
        try:
            archive.unlink()
            removed += 1
        except Exception:
            pass
        # The metadata sidecar is removed even when the archive unlink failed.
        try:
            meta_path = _download_cache_meta_path(archive)
            if meta_path.exists():
                meta_path.unlink()
        except Exception:
            pass

    audit("ADMIN", admin["id"], "DOWNLOAD_CACHE_CLEAR", "Resource", str(resource_id), None, {"commit": commit, "all": clear_all, "removed": removed})
    return jsonify({"ok": True, "removed": removed})
@app.get("/admin/resources/<int:resource_id>/download-cache/file")
def api_admin_resource_download_cache_file(resource_id: int) -> Response:
    """Send the cached zip archive of one commit as an attachment.

    Requires a ``commit`` query parameter accepted by ``_looks_like_commit``
    (400 ``commit_required`` otherwise). Aborts with 404 when the resource or
    the cache file does not exist. The download name is
    ``<owner>-<repo>-<first 12 chars of commit>.zip`` with ``/`` replaced by
    ``-``.
    """
    _ = require_admin()
    row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,))
    if row is None:
        abort(404)
    owner, repo = row["repo_owner"], row["repo_name"]
    commit = (request.args.get("commit") or "").strip()
    if not commit or not _looks_like_commit(commit):
        return jsonify({"error": "commit_required"}), 400
    cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=commit.lower())
    filename = f"{owner}-{repo}-{commit[:12]}.zip".replace("/", "-")
    try:
        # Opening directly (instead of exists() + open()) also yields a 404
        # when the file is removed between the check and the open.
        f = open(cache_path, "rb")
    except FileNotFoundError:
        abort(404)
    try:
        resp = send_file(
            f,
            mimetype="application/zip",
            as_attachment=True,
            download_name=filename,
            conditional=True,
            max_age=0,
        )
        # The response owns the handle from here on and closes it when done.
        resp.call_on_close(f.close)
    except Exception:
        # Do not leak the descriptor when the response could not be built.
        f.close()
        raise
    resp.direct_passthrough = False
    return resp
@app.get("/admin/orders")
def api_admin_orders() -> Response:
    """Return one page of orders, newest first, with the buyer's phone.

    Query parameters: ``q`` (substring matched against order id or user
    phone), ``status`` (PENDING/PAID/CLOSED/FAILED; anything else is
    ignored), ``page`` (at least 1) and ``pageSize`` (clamped to 1..100).
    """
    require_admin()
    args = request.args
    keyword = (args.get("q") or "").strip()
    status = (args.get("status") or "").strip().upper()
    page = max(1, parse_int(args.get("page"), 1))
    page_size = min(100, max(1, parse_int(args.get("pageSize"), 20)))

    # Only fixed SQL fragments are concatenated; user input goes through params.
    clauses: list[str] = []
    params: list[Any] = []
    if keyword:
        pattern = f"%{keyword}%"
        clauses.append("(o.id LIKE ? OR u.phone LIKE ?)")
        params += [pattern, pattern]
    if status in {"PENDING", "PAID", "CLOSED", "FAILED"}:
        clauses.append("o.status = ?")
        params.append(status)
    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    total_row = fetch_one(
        f"""
        SELECT COUNT(1) AS cnt
        FROM orders o
        JOIN users u ON u.id = o.user_id
        {where_sql}
        """,
        tuple(params),
    )
    total = 0 if total_row is None else int(total_row["cnt"])

    offset = (page - 1) * page_size
    rows = fetch_all(
        f"""
        SELECT o.*, u.phone as user_phone
        FROM orders o
        JOIN users u ON u.id = o.user_id
        {where_sql}
        ORDER BY o.created_at DESC
        LIMIT ? OFFSET ?
        """,
        (*params, page_size, offset),
    )
    items = [
        {
            "id": row["id"],
            "status": row["status"],
            "amountCents": row["amount_cents"],
            "userId": row["user_id"],
            "userPhone": row["user_phone"],
            "createdAt": row["created_at"],
            "paidAt": row["paid_at"],
            "planSnapshot": json.loads(row["plan_snapshot_json"]),
        }
        for row in rows
    ]
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.get("/admin/orders/<order_id>")
def api_admin_get_order(order_id: str) -> Response:
    """Return one order with the buyer's phone and the decoded plan snapshot.

    Aborts with 404 when no order has the given id.
    """
    require_admin()
    row = fetch_one(
        """
        SELECT o.*, u.phone as user_phone
        FROM orders o
        JOIN users u ON u.id = o.user_id
        WHERE o.id = ?
        """,
        (order_id,),
    )
    if row is None:
        abort(404)

    # JSON key -> column name of the joined row.
    column_for = {
        "id": "id",
        "status": "status",
        "amountCents": "amount_cents",
        "userId": "user_id",
        "userPhone": "user_phone",
        "planId": "plan_id",
        "payChannel": "pay_channel",
        "payTradeNo": "pay_trade_no",
        "createdAt": "created_at",
        "paidAt": "paid_at",
    }
    body = {key: row[column] for key, column in column_for.items()}
    body["planSnapshot"] = json.loads(row["plan_snapshot_json"])
    return jsonify(body)
@app.post("/admin/orders")
def api_admin_create_order() -> Response:
    """Create an order for a user on behalf of an admin.

    The user is identified by ``userId`` or, when that is not positive, by
    ``userPhone``; ``planId`` must reference an enabled plan. The plan's
    name, duration and price are stored as a snapshot. An order created as
    ``PAID`` gets ``paid_at`` set and extends the user's VIP by the plan
    duration. The new row is audited.
    """
    admin = require_admin()
    payload = request.get_json(silent=True) or {}
    user_id = parse_int(payload.get("userId"), 0)
    user_phone = (payload.get("userPhone") or "").strip()
    plan_id = parse_int(payload.get("planId"), 0)
    status = (payload.get("status") or "PENDING").strip().upper()

    if status not in {"PENDING", "PAID", "CLOSED", "FAILED"}:
        return jsonify({"error": "invalid_status"}), 400
    if user_id <= 0 and not user_phone:
        return jsonify({"error": "user_required"}), 400
    if plan_id <= 0:
        return jsonify({"error": "plan_required"}), 400

    # A positive id takes precedence over the phone number.
    if user_id > 0:
        lookup_sql, lookup_value = "SELECT * FROM users WHERE id = ?", user_id
    else:
        lookup_sql, lookup_value = "SELECT * FROM users WHERE phone = ?", user_phone
    user_row = fetch_one(lookup_sql, (lookup_value,))
    if user_row is None:
        return jsonify({"error": "user_not_found"}), 404

    plan = fetch_one("SELECT * FROM plans WHERE id = ? AND enabled = 1", (plan_id,))
    if plan is None:
        return jsonify({"error": "plan_not_found"}), 404

    is_paid = status == "PAID"
    order_id = uuid.uuid4().hex
    snapshot = {
        "name": plan["name"],
        "durationDays": plan["duration_days"],
        "priceCents": plan["price_cents"],
    }
    created_at = isoformat(utcnow())
    paid_at = isoformat(utcnow()) if is_paid else None
    order_values = (
        order_id,
        user_row["id"],
        plan["id"],
        plan["price_cents"],
        status,
        created_at,
        paid_at,
        json.dumps(snapshot, ensure_ascii=False),
    )
    execute(
        """
        INSERT INTO orders (id, user_id, plan_id, amount_cents, status, created_at, paid_at, plan_snapshot_json)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        order_values,
    )
    if is_paid:
        extend_vip(int(user_row["id"]), int(snapshot["durationDays"]))

    after_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
    after_snapshot = None if after_row is None else dict(after_row)
    audit("ADMIN", admin["id"], "ORDER_CREATE", "Order", order_id, None, after_snapshot)
    return jsonify({"ok": True, "id": order_id})
@app.put("/admin/orders/<order_id>")
def api_admin_update_order(order_id: str) -> Response:
    """Change the status of an order, enforcing the allowed transitions.

    PAID and CLOSED orders cannot move to another status. PENDING may move
    to PAID, CLOSED or FAILED; FAILED may move to PAID or CLOSED; re-sending
    the current status is accepted. Moving into PAID stamps ``paid_at`` and
    extends the user's VIP by the snapshot's ``durationDays``. Every other
    target status clears ``paid_at``. The change is audited.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
    if before_row is None:
        abort(404)
    payload = request.get_json(silent=True) or {}
    status = (payload.get("status") or "").strip().upper()
    if status not in {"PENDING", "PAID", "CLOSED", "FAILED"}:
        return jsonify({"error": "invalid_status"}), 400

    before_status = before_row["status"]
    # Terminal states are reported with their own error codes.
    if before_status == "PAID" and status != "PAID":
        return jsonify({"error": "cannot_change_paid_order"}), 409
    if before_status == "CLOSED" and status != "CLOSED":
        return jsonify({"error": "cannot_change_closed_order"}), 409

    next_states = {
        "PENDING": {"PAID", "CLOSED", "FAILED"},
        "FAILED": {"PAID", "CLOSED"},
    }
    if status != before_status and status not in next_states.get(before_status, ()):
        return jsonify({"error": "invalid_transition"}), 409

    if status != "PAID":
        paid_at = None
    elif before_status == "PAID":
        # Already paid: keep the original timestamp and do not extend VIP again.
        paid_at = before_row["paid_at"]
    else:
        paid_at = isoformat(utcnow())
        snapshot = json.loads(before_row["plan_snapshot_json"])
        extend_vip(int(before_row["user_id"]), int(snapshot.get("durationDays") or 0))

    execute("UPDATE orders SET status = ?, paid_at = ? WHERE id = ?", (status, paid_at, order_id))
    after_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
    after_snapshot = None if after_row is None else dict(after_row)
    audit("ADMIN", admin["id"], "ORDER_UPDATE", "Order", order_id, dict(before_row), after_snapshot)
    return jsonify({"ok": True})
@app.delete("/admin/orders/<order_id>")
def api_admin_delete_order(order_id: str) -> Response:
    """Delete an order that is not PAID and audit the deletion.

    Aborts with 404 for an unknown id; answers 409
    ``cannot_delete_paid_order`` for a PAID order.
    """
    admin = require_admin()
    existing = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
    if existing is None:
        abort(404)
    if existing["status"] == "PAID":
        return jsonify({"error": "cannot_delete_paid_order"}), 409
    snapshot = dict(existing)
    execute("DELETE FROM orders WHERE id = ?", (order_id,))
    audit("ADMIN", admin["id"], "ORDER_DELETE", "Order", order_id, snapshot, None)
    return jsonify({"ok": True})
@app.get("/admin/users")
def api_admin_users() -> Response:
    """Return one page of users, newest first, with their VIP state.

    Query parameters: ``q`` (phone substring), ``status`` (ACTIVE/DISABLED),
    ``vip`` (VIP/ACTIVE for users whose VIP has not expired;
    NONVIP/NOVIP/INACTIVE for the rest), ``page`` and ``pageSize`` (clamped
    to 1..100). ``vipRemainingDays`` is rounded up to whole days and is
    ``None`` for users without active VIP.
    """
    require_admin()
    args = request.args
    keyword = (args.get("q") or "").strip()
    status = (args.get("status") or "").strip().upper()
    vip = (args.get("vip") or "").strip().upper()
    page = max(1, parse_int(args.get("page"), 1))
    page_size = min(100, max(1, parse_int(args.get("pageSize"), 20)))
    now_dt = utcnow()
    now_iso = isoformat(now_dt)

    clauses: list[str] = []
    params: list[Any] = []
    if keyword:
        clauses.append("(phone LIKE ?)")
        params.append(f"%{keyword}%")
    if status in {"ACTIVE", "DISABLED"}:
        clauses.append("status = ?")
        params.append(status)
    if vip in {"VIP", "ACTIVE"}:
        clauses.append("(vip_expire_at IS NOT NULL AND vip_expire_at > ?)")
        params.append(now_iso)
    elif vip in {"NONVIP", "NOVIP", "INACTIVE"}:
        clauses.append("(vip_expire_at IS NULL OR vip_expire_at <= ?)")
        params.append(now_iso)
    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM users {where_sql}", tuple(params))
    total = 0 if total_row is None else int(total_row["cnt"])
    offset = (page - 1) * page_size
    rows = fetch_all(
        f"""
        SELECT *
        FROM users
        {where_sql}
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
        """,
        (*params, page_size, offset),
    )

    items = []
    for row in rows:
        raw_expire = row["vip_expire_at"]
        expire_dt = parse_datetime(raw_expire) if raw_expire else None
        vip_active = bool(expire_dt is not None and expire_dt > now_dt)
        remaining_days = None
        if vip_active and expire_dt is not None:
            seconds_left = (expire_dt - now_dt).total_seconds()
            # Ceiling division: any started day counts as a full remaining day.
            remaining_days = max(0, int((seconds_left + 86399) // 86400))
        items.append(
            {
                "id": row["id"],
                "phone": row["phone"],
                "status": row["status"],
                "vipActive": vip_active,
                "vipRemainingDays": remaining_days,
                "vipExpireAt": raw_expire,
                "createdAt": row["created_at"],
            }
        )
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.get("/admin/download-logs")
def api_admin_download_logs() -> Response:
    """Return one page of download log entries, newest first.

    Query parameters: ``q`` (substring of user phone, resource title
    snapshot, ref snapshot or IP), ``type`` (FREE/VIP, matched against the
    type snapshot), ``state`` (DELETED/OFFLINE/ONLINE, derived from the
    current resource row), ``page`` and ``pageSize`` (clamped to 1..100).
    """
    require_admin()
    args = request.args
    keyword = (args.get("q") or "").strip()
    typ = (args.get("type") or "").strip().upper()
    state = (args.get("state") or "").strip().upper()
    page = max(1, parse_int(args.get("page"), 1))
    page_size = min(100, max(1, parse_int(args.get("pageSize"), 20)))

    clauses: list[str] = []
    params: list[Any] = []
    if keyword:
        pattern = f"%{keyword}%"
        clauses.append("(u.phone LIKE ? OR dl.resource_title_snapshot LIKE ? OR dl.ref_snapshot LIKE ? OR dl.ip LIKE ?)")
        params += [pattern] * 4
    if typ in {"FREE", "VIP"}:
        clauses.append("dl.resource_type_snapshot = ?")
        params.append(typ)
    # The resource is LEFT JOINed, so a missing row means it was deleted.
    state_clause = {
        "DELETED": "r.id IS NULL",
        "OFFLINE": "(r.id IS NOT NULL AND r.status != 'ONLINE')",
        "ONLINE": "r.status = 'ONLINE'",
    }.get(state)
    if state_clause is not None:
        clauses.append(state_clause)
    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    total_row = fetch_one(
        f"""
        SELECT COUNT(1) AS cnt
        FROM download_logs dl
        JOIN users u ON u.id = dl.user_id
        LEFT JOIN resources r ON r.id = dl.resource_id
        {where_sql}
        """,
        tuple(params),
    )
    total = 0 if total_row is None else int(total_row["cnt"])
    offset = (page - 1) * page_size
    rows = fetch_all(
        f"""
        SELECT
            dl.id,
            dl.user_id,
            u.phone AS user_phone,
            dl.resource_id,
            dl.resource_title_snapshot,
            dl.resource_type_snapshot,
            dl.ref_snapshot,
            dl.downloaded_at,
            dl.ip,
            dl.user_agent,
            r.id AS r_id,
            r.status AS r_status,
            r.type AS r_type
        FROM download_logs dl
        JOIN users u ON u.id = dl.user_id
        LEFT JOIN resources r ON r.id = dl.resource_id
        {where_sql}
        ORDER BY dl.downloaded_at DESC, dl.id DESC
        LIMIT ? OFFSET ?
        """,
        (*params, page_size, offset),
    )

    items = []
    for row in rows:
        resource_exists = row["r_id"] is not None
        if not resource_exists:
            resource_state = "DELETED"
        elif row["r_status"] == "ONLINE":
            resource_state = "ONLINE"
        else:
            resource_state = "OFFLINE"
        items.append(
            {
                "id": row["id"],
                "userId": row["user_id"],
                "userPhone": row["user_phone"],
                "resourceId": row["resource_id"],
                "resourceTitle": row["resource_title_snapshot"],
                "resourceType": row["resource_type_snapshot"],
                "currentResourceType": row["r_type"] if resource_exists else None,
                "ref": row["ref_snapshot"],
                "downloadedAt": row["downloaded_at"],
                "resourceState": resource_state,
                "ip": row["ip"],
                "userAgent": row["user_agent"],
            }
        )
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.put("/admin/users/<int:user_id>")
def api_admin_update_user(user_id: int) -> Response:
    """Set a user's status to ACTIVE or DISABLED and audit the change.

    A missing or empty ``status`` in the body keeps the stored status.
    Aborts with 404 for an unknown user; answers 400 ``invalid_status`` for
    any other value.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))
    if before_row is None:
        abort(404)
    payload = request.get_json(silent=True) or {}
    requested = payload.get("status") or before_row["status"]
    status = requested.strip().upper()
    if status not in {"ACTIVE", "DISABLED"}:
        return jsonify({"error": "invalid_status"}), 400

    execute("UPDATE users SET status = ? WHERE id = ?", (status, user_id))
    after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))
    after_snapshot = None if after_row is None else dict(after_row)
    audit("ADMIN", admin["id"], "USER_UPDATE", "User", str(user_id), dict(before_row), after_snapshot)
    return jsonify({"ok": True})
@app.post("/admin/users/<int:user_id>/password-reset")
def api_admin_reset_user_password(user_id: int) -> Response:
    """Replace a user's password hash and audit the reset.

    The JSON body must carry a string ``password`` of at least 6 characters;
    otherwise 400 ``password_required`` / ``password_too_short`` is returned.
    Aborts with 404 for an unknown user. The audit record keeps the user row
    snapshots but masks ``password_hash`` so that neither the old nor the new
    hash is copied into the audit log.
    """
    admin = require_admin()
    before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))
    if before_row is None:
        abort(404)
    payload = request.get_json(silent=True) or {}
    password = payload.get("password") or ""
    # A non-string value (number, list, ...) is not a usable password and
    # would otherwise fail inside len() or the hashing call.
    if not password or not isinstance(password, str):
        return jsonify({"error": "password_required"}), 400
    if len(password) < 6:
        return jsonify({"error": "password_too_short"}), 400

    def _audit_snapshot(row: Any) -> dict[str, Any] | None:
        """Copy a user row for auditing with the password hash masked."""
        if row is None:
            return None
        snapshot = dict(row)
        if "password_hash" in snapshot:
            snapshot["password_hash"] = "***"
        return snapshot

    execute("UPDATE users SET password_hash = ? WHERE id = ?", (generate_password_hash(password), user_id))
    after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))
    audit(
        "ADMIN",
        admin["id"],
        "USER_PASSWORD_RESET",
        "User",
        str(user_id),
        _audit_snapshot(before_row),
        _audit_snapshot(after_row),
    )
    return jsonify({"ok": True})
@app.post("/admin/users/<int:user_id>/vip-adjust")
def api_admin_vip_adjust(user_id: int) -> Response:
    """Adjust a user's VIP period by ``addDays`` and notify the user.

    ``addDays`` is parsed with ``parse_int``; a value of 0 is rejected with
    400 ``addDays_required`` before the user is looked up. After
    ``extend_vip`` runs, the user receives a message showing the expiry
    before and after the change, and the adjustment is audited.
    """
    admin = require_admin()
    payload = request.get_json(silent=True) or {}
    duration_days = parse_int(payload.get("addDays"), 0)
    if duration_days == 0:
        return jsonify({"error": "addDays_required"}), 400
    before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))
    if before_row is None:
        abort(404)

    extend_vip(user_id, duration_days)
    after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))

    before_expire = before_row["vip_expire_at"] or ""
    after_expire = "" if after_row is None else after_row["vip_expire_at"]
    # Signed day count, e.g. "+30" or "-7".
    delta = f"{duration_days:+d} 天"
    message_lines = [
        f"管理员已调整你的会员天数:{delta}",
        f"调整前到期:{before_expire or '无'}",
        f"调整后到期:{after_expire or '无'}",
    ]
    create_user_message(
        user_id,
        "会员期限变更",
        "\n".join(message_lines),
        sender_type="ADMIN",
        sender_id=int(admin["id"]),
    )

    after_snapshot = None if after_row is None else dict(after_row)
    audit("ADMIN", admin["id"], "VIP_ADJUST", "User", str(user_id), dict(before_row), after_snapshot)
    return jsonify({"ok": True})
@app.get("/admin/messages")
def api_admin_messages() -> Response:
    """Return one page of user messages, newest first.

    Query parameters: ``user_id`` (positive id), ``q`` (substring of user
    phone, title or content), ``senderType`` (SYSTEM/ADMIN), ``read``
    (1/true/yes/on/read or 0/false/no/off/unread), ``page`` and ``pageSize``
    (clamped to 1..50).
    """
    require_admin()
    args = request.args
    keyword = (args.get("q") or "").strip()
    sender_type = (args.get("senderType") or "").strip().upper()
    read_flag = (args.get("read") or "").strip().lower()
    user_id = parse_int(args.get("user_id"), 0)
    page = max(parse_int(args.get("page"), 1), 1)
    page_size = min(max(parse_int(args.get("pageSize"), 20), 1), 50)
    offset = (page - 1) * page_size

    clauses: list[str] = []
    params: list[Any] = []
    if user_id > 0:
        clauses.append("m.user_id = ?")
        params.append(user_id)
    if keyword:
        pattern = f"%{keyword}%"
        clauses.append("(u.phone LIKE ? OR m.title LIKE ? OR m.content LIKE ?)")
        params += [pattern] * 3
    if sender_type in {"SYSTEM", "ADMIN"}:
        clauses.append("m.sender_type = ?")
        params.append(sender_type)
    if read_flag in {"1", "true", "yes", "on", "read"}:
        clauses.append("m.read_at IS NOT NULL")
    elif read_flag in {"0", "false", "no", "off", "unread"}:
        clauses.append("m.read_at IS NULL")
    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    total_row = fetch_one(
        f"""
        SELECT COUNT(1) AS cnt
        FROM user_messages m
        JOIN users u ON u.id = m.user_id
        {where_sql}
        """,
        tuple(params),
    )
    total = int(0 if total_row is None else total_row["cnt"])
    rows = fetch_all(
        f"""
        SELECT
            m.id, m.user_id, m.title, m.content, m.created_at, m.read_at, m.sender_type, m.sender_id,
            u.phone AS user_phone
        FROM user_messages m
        JOIN users u ON u.id = m.user_id
        {where_sql}
        ORDER BY m.created_at DESC, m.id DESC
        LIMIT ? OFFSET ?
        """,
        (*params, page_size, offset),
    )
    items = [
        {
            "id": row["id"],
            "userId": row["user_id"],
            "userPhone": row["user_phone"],
            "title": row["title"],
            "content": row["content"],
            "createdAt": row["created_at"],
            "readAt": row["read_at"],
            "read": bool(row["read_at"]),
            # Rows without a sender type are reported as SYSTEM.
            "senderType": row["sender_type"] or "SYSTEM",
            "senderId": row["sender_id"],
        }
        for row in rows
    ]
    return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size})
@app.post("/admin/messages/send")
def api_admin_message_send() -> Response:
    """Send a single message from the current admin to one user.

    JSON body:
        userId: Target user id. Used when it parses to a positive integer.
        phone: Used to look the user up when ``userId`` is missing or not
            positive.
        title: Required; surrounding whitespace is trimmed.
        content: Required; surrounding whitespace is trimmed.

    Returns:
        ``{"ok": True, "id": <message id>}`` on success.
        400 ``title_and_content_required`` when title or content is empty.
        400 ``user_required`` when neither a usable userId nor a phone is given.
        404 ``user_not_found`` when the id or phone matches no row in ``users``.
    """
    admin = require_admin()

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body would otherwise crash on ``.get``.
        payload = {}

    user_id = parse_int(payload.get("userId"), 0)
    # ``str(...)`` tolerates non-string JSON values (e.g. a phone sent as a
    # number) instead of failing on ``.strip()``.
    phone = str(payload.get("phone") or "").strip()
    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or "").strip()

    if not title or not content:
        return jsonify({"error": "title_and_content_required"}), 400
    if user_id <= 0 and not phone:
        return jsonify({"error": "user_required"}), 400

    # Resolve the recipient against the users table on both paths so a
    # message is never created for an id that does not exist.
    if user_id > 0:
        row = fetch_one("SELECT id FROM users WHERE id = ?", (user_id,))
    else:
        row = fetch_one("SELECT id FROM users WHERE phone = ?", (phone,))
    if row is None:
        return jsonify({"error": "user_not_found"}), 404
    user_id = int(row["id"])

    msg_id = create_user_message(
        user_id,
        title,
        content,
        sender_type="ADMIN",
        sender_id=int(admin["id"]),
    )
    # The audit entry stores a truncated title and only the content length,
    # not the content itself.
    audit(
        "ADMIN",
        admin["id"],
        "MESSAGE_SEND",
        "User",
        str(user_id),
        None,
        {"title": title[:120], "contentLen": len(content)},
    )
    return jsonify({"ok": True, "id": msg_id})
@app.post("/admin/messages/broadcast")
def api_admin_message_broadcast() -> Response:
    """Send the same message from the current admin to a group of active users.

    JSON body:
        audience: ``ALL`` (default when missing or blank), ``VIP`` or
            ``NONVIP``; case-insensitive.
        title: Required; surrounding whitespace is trimmed.
        content: Required; surrounding whitespace is trimmed.

    Only users whose ``status`` is ``ACTIVE`` are selected. ``VIP`` keeps
    users whose ``vip_expire_at`` is later than the current timestamp;
    ``NONVIP`` keeps users with no expiry or one that is not later.

    Returns:
        ``{"ok": True, "count": <number of recipients>}`` on success.
        400 ``title_and_content_required`` when title or content is empty.
        400 ``invalid_audience`` when audience is not one of the three
        supported values.
    """
    admin = require_admin()

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body would otherwise crash on ``.get``.
        payload = {}

    # Blank or missing audience keeps the historical default of ALL.
    audience = str(payload.get("audience") or "").strip().upper() or "ALL"
    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or "").strip()

    if not title or not content:
        return jsonify({"error": "title_and_content_required"}), 400
    if audience not in {"ALL", "VIP", "NONVIP"}:
        # Reject unknown values rather than silently messaging every active
        # user because of a typo such as "VIPS".
        return jsonify({"error": "invalid_audience"}), 400

    now_iso = isoformat(utcnow()) or ""
    where = ["status = 'ACTIVE'"]
    params: list[Any] = []
    if audience == "VIP":
        where.append("vip_expire_at IS NOT NULL AND vip_expire_at > ?")
        params.append(now_iso)
    elif audience == "NONVIP":
        where.append("(vip_expire_at IS NULL OR vip_expire_at <= ?)")
        params.append(now_iso)
    where_sql = f"WHERE {' AND '.join(where)}"

    rows = fetch_all(f"SELECT id FROM users {where_sql}", tuple(params))
    user_ids = [int(r["id"]) for r in rows]

    sender_id = int(admin["id"])  # loop-invariant
    for uid in user_ids:
        create_user_message(uid, title, content, sender_type="ADMIN", sender_id=sender_id)

    # One audit entry for the whole broadcast, keyed by audience.
    audit(
        "ADMIN",
        admin["id"],
        "MESSAGE_BROADCAST",
        "Users",
        audience,
        None,
        {"title": title[:120], "contentLen": len(content), "count": len(user_ids)},
    )
    return jsonify({"ok": True, "count": len(user_ids)})
@app.delete("/admin/messages/<int:message_id>")
def api_admin_message_delete(message_id: int) -> Response:
    """Delete one row of ``user_messages`` by id.

    Aborts with 404 when no row has that id. Otherwise the row is deleted,
    its pre-delete contents are passed to ``audit`` under the
    ``MESSAGE_DELETE`` action, and JSON ``{"ok": True}`` is returned.
    """
    admin = require_admin()
    id_param = (message_id,)

    # Fetch the row first so its contents can be handed to the audit call.
    snapshot = fetch_one("SELECT * FROM user_messages WHERE id = ?", id_param)
    if snapshot is None:
        abort(404)

    execute("DELETE FROM user_messages WHERE id = ?", id_param)
    audit("ADMIN", admin["id"], "MESSAGE_DELETE", "UserMessage", str(message_id), dict(snapshot), None)

    result = {"ok": True}
    return jsonify(result)
|