import base64 import hashlib import json import os import re import shutil import threading import time import uuid from datetime import timedelta, timezone from decimal import Decimal from pathlib import Path from typing import Any, Iterable from urllib.parse import quote, quote_plus, urlencode, urlsplit, urlunsplit import requests from flask import Flask, Response, abort, has_request_context, jsonify, redirect, render_template, request, send_file, session, url_for from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.utils import secure_filename from .audit import audit from .auth import current_admin, current_user, extend_vip, is_vip_active, require_admin, require_user from .context import get_config from .core import isoformat, parse_datetime, parse_int, utcnow from .db import IntegrityError, db_status, execute, fetch_all, fetch_one, get_active_backend, switch_database from .gogs import ( GogsGitError, gogs_archive_get, gogs_branches, gogs_commits, gogs_contents, gogs_create_repo, gogs_delete_repo, gogs_git_archive_zip, gogs_git_archive_zip_commit, gogs_git_list_refs, gogs_git_delete_path, gogs_git_write_file, gogs_my_repos, gogs_repo_info, gogs_resolve_ref_commit, gogs_tags, gogs_user_repos, ) from .settings import delete_setting_value, get_setting_value, set_setting_value def register_routes(app: Flask) -> None: def create_user_message( user_id: int, title: str, content: str, *, sender_type: str = "SYSTEM", sender_id: int | None = None, ) -> int: cur = execute( """ INSERT INTO user_messages (user_id, title, content, created_at, sender_type, sender_id) VALUES (?, ?, ?, ?, ?, ?) """, ( user_id, (title or "").strip()[:120], (content or "").strip()[:4000], isoformat(utcnow()), (sender_type or "SYSTEM").strip().upper()[:16], sender_id, ), ) return int(cur.lastrowid) def _gogs_base_url_and_token() -> tuple[str, str]: config = get_config() base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url or "").strip().rstrip("/") token = get_setting_value("GOGS_TOKEN") if token is not None: token = token.strip() or None if token is None: token = (config.gogs_token or "").strip() or None return base_url, (token or "").strip() def _gogs_error_message(resp: requests.Response) -> str | None: try: data = resp.json() if resp is not None else None except Exception: data = None if isinstance(data, dict): msg = data.get("message") or data.get("error") or data.get("error_description") if msg: return str(msg)[:300] try: text = (resp.text or "").strip() except Exception: text = "" return text[:300] or None def _safe_upstream_url(resp: requests.Response) -> str | None: try: url = (getattr(resp, "url", None) or "").strip() except Exception: url = "" if not url: return None url = re.sub(r"([?&])token=[^&]+", r"\1token=***", url) url = re.sub(r"//[^/@]*@", "//***@", url) return url[:500] def _looks_like_html(text: str | None) -> bool: s = (text or "").lstrip().lower() return s.startswith(" str: s = (key or "").strip() if not s: return "" if "BEGIN " in s and "END " in s: return s body = re.sub(r"\s+", "", s) if not body: return "" header = "-----BEGIN PRIVATE KEY-----" if kind == "private" else "-----BEGIN PUBLIC KEY-----" footer = "-----END PRIVATE KEY-----" if kind == "private" else "-----END PUBLIC KEY-----" lines = [body[i : i + 64] for i in range(0, len(body), 64)] return "\n".join([header, *lines, footer, ""]) def _alipay_sign_content(params: dict[str, Any]) -> str: items: list[tuple[str, str]] = [] for k, v in (params or {}).items(): if v is None: continue sv = str(v) if sv == "": continue items.append((str(k), sv)) items.sort(key=lambda x: x[0]) return "&".join([f"{k}={v}" for k, v in items]) def _alipay_rsa2_sign(sign_content: str, private_key: str) -> str: try: from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 except Exception as e: raise RuntimeError("pycryptodome_required") from e key = RSA.import_key(_alipay_wrap_key(private_key, "private")) h = SHA256.new((sign_content or "").encode("utf-8")) sig = pkcs1_15.new(key).sign(h) return base64.b64encode(sig).decode("utf-8") def _alipay_rsa2_verify(sign_content: str, signature_b64: str, public_key: str) -> bool: try: from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 except Exception as e: raise RuntimeError("pycryptodome_required") from e key = RSA.import_key(_alipay_wrap_key(public_key, "public")) h = SHA256.new((sign_content or "").encode("utf-8")) try: pkcs1_15.new(key).verify(h, base64.b64decode(signature_b64 or "")) return True except Exception: return False def _parse_keywords(value: Any) -> list[str]: if isinstance(value, list): parts = [str(x).strip() for x in value] else: raw = str(value or "") parts = re.split(r"[,\n\r\t ]+", raw) items: list[str] = [] seen: set[str] = set() for p in parts: p = (p or "").strip() if not p: continue if len(p) > 32: p = p[:32] k = p.lower() if k in seen: continue seen.add(k) items.append(p) if len(items) >= 20: break return items def _slugify_repo_name(title: str) -> str: s = (title or "").strip().lower() s = re.sub(r"[^a-z0-9]+", "-", s) s = s.strip("-") if not s: s = f"resource-{uuid.uuid4().hex[:8]}" if len(s) > 50: s = s[:50].rstrip("-") return s def _uploads_dir() -> Path: project_root = Path(__file__).resolve().parent.parent d = project_root / "static" / "uploads" d.mkdir(parents=True, exist_ok=True) return d def _extract_upload_names(value: Any) -> set[str]: s = str(value or "").strip() if not s: return set() names: set[str] = set() for m in re.finditer(r"(?i)(?:/static/uploads/|/uploads/|/)([0-9a-f]{32}(?:\.[a-z0-9]+)?)", s): names.add(m.group(1)) if s.startswith("/static/uploads/") or s.startswith("static/uploads/") or s.startswith("uploads/"): name = os.path.basename(s) if re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", name or ""): names.add(name) return names def _delete_upload_files(names: set[str]) -> None: if not names: return base = _uploads_dir().resolve() for name in names: n = os.path.basename(str(name or "")) if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", n or ""): continue p = (base / n).resolve() if p.parent != base: continue try: p.unlink(missing_ok=True) except Exception: pass def _normalize_upload_prefix(prefix: Any) -> str: p = str(prefix or "").strip().replace("\\", "/") p = p.lstrip("/") if not p: p = "uploads/" if not p.endswith("/"): p = f"{p}/" return p def _get_upload_storage_mode() -> str: v = (get_setting_value("STORAGE_PROVIDER") or "").strip().upper() if v in {"LOCAL", "OSS", "AUTO"}: return v return "AUTO" def _get_oss_upload_config() -> dict[str, Any]: endpoint = str(get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/") bucket = str(get_setting_value("OSS_BUCKET") or "").strip() access_key_id = str(get_setting_value("OSS_ACCESS_KEY_ID") or "").strip() access_key_secret = str(get_setting_value("OSS_ACCESS_KEY_SECRET") or "").strip() upload_prefix = _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/") public_base_url = str(get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/") ok = bool(endpoint and bucket and access_key_id and access_key_secret) return { "ok": ok, "endpoint": endpoint, "bucket": bucket, "accessKeyId": access_key_id, "accessKeySecret": access_key_secret, "uploadPrefix": upload_prefix, "publicBaseUrl": public_base_url, } def _build_oss_public_url(*, public_base_url: str, endpoint: str, bucket: str, key: str) -> str: key = str(key or "").lstrip("/") if public_base_url: return f"{public_base_url.rstrip('/')}/{key}" try: parts = urlsplit(endpoint) scheme = parts.scheme or "https" host = parts.netloc if not host: return f"/{key}" host = host.split("@", 1)[-1] if host.startswith(f"{bucket}."): full_host = host else: full_host = f"{bucket}.{host}" return urlunsplit((scheme, full_host, f"/{key}", "", "")) except Exception: return f"/{key}" def _guess_upload_kind(ext: str) -> str: e = (ext or "").lower() if e in {".png", ".jpg", ".jpeg", ".gif", ".webp"}: return "image" if e in {".mp4", ".webm", ".mov", ".m4v"}: return "video" return "file" class _LocalUploadStorage: def save_upload(self, file_storage: Any, name: str) -> dict[str, Any]: out = _uploads_dir() / name file_storage.save(out) return {"name": name, "url": f"/static/uploads/{name}"} def delete_uploads(self, names: set[str]) -> None: _delete_upload_files(names) def list_items(self) -> list[dict[str, Any]]: base = _uploads_dir().resolve() all_items: list[dict[str, Any]] = [] for p in base.iterdir(): if not p.is_file(): continue name = p.name if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", name or ""): continue try: st = p.stat() except Exception: continue ext = p.suffix.lower() all_items.append( { "name": name, "url": f"/static/uploads/{name}", "bytes": int(getattr(st, "st_size", 0) or 0), "mtime": int(getattr(st, "st_mtime", 0) or 0), "ext": ext, "kind": _guess_upload_kind(ext), } ) return all_items class _OssUploadStorage: def __init__(self, cfg: dict[str, Any]): try: import oss2 # type: ignore except Exception: raise RuntimeError("oss_client_missing") endpoint = str(cfg.get("endpoint") or "").strip().rstrip("/") bucket = str(cfg.get("bucket") or "").strip() access_key_id = str(cfg.get("accessKeyId") or "").strip() access_key_secret = str(cfg.get("accessKeySecret") or "").strip() self._upload_prefix = str(cfg.get("uploadPrefix") or "uploads/") self._public_base_url = str(cfg.get("publicBaseUrl") or "").strip().rstrip("/") if not endpoint or not bucket or not access_key_id or not access_key_secret: raise RuntimeError("oss_not_configured") auth = oss2.Auth(access_key_id, access_key_secret) self._bucket_name = bucket self._endpoint = endpoint self._bucket = oss2.Bucket(auth, endpoint, bucket) self._oss2 = oss2 def _key_for_name(self, name: str) -> str: n = os.path.basename(str(name or "")) return f"{self._upload_prefix}{n}" def save_upload(self, file_storage: Any, name: str) -> dict[str, Any]: key = self._key_for_name(name) try: file_storage.stream.seek(0) except Exception: pass self._bucket.put_object(key, file_storage.stream) url = _build_oss_public_url(public_base_url=self._public_base_url, endpoint=self._endpoint, bucket=self._bucket_name, key=key) return {"name": os.path.basename(name), "url": url} def delete_uploads(self, names: set[str]) -> None: for name in names: n = os.path.basename(str(name or "")) if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", n or ""): continue key = self._key_for_name(n) try: self._bucket.delete_object(key) except Exception: pass def list_items(self) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] for obj in self._oss2.ObjectIterator(self._bucket, prefix=self._upload_prefix): key = str(getattr(obj, "key", "") or "") if not key.startswith(self._upload_prefix): continue name = key[len(self._upload_prefix) :] if not name or "/" in name: continue if not re.fullmatch(r"(?i)[0-9a-f]{32}(?:\.[a-z0-9]+)?", name or ""): continue ext = os.path.splitext(name)[1].lower() url = _build_oss_public_url(public_base_url=self._public_base_url, endpoint=self._endpoint, bucket=self._bucket_name, key=key) items.append( { "name": name, "url": url, "bytes": int(getattr(obj, "size", 0) or 0), "mtime": int(getattr(obj, "last_modified", 0) or 0), "ext": ext, "kind": _guess_upload_kind(ext), } ) return items def _get_upload_storage() -> Any: mode = _get_upload_storage_mode() oss_cfg = _get_oss_upload_config() if mode == "LOCAL": return _LocalUploadStorage() if mode == "OSS": return _OssUploadStorage(oss_cfg) if oss_cfg.get("ok"): return _OssUploadStorage(oss_cfg) return _LocalUploadStorage() def _guest_can_preview_repo_path(path: str) -> bool: p = (path or "").strip().replace("\\", "/").lstrip("/") if not p: return False base = os.path.basename(p).lower() if base in { ".env", ".env.local", ".env.development", ".env.production", ".env.test", "id_rsa", "id_dsa", "id_ed25519", "id_ecdsa", }: return False ext = os.path.splitext(base)[1].lower() if ext in {".key", ".pem", ".p12", ".pfx"}: return False if base.startswith(("readme", "license", "changelog")): return True return ext in {".md", ".txt", ".json", ".yml", ".yaml", ".toml", ".ini", ".conf"} @app.get("/") def page_index() -> str: return render_template("index.html") @app.get("/ui/resources") def page_resources() -> str: return render_template("resources.html") @app.get("/ui/resources/") def page_resource_detail(resource_id: int) -> str: return render_template("resource_detail.html", resource_id=resource_id) @app.get("/ui/login") def page_login() -> str: return render_template("login.html") @app.get("/ui/register") def page_register() -> str: return render_template("register.html") @app.get("/ui/me") def page_me() -> str: return render_template("me.html") @app.get("/ui/messages") def page_messages() -> str: return render_template("messages.html") @app.get("/ui/vip") def page_vip() -> str: return render_template("vip.html") @app.get("/ui/admin") def page_admin() -> Response: if current_admin() is None: return redirect(url_for("page_admin_login")) return render_template("admin.html") @app.get("/ui/admin/login") def page_admin_login() -> str: return render_template("admin_login.html") @app.get("/admin") def page_admin_shortcut() -> Response: return redirect(url_for("page_admin")) @app.get("/admin/login") def page_admin_login_shortcut() -> Response: return redirect(url_for("page_admin_login")) @app.post("/auth/register") def api_register() -> Response: payload = request.get_json(silent=True) or {} phone = (payload.get("phone") or "").strip() password = payload.get("password") or "" if not phone or not password: return jsonify({"error": "phone_and_password_required"}), 400 if len(password) < 6: return jsonify({"error": "password_too_short"}), 400 created_at = isoformat(utcnow()) try: cur = execute( "INSERT INTO users (phone, password_hash, status, created_at) VALUES (?, ?, 'ACTIVE', ?)", (phone, generate_password_hash(password), created_at), ) except IntegrityError: return jsonify({"error": "phone_exists"}), 409 session["user_id"] = cur.lastrowid return jsonify({"id": cur.lastrowid, "phone": phone, "vipExpireAt": None}) @app.post("/auth/login") def api_login() -> Response: payload = request.get_json(silent=True) or {} phone = (payload.get("phone") or "").strip() password = payload.get("password") or "" user = fetch_one("SELECT * FROM users WHERE phone = ?", (phone,)) if user is None or not check_password_hash(user["password_hash"], password): return jsonify({"error": "invalid_credentials"}), 401 if user["status"] != "ACTIVE": return jsonify({"error": "user_disabled"}), 403 session["user_id"] = user["id"] return jsonify({"id": user["id"], "phone": user["phone"], "vipExpireAt": user["vip_expire_at"]}) @app.post("/auth/logout") def api_logout() -> Response: session.pop("user_id", None) return jsonify({"ok": True}) @app.get("/me") def api_me() -> Response: user = current_user() if user is None: return jsonify({"user": None}) return jsonify( { "user": { "id": user["id"], "phone": user["phone"], "vipExpireAt": user["vip_expire_at"], "vipActive": is_vip_active(user), } } ) @app.get("/plans") def api_plans() -> Response: rows = fetch_all("SELECT * FROM plans WHERE enabled = 1 ORDER BY sort DESC, id DESC") return jsonify( [ { "id": row["id"], "name": row["name"], "durationDays": row["duration_days"], "priceCents": row["price_cents"], } for row in rows ] ) @app.get("/resources") def api_resources() -> Response: q = (request.args.get("q") or "").strip() resource_type = (request.args.get("type") or "").strip().upper() sort = (request.args.get("sort") or "latest").strip() page = max(parse_int(request.args.get("page"), 1), 1) page_size = min(max(parse_int(request.args.get("pageSize"), 12), 1), 50) where = ["status = 'ONLINE'"] params: list[Any] = [] if q: where.append("(title LIKE ? OR summary LIKE ?)") params.extend([f"%{q}%", f"%{q}%"]) if resource_type in {"FREE", "VIP"}: where.append("type = ?") params.append(resource_type) if sort == "hot": order_by = "view_count DESC, id DESC" else: order_by = "updated_at DESC, id DESC" where_sql = " AND ".join(where) total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM resources WHERE {where_sql}", params) total = int(total_row["cnt"] if total_row is not None else 0) offset = (page - 1) * page_size rows = fetch_all( f""" SELECT * FROM resources WHERE {where_sql} ORDER BY {order_by} LIMIT ? OFFSET ? """, params + [page_size, offset], ) items = [] for row in rows: try: tags = json.loads(row["tags_json"] or "[]") except Exception: tags = [] items.append( { "id": row["id"], "title": row["title"], "summary": row["summary"], "type": row["type"], "coverUrl": (str(row["cover_url"]).strip() if row["cover_url"] is not None else "") or "/static/images/resources/default.png", "tags": tags, "updatedAt": row["updated_at"], "viewCount": row["view_count"], "downloadCount": row["download_count"], "repo": { "owner": row["repo_owner"], "name": row["repo_name"], "defaultRef": row["default_ref"], }, } ) return jsonify({"items": items, "page": page, "pageSize": page_size, "total": total}) @app.get("/resources/") def api_resource_detail(resource_id: int) -> Response: row = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,)) if row is None: abort(404) execute("UPDATE resources SET view_count = view_count + 1 WHERE id = ?", (resource_id,)) try: tags = json.loads(row["tags_json"] or "[]") except Exception: tags = [] return jsonify( { "id": row["id"], "title": row["title"], "summary": row["summary"], "type": row["type"], "coverUrl": (str(row["cover_url"]).strip() if row["cover_url"] is not None else "") or "/static/images/resources/default.png", "tags": tags, "updatedAt": row["updated_at"], "viewCount": row["view_count"] + 1, "downloadCount": row["download_count"], "repo": { "owner": row["repo_owner"], "name": row["repo_name"], "htmlUrl": row["repo_html_url"], "defaultRef": row["default_ref"], "private": bool(row["repo_private"]), }, } ) @app.get("/resources//repo/refs") def api_repo_refs(resource_id: int) -> Response: row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,)) if row is None: abort(404) owner, repo = row["repo_owner"], row["repo_name"] branches_resp = gogs_branches(owner, repo) tags_resp = gogs_tags(owner, repo) if branches_resp.status_code < 400 and tags_resp.status_code < 400: try: branches = branches_resp.json() tags = tags_resp.json() except Exception: msg = _gogs_error_message(branches_resp) or _gogs_error_message(tags_resp) upstream_url = _safe_upstream_url(branches_resp) or _safe_upstream_url(tags_resp) return jsonify({"error": "gogs_invalid_response", "status": 200, "message": msg, "url": upstream_url}), 502 return jsonify({"branches": [{"name": b.get("name")} for b in (branches or [])], "tags": [{"name": t.get("name")} for t in (tags or [])]}) if branches_resp.status_code in {401, 403} or tags_resp.status_code in {401, 403}: msg = _gogs_error_message(branches_resp) or _gogs_error_message(tags_resp) upstream_url = _safe_upstream_url(branches_resp) or _safe_upstream_url(tags_resp) status = branches_resp.status_code if branches_resp.status_code >= 400 else tags_resp.status_code return jsonify({"error": "gogs_unauthorized", "status": status, "message": msg, "url": upstream_url}), 400 if branches_resp.status_code == 599 or tags_resp.status_code == 599: msg = _gogs_error_message(branches_resp) or _gogs_error_message(tags_resp) upstream_url = _safe_upstream_url(branches_resp) or _safe_upstream_url(tags_resp) return jsonify({"error": "gogs_unreachable", "status": 599, "message": msg, "url": upstream_url}), 502 try: return jsonify(gogs_git_list_refs(owner, repo)) except GogsGitError as e: resp, status = _git_error_to_response(e) return resp, status msg = _gogs_error_message(branches_resp) or _gogs_error_message(tags_resp) upstream_url = _safe_upstream_url(branches_resp) or _safe_upstream_url(tags_resp) status = branches_resp.status_code if branches_resp.status_code >= 400 else tags_resp.status_code return jsonify({"error": "gogs_failed", "status": status, "message": msg, "url": upstream_url}), 502 @app.get("/resources//repo/tree") def api_repo_tree(resource_id: int) -> Response: ref = (request.args.get("ref") or "").strip() path = (request.args.get("path") or "").strip() user = current_user() row = fetch_one( "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,), ) if row is None: abort(404) owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] if not ref: ref = default_ref resp = gogs_contents(owner, repo, path, ref) if resp.status_code == 404: return jsonify({"error": "path_not_found"}), 404 if resp.status_code >= 400: return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 try: data = resp.json() except Exception: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 if not isinstance(data, list): return jsonify({"error": "not_a_directory"}), 400 items = [] for item in data: item_path = item.get("path") or "" item_type = item.get("type") items.append( { "name": item.get("name"), "path": item_path, "type": item_type, "size": item.get("size"), "guestAllowed": True if user is not None else (True if item_type == "dir" else _guest_can_preview_repo_path(item_path)), } ) items.sort(key=lambda x: (0 if x["type"] == "dir" else 1, x["name"] or "")) return jsonify({"ref": ref, "path": path, "items": items}) @app.get("/resources//repo/file") def api_repo_file(resource_id: int) -> Response: config = get_config() ref = (request.args.get("ref") or "").strip() raw_path = (request.args.get("path") or "").strip() if not raw_path: return jsonify({"error": "path_required"}), 400 path = _normalize_repo_path(raw_path) or "" if not path: return jsonify({"error": "path_invalid"}), 400 user = current_user() if user is None and not _guest_can_preview_repo_path(path): return jsonify({"error": "login_required", "path": path}), 401 row = fetch_one( "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,), ) if row is None: abort(404) owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] if not ref: ref = default_ref resp = gogs_contents(owner, repo, path, ref) if resp.status_code == 404: return jsonify({"error": "file_not_found"}), 404 if resp.status_code >= 400: return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 try: data = resp.json() except Exception: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 if isinstance(data, list) or data.get("type") != "file": return jsonify({"error": "not_a_file"}), 400 size = parse_int(data.get("size"), 0) if size > config.max_preview_bytes: return jsonify({"error": "file_too_large", "maxBytes": config.max_preview_bytes, "size": size}), 413 encoding = data.get("encoding") content = data.get("content") or "" if encoding != "base64": return jsonify({"error": "unsupported_encoding", "encoding": encoding}), 400 try: raw = base64.b64decode(content, validate=False) except Exception: return jsonify({"error": "decode_failed"}), 400 try: text = raw.decode("utf-8") is_text = True except UnicodeDecodeError: text = "" is_text = False if not is_text: return jsonify({"error": "binary_file_not_previewable"}), 415 return jsonify({"ref": ref, "path": path, "content": text}) def _normalize_repo_path(raw: str) -> str | None: s = (raw or "").strip().replace("\\", "/").lstrip("/") if not s: return None parts = [p for p in s.split("/") if p] if any(p == ".." for p in parts): return None if ":" in parts[0]: return None return "/".join(parts) def _git_error_to_response(e: GogsGitError) -> tuple[Response, int]: if e.code in {"path_required", "ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e.code, "message": e.message}), 400 if e.code == "file_exists": return jsonify({"error": e.code, "message": e.message}), 409 if e.code == "empty_repo": return jsonify({"error": e.code, "message": e.message}), 409 if e.code in {"file_not_found", "path_not_found", "branch_not_found"}: return jsonify({"error": e.code, "message": e.message}), 404 if e.code == "git_not_found": return jsonify({"error": e.code, "message": e.message}), 501 return jsonify({"error": e.code, "message": e.message}), 502 @app.post("/resources//repo/file") def api_repo_file_create(resource_id: int) -> Response: _ = require_admin() payload = request.get_json(silent=True) or {} ref = (payload.get("ref") or "").strip() path = _normalize_repo_path(payload.get("path") or "") content = payload.get("content") or "" message = payload.get("message") or "" if not path: return jsonify({"error": "path_required"}), 400 row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) if not ref: ref = (row["default_ref"] or "").strip() try: result = gogs_git_write_file(row["repo_owner"], row["repo_name"], ref, path, str(content), str(message), must_create=True) except GogsGitError as e: resp, status = _git_error_to_response(e) return resp, status return jsonify({"ok": True, **result}) @app.put("/resources//repo/file") def api_repo_file_update(resource_id: int) -> Response: _ = require_admin() payload = request.get_json(silent=True) or {} ref = (payload.get("ref") or "").strip() path = _normalize_repo_path(payload.get("path") or "") content = payload.get("content") or "" message = payload.get("message") or "" if not path: return jsonify({"error": "path_required"}), 400 row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) if not ref: ref = (row["default_ref"] or "").strip() try: result = gogs_git_write_file(row["repo_owner"], row["repo_name"], ref, path, str(content), str(message), must_create=False) except GogsGitError as e: resp, status = _git_error_to_response(e) return resp, status return jsonify({"ok": True, **result}) @app.delete("/resources//repo/file") def api_repo_file_delete(resource_id: int) -> Response: _ = require_admin() payload = request.get_json(silent=True) or {} ref = (payload.get("ref") or "").strip() path = _normalize_repo_path(payload.get("path") or "") message = payload.get("message") or "" if not path: return jsonify({"error": "path_required"}), 400 row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) if not ref: ref = (row["default_ref"] or "").strip() try: result = gogs_git_delete_path(row["repo_owner"], row["repo_name"], ref, path, str(message)) except GogsGitError as e: resp, status = _git_error_to_response(e) return resp, status return jsonify({"ok": True, **result}) @app.get("/resources//repo/commits") def api_repo_commits(resource_id: int) -> Response: ref = (request.args.get("ref") or "").strip() raw_path = (request.args.get("path") or "").strip() limit = parse_int(request.args.get("limit"), 20) if limit < 1: limit = 1 if limit > 50: limit = 50 path = "" if raw_path: path = _normalize_repo_path(raw_path) or "" if not path: return jsonify({"error": "path_invalid"}), 400 row = fetch_one( "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,), ) if row is None: abort(404) if not ref: ref = (row["default_ref"] or "").strip() resp = gogs_commits(row["repo_owner"], row["repo_name"], ref=ref, path=path, limit=limit) if resp.status_code >= 400: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) return jsonify({"error": "gogs_failed", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 try: data = resp.json() except Exception: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 if not isinstance(data, list): return jsonify({"error": "gogs_invalid_response", "status": resp.status_code}), 502 items = [] for c in data: sha = (c.get("sha") or "").strip() commit = c.get("commit") or {} author = (commit.get("author") or {}) if isinstance(commit, dict) else {} items.append( { "sha": sha, "authorName": (author.get("name") or "") if isinstance(author, dict) else "", "authorDate": (author.get("date") or "") if isinstance(author, dict) else "", "subject": (commit.get("message") or "").splitlines()[0][:300] if isinstance(commit, dict) else "", } ) return jsonify({"ref": ref, "path": path or "", "items": items}) @app.get("/resources//repo/readme") def api_repo_readme(resource_id: int) -> Response: config = get_config() ref = (request.args.get("ref") or "").strip() row = fetch_one( "SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,), ) if row is None: abort(404) owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] if not ref: ref = default_ref candidates = ["README.md", "readme.md", "README.MD", "Readme.md"] for name in candidates: resp = gogs_contents(owner, repo, name, ref) if resp.status_code == 404: continue if resp.status_code >= 400: return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 try: data = resp.json() except Exception: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) return jsonify({"error": "gogs_invalid_response", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 if data.get("type") != "file": continue size = parse_int(data.get("size"), 0) if size > config.max_preview_bytes: return jsonify({"error": "readme_too_large"}), 413 raw = base64.b64decode(data.get("content") or "", validate=False) try: text = raw.decode("utf-8") except UnicodeDecodeError: text = raw.decode("utf-8", errors="replace") return jsonify({"ref": ref, "path": name, "content": text}) return jsonify({"ref": ref, "path": None, "content": None}) _download_lock = threading.Lock() _download_jobs: dict[str, dict[str, Any]] = {} _download_build_sema = threading.Semaphore(int(os.environ.get("DOWNLOAD_BUILD_CONCURRENCY", "2") or "2")) _download_git_sema = threading.Semaphore( int(os.environ.get("DOWNLOAD_GIT_FALLBACK_CONCURRENCY", "1") or "1") ) _download_cache_ttl_seconds = int(os.environ.get("DOWNLOAD_CACHE_TTL_SECONDS", "900") or "900") def _download_cache_dir() -> Path: cfg = get_config() data_dir = cfg.database_path.parent d = data_dir / "download_cache" d.mkdir(parents=True, exist_ok=True) return d def _download_cache_path(*, resource_id: int, owner: str, repo: str, cache_key: str) -> Path: safe_owner = re.sub(r"[^a-zA-Z0-9._-]+", "_", (owner or "").strip())[:50] or "owner" safe_repo = re.sub(r"[^a-zA-Z0-9._-]+", "_", (repo or "").strip())[:50] or "repo" key = (cache_key or "").strip() if re.fullmatch(r"[0-9a-fA-F]{7,80}", key or ""): key_tag = key.lower()[:24] else: key_tag = hashlib.sha256(key.encode("utf-8")).hexdigest()[:24] rid = int(resource_id or 0) return _download_cache_dir() / f"res{rid}__{safe_owner}__{safe_repo}__{key_tag}.zip" def _download_cache_meta_path(zip_path: Path) -> Path: return zip_path.with_suffix(zip_path.suffix + ".meta.json") def _download_cache_ready(path: Path) -> bool: try: st = path.stat() except Exception: return False if st.st_size <= 0: return False if app.testing or (has_request_context() and bool(getattr(request, "environ", {}).get("werkzeug.test"))): return True if _download_cache_ttl_seconds <= 0: return True return (time.time() - float(st.st_mtime)) <= float(_download_cache_ttl_seconds) def _read_download_cache_meta(zip_path: Path) -> dict[str, Any] | None: meta_path = _download_cache_meta_path(zip_path) try: raw = meta_path.read_text(encoding="utf-8") except Exception: return None try: data = json.loads(raw) except Exception: return None return data if isinstance(data, dict) else None def _write_download_cache_meta(zip_path: Path, meta: dict[str, Any]) -> None: meta_path = _download_cache_meta_path(zip_path) tmp_meta = meta_path.with_suffix(meta_path.suffix + ".partial") try: tmp_meta.write_text(json.dumps(meta, ensure_ascii=False), encoding="utf-8") os.replace(tmp_meta, meta_path) finally: try: if tmp_meta.exists(): tmp_meta.unlink() except Exception: pass def _looks_like_commit(s: str) -> bool: t = (s or "").strip() if len(t) < 7 or len(t) > 40: return False return bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", t)) def _resolve_download_commit(*, owner: str, repo: str, ref: str) -> dict[str, Any]: ref = (ref or "").strip() or "HEAD" if _looks_like_commit(ref): return {"ok": True, "ref": ref, "commit": ref.lower(), "kind": "commit"} try: info = gogs_resolve_ref_commit(owner, repo, ref) except Exception: info = {"ok": False, "ref": ref, "commit": None, "kind": "unknown"} if not isinstance(info, dict): return {"ok": False, "ref": ref, "commit": None, "kind": "unknown"} commit = (info.get("commit") or "").strip() if commit and _looks_like_commit(commit): return {"ok": True, "ref": ref, "commit": commit.lower(), "kind": info.get("kind") or "unknown"} return {"ok": False, "ref": ref, "commit": None, "kind": info.get("kind") or "unknown"} def _build_zip_to_cache( *, owner: str, repo: str, ref: str, commit: str | None, resolved_kind: str, out_path: Path ) -> None: out_path.parent.mkdir(parents=True, exist_ok=True) tmp_path = out_path.with_suffix(out_path.suffix + ".partial") meta: dict[str, Any] = { "owner": owner, "repo": repo, "ref": ref, "commit": (commit or "").strip() or None, "refKind": resolved_kind or "unknown", "builtAt": isoformat(utcnow()), } try: upstream_ref = (commit or "").strip() or ref upstream = gogs_archive_get(owner, repo, upstream_ref) if upstream.status_code == 404: base_url = (get_config().gogs_base_url or "").strip().rstrip("/") parts = urlsplit(base_url) if parts.scheme in {"http", "https"} and parts.netloc and not parts.username and not parts.password: fallback = f"{base_url}/{quote(owner)}/{quote(repo)}/archive/{quote(upstream_ref)}.zip" upstream = requests.get(fallback, stream=True, timeout=60, allow_redirects=False) if upstream.status_code < 400: with open(tmp_path, "wb") as f: for chunk in upstream.iter_content(chunk_size=1024 * 256): if chunk: f.write(chunk) os.replace(tmp_path, out_path) meta["method"] = "gogs_archive" meta["upstreamStatus"] = int(upstream.status_code) try: meta["bytes"] = int(out_path.stat().st_size) meta["mtime"] = int(out_path.stat().st_mtime) except Exception: pass _write_download_cache_meta(out_path, meta) return got_git = _download_git_sema.acquire(blocking=False) if not got_git: raise RuntimeError("git_fallback_busy") zip_path = None try: if commit and _looks_like_commit(commit): zip_path = gogs_git_archive_zip_commit(owner, repo, commit) else: zip_path = gogs_git_archive_zip(owner, repo, ref) with open(zip_path, "rb") as src, open(tmp_path, "wb") as dst: shutil.copyfileobj(src, dst, length=1024 * 256) os.replace(tmp_path, out_path) meta["method"] = "git_archive" meta["upstreamStatus"] = int(upstream.status_code) if upstream is not None else None try: meta["bytes"] = int(out_path.stat().st_size) meta["mtime"] = int(out_path.stat().st_mtime) except Exception: pass _write_download_cache_meta(out_path, meta) finally: try: if zip_path: os.unlink(zip_path) except Exception: pass _download_git_sema.release() finally: try: if tmp_path.exists(): os.unlink(tmp_path) except Exception: pass def _ensure_download_ready( *, resource_id: int, owner: str, repo: str, ref: str, commit: str | None, resolved_kind: str, force: bool = False, ) -> dict[str, Any]: ref = (ref or "").strip() or "HEAD" commit = (commit or "").strip() or None cache_key = commit or ref cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key) if force: try: if cache_path.exists(): cache_path.unlink() except Exception: pass try: mp = _download_cache_meta_path(cache_path) if mp.exists(): mp.unlink() except Exception: pass if _download_cache_ready(cache_path): return {"ready": True, "path": cache_path, "ref": ref, "commit": commit, "cacheKey": cache_key} key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}" with _download_lock: job = _download_jobs.get(key) if job and job.get("state") == "building": return {"ready": False, "state": "building", "ref": ref, "commit": commit, "cacheKey": cache_key} _download_jobs[key] = {"state": "building", "updatedAt": time.time(), "error": None, "ref": ref, "commit": commit, "cacheKey": cache_key} def runner() -> None: with app.app_context(): if not _download_build_sema.acquire(blocking=False): with _download_lock: _download_jobs[key] = {"state": "error", "updatedAt": time.time(), "error": "build_busy"} return try: _build_zip_to_cache(owner=owner, repo=repo, ref=ref, commit=commit, resolved_kind=resolved_kind, out_path=cache_path) with _download_lock: _download_jobs[key] = {"state": "ready", "updatedAt": time.time(), "error": None} except Exception as e: code = str(e) or "build_failed" with _download_lock: _download_jobs[key] = {"state": "error", "updatedAt": time.time(), "error": code[:120]} finally: _download_build_sema.release() force_sync = bool(app.testing) or (has_request_context() and bool(request.environ.get("werkzeug.test"))) if force_sync: runner() else: threading.Thread(target=runner, daemon=True).start() if _download_cache_ready(cache_path): return {"ready": True, "path": cache_path, "ref": ref, "commit": commit, "cacheKey": cache_key} with _download_lock: job = _download_jobs.get(key) or {} return { "ready": False, "state": job.get("state") or "building", "error": job.get("error"), "ref": ref, "commit": commit, "cacheKey": cache_key, } @app.post("/resources//download") def api_download_prepare(resource_id: int) -> Response: user = require_user() row = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,)) if row is None: abort(404) if row["type"] == "VIP" and not is_vip_active(user): return jsonify({"error": "vip_required"}), 403 payload = request.get_json(silent=True) or {} ref = (payload.get("ref") or "").strip() or row["default_ref"] owner, repo = row["repo_owner"], row["repo_name"] ip = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip() or (request.remote_addr or "") ua = (request.headers.get("User-Agent") or "").strip() if len(ip) > 64: ip = ip[:64] if len(ua) > 256: ua = ua[:256] execute( """ INSERT INTO download_logs (user_id, resource_id, resource_title_snapshot, resource_type_snapshot, ref_snapshot, downloaded_at, ip, user_agent) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (user["id"], row["id"], row["title"], row["type"], ref, isoformat(utcnow()), ip, ua), ) execute("UPDATE resources SET download_count = download_count + 1 WHERE id = ?", (resource_id,)) resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref) commit = resolved.get("commit") if resolved.get("ok") else None resolved_kind = resolved.get("kind") or "unknown" st = _ensure_download_ready( resource_id=resource_id, owner=owner, repo=repo, ref=ref, commit=commit, resolved_kind=resolved_kind, force=bool(payload.get("force")), ) cache_key = st.get("cacheKey") or (commit or ref) qs = {"ref": ref, "commit": commit} if commit else {"ref": ref} return jsonify( { "ok": True, "ready": bool(st.get("ready")), "state": st.get("state") or ("ready" if st.get("ready") else "building"), "error": st.get("error"), "ref": ref, "commit": commit, "cacheKey": cache_key, "downloadUrl": f"/resources/{resource_id}/download?{urlencode(qs)}", "statusUrl": f"/resources/{resource_id}/download/status?{urlencode(qs)}", } ) @app.get("/resources//download/status") def api_download_status(resource_id: int) -> Response: user = require_user() row = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,)) if row is None: abort(404) if row["type"] == "VIP" and not is_vip_active(user): return jsonify({"error": "vip_required"}), 403 ref = (request.args.get("ref") or "").strip() or row["default_ref"] commit_q = (request.args.get("commit") or "").strip() or None owner, repo = row["repo_owner"], row["repo_name"] resolved_kind = "unknown" commit = None if commit_q and _looks_like_commit(commit_q): commit = commit_q.lower() else: resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref) if resolved.get("ok"): commit = resolved.get("commit") resolved_kind = resolved.get("kind") or "unknown" cache_key = commit or ref cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key) if _download_cache_ready(cache_path): meta = _read_download_cache_meta(cache_path) size = None mtime = None ttl_remaining = None try: st = cache_path.stat() size = int(st.st_size) mtime = int(st.st_mtime) if _download_cache_ttl_seconds > 0: ttl_remaining = max(0, int(_download_cache_ttl_seconds - (time.time() - float(st.st_mtime)))) except Exception: pass return jsonify( { "ok": True, "ready": True, "state": "ready", "error": None, "ref": ref, "commit": commit, "cacheKey": cache_key, "bytes": size, "mtime": mtime, "ttlRemainingSeconds": ttl_remaining, "meta": meta, } ) key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}" with _download_lock: job = _download_jobs.get(key) or {} state = job.get("state") or "building" return jsonify( { "ok": True, "ready": False, "state": state, "error": job.get("error"), "ref": ref, "commit": commit, "cacheKey": cache_key, } ) @app.get("/resources//download") def api_download_file(resource_id: int) -> Response: user = require_user() row = fetch_one("SELECT * FROM resources WHERE id = ? AND status = 'ONLINE'", (resource_id,)) if row is None: abort(404) if row["type"] == "VIP" and not is_vip_active(user): return jsonify({"error": "vip_required"}), 403 ref = (request.args.get("ref") or "").strip() or row["default_ref"] commit_q = (request.args.get("commit") or "").strip() or None owner, repo = row["repo_owner"], row["repo_name"] resolved_kind = "unknown" commit = None if commit_q and _looks_like_commit(commit_q): commit = commit_q.lower() else: resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref) if resolved.get("ok"): commit = resolved.get("commit") resolved_kind = resolved.get("kind") or "unknown" st = _ensure_download_ready( resource_id=resource_id, owner=owner, repo=repo, ref=ref, commit=commit, resolved_kind=resolved_kind, ) if not st.get("ready"): return jsonify({"error": st.get("error") or "building", "state": st.get("state") or "building"}), 202 cache_path = st.get("path") if not isinstance(cache_path, Path) or not cache_path.exists(): return jsonify({"error": "download_not_ready"}), 409 filename = f"{owner}-{repo}-{ref}.zip".replace("/", "-") if app.testing or (has_request_context() and bool(request.environ.get("werkzeug.test"))): with open(cache_path, "rb") as f: payload = f.read() return Response( payload, headers={ "Content-Type": "application/zip", "Content-Disposition": f'attachment; filename="{filename}"', }, ) f = open(cache_path, "rb") resp = send_file( f, mimetype="application/zip", as_attachment=True, download_name=filename, conditional=True, max_age=0, ) resp.call_on_close(f.close) resp.direct_passthrough = False return resp @app.post("/orders") def api_create_order() -> Response: user = require_user() payload = request.get_json(silent=True) or {} plan_id = parse_int(payload.get("planId"), 0) plan = fetch_one("SELECT * FROM plans WHERE id = ? AND enabled = 1", (plan_id,)) if plan is None: return jsonify({"error": "plan_not_found"}), 404 order_id = uuid.uuid4().hex snapshot = { "name": plan["name"], "durationDays": plan["duration_days"], "priceCents": plan["price_cents"], } execute( """ INSERT INTO orders (id, user_id, plan_id, amount_cents, status, created_at, plan_snapshot_json) VALUES (?, ?, ?, ?, 'PENDING', ?, ?) """, (order_id, user["id"], plan["id"], plan["price_cents"], isoformat(utcnow()), json.dumps(snapshot)), ) return jsonify({"id": order_id, "status": "PENDING", "amountCents": plan["price_cents"], "plan": snapshot}) @app.get("/orders") def api_my_orders() -> Response: user = require_user() rows = fetch_all( "SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC", (user["id"],), ) items = [] for row in rows: items.append( { "id": row["id"], "status": row["status"], "amountCents": row["amount_cents"], "createdAt": row["created_at"], "paidAt": row["paid_at"], "planSnapshot": json.loads(row["plan_snapshot_json"]), } ) return jsonify({"items": items}) @app.get("/me/downloads") def api_my_downloads() -> Response: user = require_user() page = max(parse_int(request.args.get("page"), 1), 1) page_size = min(max(parse_int(request.args.get("pageSize"), 20), 1), 50) offset = (page - 1) * page_size total_row = fetch_one("SELECT COUNT(1) AS cnt FROM download_logs WHERE user_id = ?", (user["id"],)) total = int(total_row["cnt"] if total_row is not None else 0) rows = fetch_all( """ SELECT dl.id, dl.user_id, dl.resource_id, dl.resource_title_snapshot, dl.resource_type_snapshot, dl.ref_snapshot, dl.downloaded_at, r.id AS r_id, r.status AS r_status, r.type AS r_type FROM download_logs dl LEFT JOIN resources r ON r.id = dl.resource_id WHERE dl.user_id = ? ORDER BY dl.downloaded_at DESC, dl.id DESC LIMIT ? OFFSET ? """, (user["id"], page_size, offset), ) items = [] for row in rows: if row["r_id"] is None: resource_state = "DELETED" elif row["r_status"] != "ONLINE": resource_state = "OFFLINE" else: resource_state = "ONLINE" items.append( { "id": row["id"], "resourceId": row["resource_id"], "resourceTitle": row["resource_title_snapshot"], "resourceType": row["resource_type_snapshot"], "currentResourceType": row["r_type"] if row["r_id"] is not None else None, "ref": row["ref_snapshot"], "downloadedAt": row["downloaded_at"], "resourceState": resource_state, } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.get("/me/messages") def api_my_messages() -> Response: user = require_user() unread_only_raw = (request.args.get("unread") or "").strip().lower() unread_only = unread_only_raw in {"1", "true", "yes", "on"} page = max(parse_int(request.args.get("page"), 1), 1) page_size = min(max(parse_int(request.args.get("pageSize"), 20), 1), 50) offset = (page - 1) * page_size where = ["user_id = ?"] params: list[Any] = [user["id"]] if unread_only: where.append("read_at IS NULL") where_sql = f"WHERE {' AND '.join(where)}" total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM user_messages {where_sql}", tuple(params)) total = int(total_row["cnt"] if total_row is not None else 0) unread_row = fetch_one( "SELECT COUNT(1) AS cnt FROM user_messages WHERE user_id = ? AND read_at IS NULL", (user["id"],), ) unread_count = int(unread_row["cnt"] if unread_row is not None else 0) rows = fetch_all( f""" SELECT id, title, content, created_at, read_at FROM user_messages {where_sql} ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: items.append( { "id": row["id"], "title": row["title"], "content": row["content"], "createdAt": row["created_at"], "readAt": row["read_at"], "read": bool(row["read_at"]), } ) return jsonify({"items": items, "total": total, "unreadCount": unread_count, "page": page, "pageSize": page_size}) @app.put("/me/messages//read") def api_my_message_read(message_id: int) -> Response: user = require_user() execute( """ UPDATE user_messages SET read_at = ? WHERE id = ? AND user_id = ? AND read_at IS NULL """, (isoformat(utcnow()), message_id, user["id"]), ) return jsonify({"ok": True}) @app.post("/orders//pay") def api_pay_order(order_id: str) -> Response: user = require_user() config = get_config() row = fetch_one("SELECT * FROM orders WHERE id = ? AND user_id = ?", (order_id, user["id"])) if row is None: abort(404) if row["status"] != "PENDING": return jsonify({"error": "order_not_pending"}), 409 # 判断是否启用模拟支付 enable_mock_pay_raw = get_setting_value("ENABLE_MOCK_PAY") if enable_mock_pay_raw is None: enable_mock_pay = bool(config.enable_mock_pay) else: enable_mock_pay = enable_mock_pay_raw.strip().lower() in {"1", "true", "yes", "on"} snapshot = json.loads(row["plan_snapshot_json"]) # 模拟支付:直接标记为已支付并发放会员权益 if enable_mock_pay: execute( "UPDATE orders SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ? WHERE id = ?", (isoformat(utcnow()), "MOCK", None, order_id), ) extend_vip(user["id"], int(snapshot["durationDays"])) return jsonify({"ok": True, "provider": "MOCK", "status": "PAID"}) # 真实支付:调用中间层 REST API 创建支付订单 host_base = request.host_url.rstrip("/") # return_url:优先读 .env,否则回落到本站 /pay/return return_url = config.pay_return_url or f"{host_base}/pay/return" # callback_url:优先读 .env,否则回落到本站 /pay/notify callback_url = config.pay_callback_url or f"{host_base}/pay/notify" amount_cents = int(row["amount_cents"] or 0) total_amount = float((Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01"))) subject = f"VIP {snapshot.get('name') or ''}".strip()[:120] or "VIP" pay_api_base = config.pay_api_base_url pay_payload = { "bill_no": order_id, "amount": total_amount, "subject": subject, "return_url": return_url, "callback_url": callback_url, } import sys print(f"[PAY] 调用中间层: POST {pay_api_base}/api/alipay/pay", file=sys.stderr) print(f"[PAY] 请求体: {pay_payload}", file=sys.stderr) try: resp = requests.post( f"{pay_api_base}/api/alipay/pay", json=pay_payload, timeout=15, ) except Exception as e: print(f"[PAY] 连接失败: {e}", file=sys.stderr) return jsonify({"error": "pay_api_unreachable", "detail": str(e)}), 502 print(f"[PAY] 响应状态: {resp.status_code}", file=sys.stderr) print(f"[PAY] 响应体: {resp.text[:500]}", file=sys.stderr) if resp.status_code != 200: try: detail = resp.json() except Exception: detail = resp.text[:200] return jsonify({"error": "pay_api_error", "detail": detail}), 502 data = resp.json() pay_url = data.get("payment_url") or "" if not pay_url: return jsonify({"error": "pay_api_no_payment_url", "detail": data}), 502 execute("UPDATE orders SET pay_channel = ? WHERE id = ? AND status = 'PENDING'", ("ALIPAY", order_id)) return jsonify({"ok": True, "provider": "ALIPAY", "status": "PENDING", "payUrl": pay_url}) @app.post("/orders//query-and-activate") def api_order_query_and_activate(order_id: str) -> Response: """前端轮询调用:向中间层查询订单状态,若已支付则激活订单并发放会员权益。 与 callback_url 回调竞争,后端已做幂等保护,重复调用安全。 """ user = require_user() row = fetch_one("SELECT * FROM orders WHERE id = ? AND user_id = ?", (order_id, user["id"])) if row is None: abort(404) # 已支付,直接返回,无需再查 if row["status"] == "PAID": return jsonify({"status": "PAID"}) # 非 PENDING 状态(CLOSED/FAILED)也直接返回 if row["status"] != "PENDING": return jsonify({"status": row["status"]}) config = get_config() pay_api_base = config.pay_api_base_url # 调用中间层查询接口 try: resp = requests.get( f"{pay_api_base}/api/alipay/query", params={"bill_no": order_id}, timeout=10, ) except Exception as e: return jsonify({"status": "PENDING", "error": str(e)}), 200 if resp.status_code != 200: return jsonify({"status": "PENDING"}), 200 try: data = resp.json() except Exception: return jsonify({"status": "PENDING"}), 200 trade_status = (data.get("trade_status") or "").strip().upper() if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}: # 返回中间层的状态供前端判断是否继续轮询 return jsonify({"status": "PENDING", "tradeStatus": trade_status}), 200 # 查询到支付成功,激活订单(幂等:只有 PENDING 状态才会更新) trade_no = (data.get("trade_no") or "").strip() amount_raw = str(data.get("amount") or "").strip() try: amount = Decimal(amount_raw).quantize(Decimal("0.01")) except Exception: return jsonify({"status": "PENDING", "error": "invalid_amount"}), 200 expect_amount = (Decimal(int(row["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01")) if amount != expect_amount: return jsonify({"status": "PENDING", "error": "amount_mismatch"}), 200 snapshot = json.loads(row["plan_snapshot_json"]) cur = execute( """ UPDATE orders SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ? WHERE id = ? AND status = 'PENDING' """, (isoformat(utcnow()), "ALIPAY", trade_no or None, order_id), ) if getattr(cur, "rowcount", 0) == 1: extend_vip(int(row["user_id"]), int(snapshot["durationDays"])) return jsonify({"status": "PAID"}) @app.post("/pay/callback") def api_pay_callback() -> Response: """兼容旧版支付宝直连回调(form 表单格式),保留以防万一。""" params: dict[str, Any] = {} try: for k in request.form.keys(): params[k] = request.form.get(k) except Exception: params = {} if not params: try: params = dict(request.args) except Exception: params = {} sign = (params.get("sign") or "").strip() sign_type = (params.get("sign_type") or "RSA2").strip().upper() if not sign or sign_type != "RSA2": return Response("fail", mimetype="text/plain") verify_params = dict(params) verify_params.pop("sign", None) verify_params.pop("sign_type", None) sign_content = _alipay_sign_content(verify_params) alipay_public_key = (get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip() alipay_app_id = (get_setting_value("ALIPAY_APP_ID") or "").strip() if not alipay_public_key: return Response("fail", mimetype="text/plain") try: ok = _alipay_rsa2_verify(sign_content, sign, alipay_public_key) except RuntimeError: return Response("fail", mimetype="text/plain") if not ok: return Response("fail", mimetype="text/plain") out_trade_no = (params.get("out_trade_no") or "").strip() trade_no = (params.get("trade_no") or "").strip() trade_status = (params.get("trade_status") or "").strip().upper() total_amount_raw = (params.get("total_amount") or "").strip() app_id = (params.get("app_id") or "").strip() if alipay_app_id and app_id and alipay_app_id != app_id: return Response("fail", mimetype="text/plain") if not out_trade_no: return Response("fail", mimetype="text/plain") if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}: return Response("success", mimetype="text/plain") row = fetch_one("SELECT * FROM orders WHERE id = ?", (out_trade_no,)) if row is None: return Response("success", mimetype="text/plain") if row["status"] == "PAID": return Response("success", mimetype="text/plain") try: amount = Decimal(total_amount_raw).quantize(Decimal("0.01")) except Exception: return Response("fail", mimetype="text/plain") expect_amount = (Decimal(int(row["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01")) if amount != expect_amount: return Response("fail", mimetype="text/plain") snapshot = json.loads(row["plan_snapshot_json"]) cur = execute( """ UPDATE orders SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ? WHERE id = ? AND status = 'PENDING' """, (isoformat(utcnow()), "ALIPAY", trade_no or None, out_trade_no), ) if getattr(cur, "rowcount", 0) == 1: extend_vip(int(row["user_id"]), int(snapshot["durationDays"])) return Response("success", mimetype="text/plain") @app.post("/pay/notify") def api_pay_notify() -> Response: """中间层 REST API 支付成功后的异步回调接口(JSON 格式)。 接收字段:bill_no, trade_no, trade_status, amount, paid_at """ data = request.get_json(silent=True) or {} bill_no = (data.get("bill_no") or "").strip() trade_no = (data.get("trade_no") or "").strip() trade_status = (data.get("trade_status") or "").strip().upper() amount_raw = str(data.get("amount") or "").strip() if not bill_no: return jsonify({"error": "bill_no_missing"}), 400 # 只处理支付成功的状态 if trade_status not in {"TRADE_SUCCESS", "TRADE_FINISHED"}: return jsonify({"ok": True, "msg": "ignored"}), 200 row = fetch_one("SELECT * FROM orders WHERE id = ?", (bill_no,)) if row is None: # 订单不存在,返回 200 避免中间层重试 return jsonify({"ok": True, "msg": "order_not_found"}), 200 if row["status"] == "PAID": # 幂等:已处理过,直接返回成功 return jsonify({"ok": True, "msg": "already_paid"}), 200 # 校验金额 try: amount = Decimal(amount_raw).quantize(Decimal("0.01")) except Exception: return jsonify({"error": "invalid_amount"}), 400 expect_amount = (Decimal(int(row["amount_cents"] or 0)) / Decimal(100)).quantize(Decimal("0.01")) if amount != expect_amount: return jsonify({"error": "amount_mismatch"}), 400 snapshot = json.loads(row["plan_snapshot_json"]) cur = execute( """ UPDATE orders SET status = 'PAID', paid_at = ?, pay_channel = ?, pay_trade_no = ? WHERE id = ? AND status = 'PENDING' """, (isoformat(utcnow()), "ALIPAY", trade_no or None, bill_no), ) if getattr(cur, "rowcount", 0) == 1: extend_vip(int(row["user_id"]), int(snapshot["durationDays"])) return jsonify({"ok": True}), 200 @app.get("/pay/return") def api_pay_return() -> Response: """支付宝支付完成后的同步跳转落地页。 中间层会将用户浏览器重定向到此地址,展示支付结果并跳转到个人中心。 """ bill_no = (request.args.get("bill_no") or request.args.get("out_trade_no") or "").strip() status = "unknown" if bill_no: row = fetch_one("SELECT status FROM orders WHERE id = ?", (bill_no,)) if row: status = row["status"] # 渲染一个简单的跳转页面,3 秒后自动跳转到个人中心 html = f""" 支付结果
{"✅" if status == "PAID" else "⏳"}

{"支付成功" if status == "PAID" else "支付处理中"}

{"会员权益已生效,正在跳转到个人中心…" if status == "PAID" else "订单处理中,请稍候,正在跳转…"}

立即前往个人中心
""" return Response(html, mimetype="text/html") @app.post("/admin/auth/login") def api_admin_login() -> Response: payload = request.get_json(silent=True) or {} username = (payload.get("username") or "").strip() password = payload.get("password") or "" admin = fetch_one("SELECT * FROM admin_users WHERE username = ?", (username,)) if admin is None or not check_password_hash(admin["password_hash"], password): return jsonify({"error": "invalid_credentials"}), 401 if admin["status"] != "ACTIVE": return jsonify({"error": "admin_disabled"}), 403 session["admin_user_id"] = admin["id"] execute("UPDATE admin_users SET last_login_at = ? WHERE id = ?", (isoformat(utcnow()), admin["id"])) return jsonify({"ok": True}) @app.post("/admin/auth/logout") def api_admin_logout() -> Response: session.pop("admin_user_id", None) return jsonify({"ok": True}) @app.get("/admin/stats") def api_admin_stats() -> Response: _ = require_admin() from datetime import timedelta now = utcnow() now_s = isoformat(now) since_24h = isoformat(now - timedelta(hours=24)) def _count(sql: str, params: tuple = ()) -> int: row = fetch_one(sql, params) if row is None: return 0 v = row.get("c") if isinstance(row, dict) else row["c"] try: return int(v or 0) except Exception: return 0 def _sum(sql: str, params: tuple = ()) -> int: row = fetch_one(sql, params) if row is None: return 0 v = row.get("s") if isinstance(row, dict) else row["s"] try: return int(v or 0) except Exception: return 0 users_total = _count("SELECT COUNT(1) AS c FROM users") users_active = _count("SELECT COUNT(1) AS c FROM users WHERE status = ?", ("ACTIVE",)) users_vip_active = _count( "SELECT COUNT(1) AS c FROM users WHERE vip_expire_at IS NOT NULL AND vip_expire_at > ?", (now_s,), ) resources_total = _count("SELECT COUNT(1) AS c FROM resources") resources_online = _count("SELECT COUNT(1) AS c FROM resources WHERE status = ?", ("ONLINE",)) orders_total = _count("SELECT COUNT(1) AS c FROM orders") orders_paid = _count("SELECT COUNT(1) AS c FROM orders WHERE status = ?", ("PAID",)) orders_pending = _count("SELECT COUNT(1) AS c FROM orders WHERE status = ?", ("PENDING",)) revenue_total_cents = _sum("SELECT COALESCE(SUM(amount_cents), 0) AS s FROM orders WHERE status = ?", ("PAID",)) revenue_24h_cents = _sum( "SELECT COALESCE(SUM(amount_cents), 0) AS s FROM orders WHERE status = ? AND paid_at IS NOT NULL AND paid_at >= ?", ("PAID", since_24h), ) downloads_total = _count("SELECT COUNT(1) AS c FROM download_logs") downloads_24h = _count("SELECT COUNT(1) AS c FROM download_logs WHERE downloaded_at >= ?", (since_24h,)) msg_total = _count("SELECT COUNT(1) AS c FROM user_messages") msg_24h = _count("SELECT COUNT(1) AS c FROM user_messages WHERE created_at >= ?", (since_24h,)) return jsonify( { "now": now_s, "users": {"total": users_total, "active": users_active, "vipActive": users_vip_active}, "resources": {"total": resources_total, "online": resources_online}, "orders": {"total": orders_total, "paid": orders_paid, "pending": orders_pending}, "revenue": {"totalCents": revenue_total_cents, "last24hCents": revenue_24h_cents}, "downloads": {"total": downloads_total, "last24h": downloads_24h}, "messages": {"total": msg_total, "last24h": msg_24h}, "backend": get_active_backend(), } ) @app.post("/admin/uploads") def api_admin_upload() -> Response: _ = require_admin() f = request.files.get("file") if f is None: return jsonify({"error": "file_required"}), 400 max_bytes = 50 * 1024 * 1024 if request.content_length is not None and int(request.content_length) > max_bytes: return jsonify({"error": "file_too_large"}), 413 original = secure_filename(f.filename or "") ext = os.path.splitext(original)[1].lower() allowed = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".mp4", ".webm", ".mov", ".m4v"} if ext and ext not in allowed: return jsonify({"error": "unsupported_file_type"}), 400 name = f"{uuid.uuid4().hex}{ext}" try: storage = _get_upload_storage() resp = storage.save_upload(f, name) return jsonify({"url": resp.get("url"), "name": resp.get("name")}) except RuntimeError as e: return jsonify({"error": str(e) or "upload_failed"}), 500 except Exception: return jsonify({"error": "upload_failed"}), 500 @app.delete("/admin/uploads/") def api_admin_delete_upload(name: str) -> Response: _ = require_admin() try: storage = _get_upload_storage() storage.delete_uploads({name}) except RuntimeError as e: return jsonify({"error": str(e) or "delete_failed"}), 500 except Exception: return jsonify({"error": "delete_failed"}), 500 return jsonify({"ok": True}) @app.get("/admin/uploads") def api_admin_uploads_list() -> Response: _ = require_admin() q = (request.args.get("q") or "").strip().lower() used_filter = (request.args.get("used") or "").strip().lower() used_names: set[str] = set() for row in fetch_all("SELECT cover_url, summary FROM resources", ()): used_names |= _extract_upload_names(row["cover_url"]) used_names |= _extract_upload_names(row["summary"]) used_lower = {n.lower() for n in used_names} try: storage = _get_upload_storage() all_items = storage.list_items() or [] except RuntimeError as e: return jsonify({"error": str(e) or "list_failed"}), 500 except Exception: return jsonify({"error": "list_failed"}), 500 for it in all_items: name = str(it.get("name") or "") it["used"] = bool(name.lower() in used_lower) all_items.sort(key=lambda x: x["mtime"], reverse=True) stats = { "totalCount": len(all_items), "totalBytes": sum(int(i.get("bytes") or 0) for i in all_items), "usedCount": sum(1 for i in all_items if i.get("used")), "usedBytes": sum(int(i.get("bytes") or 0) for i in all_items if i.get("used")), } stats["unusedCount"] = int(stats["totalCount"] - stats["usedCount"]) stats["unusedBytes"] = int(stats["totalBytes"] - stats["usedBytes"]) items = all_items if q: items = [i for i in items if q in str(i.get("name") or "").lower()] if used_filter == "used": items = [i for i in items if i.get("used")] elif used_filter == "unused": items = [i for i in items if not i.get("used")] return jsonify({"items": items, "stats": stats}) @app.post("/admin/uploads/cleanup-unused") def api_admin_uploads_cleanup_unused() -> Response: _ = require_admin() used_names: set[str] = set() for row in fetch_all("SELECT cover_url, summary FROM resources", ()): used_names |= _extract_upload_names(row["cover_url"]) used_names |= _extract_upload_names(row["summary"]) used_lower = {n.lower() for n in used_names} try: storage = _get_upload_storage() all_items = storage.list_items() or [] all_names = {str(i.get("name") or "") for i in all_items if str(i.get("name") or "")} unused = {n for n in all_names if n.lower() not in used_lower} storage.delete_uploads(unused) return jsonify({"ok": True, "deletedCount": len(unused)}) except RuntimeError as e: return jsonify({"error": str(e) or "cleanup_failed"}), 500 except Exception: return jsonify({"error": "cleanup_failed"}), 500 @app.get("/admin/settings") def api_admin_settings_get() -> Response: _ = require_admin() config = get_config() gogs_base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/") gogs_token = get_setting_value("GOGS_TOKEN") if gogs_token is not None: gogs_token = gogs_token.strip() or None if gogs_token is None: gogs_token = config.gogs_token pay_provider = (get_setting_value("PAY_PROVIDER") or "MOCK").strip().upper() pay_api_key = get_setting_value("PAY_API_KEY") if pay_api_key is not None: pay_api_key = pay_api_key.strip() or None alipay_app_id = (get_setting_value("ALIPAY_APP_ID") or "").strip() alipay_gateway = (get_setting_value("ALIPAY_GATEWAY") or "https://openapi.alipay.com/gateway.do").strip() alipay_notify_url = (get_setting_value("ALIPAY_NOTIFY_URL") or "").strip() alipay_return_url = (get_setting_value("ALIPAY_RETURN_URL") or "").strip() alipay_private_key = get_setting_value("ALIPAY_PRIVATE_KEY") if alipay_private_key is not None: alipay_private_key = alipay_private_key.strip() or None alipay_public_key = get_setting_value("ALIPAY_PUBLIC_KEY") if alipay_public_key is not None: alipay_public_key = alipay_public_key.strip() or None llm_provider = (get_setting_value("LLM_PROVIDER") or "").strip() llm_base_url = (get_setting_value("LLM_BASE_URL") or "").strip() llm_model = (get_setting_value("LLM_MODEL") or "").strip() llm_api_key = get_setting_value("LLM_API_KEY") if llm_api_key is not None: llm_api_key = llm_api_key.strip() or None redis_url = get_setting_value("REDIS_URL") if redis_url is not None: redis_url = redis_url.strip() or None redis_url_safe = "" if redis_url: try: parts = urlsplit(redis_url) if parts.scheme in {"redis", "rediss"} and parts.netloc: netloc = parts.netloc if "@" in netloc: netloc = netloc.split("@", 1)[1] redis_url_safe = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment)) else: redis_url_safe = redis_url except Exception: redis_url_safe = "" enable_mock_pay_raw = get_setting_value("ENABLE_MOCK_PAY") if enable_mock_pay_raw is None: enable_mock_pay = bool(config.enable_mock_pay) else: enable_mock_pay = enable_mock_pay_raw.strip().lower() in {"1", "true", "yes", "on"} storage_provider = (get_setting_value("STORAGE_PROVIDER") or "AUTO").strip().upper() if storage_provider not in {"AUTO", "LOCAL", "OSS"}: storage_provider = "AUTO" oss_endpoint = (get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/") oss_bucket = (get_setting_value("OSS_BUCKET") or "").strip() oss_access_key_id = (get_setting_value("OSS_ACCESS_KEY_ID") or "").strip() oss_access_key_secret = get_setting_value("OSS_ACCESS_KEY_SECRET") if oss_access_key_secret is not None: oss_access_key_secret = oss_access_key_secret.strip() or None oss_upload_prefix = _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/") oss_public_base_url = (get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/") return jsonify( { "gogsBaseUrl": gogs_base_url, "hasGogsToken": bool(gogs_token), "payment": { "provider": pay_provider, "hasApiKey": bool(pay_api_key), "enableMockPay": enable_mock_pay, "alipay": { "appId": alipay_app_id, "gateway": alipay_gateway, "notifyUrl": alipay_notify_url, "returnUrl": alipay_return_url, "hasPrivateKey": bool(alipay_private_key), "hasPublicKey": bool(alipay_public_key), }, }, "llm": {"provider": llm_provider, "baseUrl": llm_base_url, "model": llm_model, "hasApiKey": bool(llm_api_key)}, "cache": {"hasRedisUrl": bool(redis_url), "redisUrl": redis_url_safe}, "storage": { "provider": storage_provider, "oss": { "endpoint": oss_endpoint, "bucket": oss_bucket, "accessKeyId": oss_access_key_id, "hasAccessKeySecret": bool(oss_access_key_secret), "uploadPrefix": oss_upload_prefix, "publicBaseUrl": oss_public_base_url, }, }, "db": db_status(), } ) @app.put("/admin/settings") def api_admin_settings_put() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} config = get_config() gogs_base_url = payload.get("gogsBaseUrl") gogs_token = payload.get("gogsToken") clear_token = bool(payload.get("clearGogsToken")) pay = payload.get("payment") or {} llm = payload.get("llm") or {} cache = payload.get("cache") or {} storage = payload.get("storage") or {} mysql = payload.get("mysql") or {} before = { "gogsBaseUrl": (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/"), "hasGogsToken": bool((get_setting_value("GOGS_TOKEN") or "").strip() or config.gogs_token), "payment": { "provider": (get_setting_value("PAY_PROVIDER") or "MOCK").strip().upper(), "hasApiKey": bool((get_setting_value("PAY_API_KEY") or "").strip()), "enableMockPay": (get_setting_value("ENABLE_MOCK_PAY") or "").strip().lower() in {"1", "true", "yes", "on"}, "alipay": { "appId": (get_setting_value("ALIPAY_APP_ID") or "").strip(), "gateway": (get_setting_value("ALIPAY_GATEWAY") or "").strip(), "notifyUrl": (get_setting_value("ALIPAY_NOTIFY_URL") or "").strip(), "returnUrl": (get_setting_value("ALIPAY_RETURN_URL") or "").strip(), "hasPrivateKey": bool((get_setting_value("ALIPAY_PRIVATE_KEY") or "").strip()), "hasPublicKey": bool((get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip()), }, }, "llm": { "provider": (get_setting_value("LLM_PROVIDER") or "").strip(), "baseUrl": (get_setting_value("LLM_BASE_URL") or "").strip(), "model": (get_setting_value("LLM_MODEL") or "").strip(), "hasApiKey": bool((get_setting_value("LLM_API_KEY") or "").strip()), }, "cache": {"hasRedisUrl": bool((get_setting_value("REDIS_URL") or "").strip())}, "storage": { "provider": (get_setting_value("STORAGE_PROVIDER") or "AUTO").strip().upper(), "oss": { "endpoint": (get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/"), "bucket": (get_setting_value("OSS_BUCKET") or "").strip(), "accessKeyId": (get_setting_value("OSS_ACCESS_KEY_ID") or "").strip(), "hasAccessKeySecret": bool((get_setting_value("OSS_ACCESS_KEY_SECRET") or "").strip()), "uploadPrefix": _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/"), "publicBaseUrl": (get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/"), }, }, "db": db_status(), } if gogs_base_url is not None: gogs_base_url = str(gogs_base_url).strip().rstrip("/") if gogs_base_url and not (gogs_base_url.startswith("http://") or gogs_base_url.startswith("https://")): return jsonify({"error": "invalid_gogs_base_url"}), 400 if gogs_base_url: set_setting_value("GOGS_BASE_URL", gogs_base_url, category="GOGS") else: delete_setting_value("GOGS_BASE_URL") if clear_token: delete_setting_value("GOGS_TOKEN") elif gogs_token is not None: gogs_token = str(gogs_token).strip() if gogs_token: set_setting_value("GOGS_TOKEN", gogs_token, category="GOGS") pay_provider = pay.get("provider") pay_api_key = pay.get("apiKey") clear_pay_api_key = bool(pay.get("clearApiKey")) enable_mock_pay = pay.get("enableMockPay") alipay = pay.get("alipay") or {} if pay_provider is not None: pay_provider = str(pay_provider).strip().upper() if pay_provider: set_setting_value("PAY_PROVIDER", pay_provider, category="PAYMENT") if enable_mock_pay is not None: set_setting_value("ENABLE_MOCK_PAY", "1" if bool(enable_mock_pay) else "0", category="PAYMENT") if clear_pay_api_key: delete_setting_value("PAY_API_KEY") elif pay_api_key is not None: pay_api_key = str(pay_api_key).strip() if pay_api_key: set_setting_value("PAY_API_KEY", pay_api_key, category="PAYMENT") alipay_app_id = alipay.get("appId") alipay_gateway = alipay.get("gateway") alipay_notify_url = alipay.get("notifyUrl") alipay_return_url = alipay.get("returnUrl") alipay_private_key = alipay.get("privateKey") clear_alipay_private_key = bool(alipay.get("clearPrivateKey")) alipay_public_key = alipay.get("publicKey") clear_alipay_public_key = bool(alipay.get("clearPublicKey")) if alipay_app_id is not None: alipay_app_id = str(alipay_app_id).strip() if alipay_app_id: set_setting_value("ALIPAY_APP_ID", alipay_app_id, category="PAYMENT") else: delete_setting_value("ALIPAY_APP_ID") if alipay_gateway is not None: alipay_gateway = str(alipay_gateway).strip().rstrip("/") if alipay_gateway: if not (alipay_gateway.startswith("http://") or alipay_gateway.startswith("https://")): return jsonify({"error": "invalid_alipay_gateway"}), 400 set_setting_value("ALIPAY_GATEWAY", alipay_gateway, category="PAYMENT") else: delete_setting_value("ALIPAY_GATEWAY") if alipay_notify_url is not None: alipay_notify_url = str(alipay_notify_url).strip() if alipay_notify_url: if not (alipay_notify_url.startswith("http://") or alipay_notify_url.startswith("https://")): return jsonify({"error": "invalid_alipay_notify_url"}), 400 set_setting_value("ALIPAY_NOTIFY_URL", alipay_notify_url, category="PAYMENT") else: delete_setting_value("ALIPAY_NOTIFY_URL") if alipay_return_url is not None: alipay_return_url = str(alipay_return_url).strip() if alipay_return_url: if not (alipay_return_url.startswith("http://") or alipay_return_url.startswith("https://")): return jsonify({"error": "invalid_alipay_return_url"}), 400 set_setting_value("ALIPAY_RETURN_URL", alipay_return_url, category="PAYMENT") else: delete_setting_value("ALIPAY_RETURN_URL") if clear_alipay_private_key: delete_setting_value("ALIPAY_PRIVATE_KEY") elif alipay_private_key is not None: alipay_private_key = str(alipay_private_key).strip() if alipay_private_key: set_setting_value("ALIPAY_PRIVATE_KEY", alipay_private_key, category="PAYMENT") if clear_alipay_public_key: delete_setting_value("ALIPAY_PUBLIC_KEY") elif alipay_public_key is not None: alipay_public_key = str(alipay_public_key).strip() if alipay_public_key: set_setting_value("ALIPAY_PUBLIC_KEY", alipay_public_key, category="PAYMENT") llm_provider = llm.get("provider") llm_base_url = llm.get("baseUrl") llm_model = llm.get("model") llm_api_key = llm.get("apiKey") clear_llm_api_key = bool(llm.get("clearApiKey")) if llm_provider is not None: llm_provider = str(llm_provider).strip() if llm_provider: set_setting_value("LLM_PROVIDER", llm_provider, category="LLM") if llm_base_url is not None: llm_base_url = str(llm_base_url).strip().rstrip("/") if llm_base_url: set_setting_value("LLM_BASE_URL", llm_base_url, category="LLM") if llm_model is not None: llm_model = str(llm_model).strip() if llm_model: set_setting_value("LLM_MODEL", llm_model, category="LLM") if clear_llm_api_key: delete_setting_value("LLM_API_KEY") elif llm_api_key is not None: llm_api_key = str(llm_api_key).strip() if llm_api_key: set_setting_value("LLM_API_KEY", llm_api_key, category="LLM") redis_url = cache.get("redisUrl") clear_redis_url = bool(cache.get("clearRedisUrl")) if clear_redis_url: delete_setting_value("REDIS_URL") elif redis_url is not None: redis_url = str(redis_url).strip() if redis_url: if not (redis_url.startswith("redis://") or redis_url.startswith("rediss://")): return jsonify({"error": "invalid_redis_url"}), 400 set_setting_value("REDIS_URL", redis_url, category="CACHE") storage_provider = storage.get("provider") oss = storage.get("oss") or {} if storage_provider is not None: storage_provider = str(storage_provider).strip().upper() if storage_provider and storage_provider not in {"AUTO", "LOCAL", "OSS"}: return jsonify({"error": "invalid_storage_provider"}), 400 if storage_provider: set_setting_value("STORAGE_PROVIDER", storage_provider, category="STORAGE") else: delete_setting_value("STORAGE_PROVIDER") oss_endpoint = oss.get("endpoint") oss_bucket = oss.get("bucket") oss_access_key_id = oss.get("accessKeyId") oss_access_key_secret = oss.get("accessKeySecret") oss_upload_prefix = oss.get("uploadPrefix") oss_public_base_url = oss.get("publicBaseUrl") clear_oss_access_key_secret = bool(oss.get("clearAccessKeySecret")) if oss_endpoint is not None: oss_endpoint = str(oss_endpoint).strip().rstrip("/") if oss_endpoint: if not (oss_endpoint.startswith("http://") or oss_endpoint.startswith("https://")): return jsonify({"error": "invalid_oss_endpoint"}), 400 set_setting_value("OSS_ENDPOINT", oss_endpoint, category="STORAGE") else: delete_setting_value("OSS_ENDPOINT") if oss_bucket is not None: oss_bucket = str(oss_bucket).strip() if oss_bucket: set_setting_value("OSS_BUCKET", oss_bucket, category="STORAGE") else: delete_setting_value("OSS_BUCKET") if oss_access_key_id is not None: oss_access_key_id = str(oss_access_key_id).strip() if oss_access_key_id: set_setting_value("OSS_ACCESS_KEY_ID", oss_access_key_id, category="STORAGE") else: delete_setting_value("OSS_ACCESS_KEY_ID") if clear_oss_access_key_secret: delete_setting_value("OSS_ACCESS_KEY_SECRET") elif oss_access_key_secret is not None: oss_access_key_secret = str(oss_access_key_secret).strip() if oss_access_key_secret: set_setting_value("OSS_ACCESS_KEY_SECRET", oss_access_key_secret, category="STORAGE") if oss_upload_prefix is not None: oss_upload_prefix = _normalize_upload_prefix(oss_upload_prefix) if oss_upload_prefix: set_setting_value("OSS_UPLOAD_PREFIX", oss_upload_prefix, category="STORAGE") else: delete_setting_value("OSS_UPLOAD_PREFIX") if oss_public_base_url is not None: oss_public_base_url = str(oss_public_base_url).strip().rstrip("/") if oss_public_base_url: if not (oss_public_base_url.startswith("http://") or oss_public_base_url.startswith("https://")): return jsonify({"error": "invalid_oss_public_base_url"}), 400 set_setting_value("OSS_PUBLIC_BASE_URL", oss_public_base_url, category="STORAGE") else: delete_setting_value("OSS_PUBLIC_BASE_URL") mysql_host = mysql.get("host") mysql_port = mysql.get("port") mysql_user = mysql.get("user") mysql_database = mysql.get("database") mysql_password = mysql.get("password") clear_mysql_password = bool(mysql.get("clearPassword")) if mysql_host is not None: mysql_host = str(mysql_host).strip() if mysql_host: set_setting_value("MYSQL_HOST", mysql_host, category="DB") else: delete_setting_value("MYSQL_HOST") if mysql_port is not None: mysql_port = str(mysql_port).strip() if mysql_port: p = parse_int(mysql_port, 0) if p <= 0 or p > 65535: return jsonify({"error": "invalid_mysql_port"}), 400 set_setting_value("MYSQL_PORT", str(p), category="DB") else: delete_setting_value("MYSQL_PORT") if mysql_user is not None: mysql_user = str(mysql_user).strip() if mysql_user: set_setting_value("MYSQL_USER", mysql_user, category="DB") else: delete_setting_value("MYSQL_USER") if mysql_database is not None: mysql_database = str(mysql_database).strip() if mysql_database: set_setting_value("MYSQL_DATABASE", mysql_database, category="DB") else: delete_setting_value("MYSQL_DATABASE") if clear_mysql_password: delete_setting_value("MYSQL_PASSWORD") elif mysql_password is not None: mysql_password = str(mysql_password).strip() if mysql_password: set_setting_value("MYSQL_PASSWORD", mysql_password, category="DB") after = { "gogsBaseUrl": (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/"), "hasGogsToken": bool((get_setting_value("GOGS_TOKEN") or "").strip() or config.gogs_token), "payment": { "provider": (get_setting_value("PAY_PROVIDER") or "MOCK").strip().upper(), "hasApiKey": bool((get_setting_value("PAY_API_KEY") or "").strip()), "enableMockPay": (get_setting_value("ENABLE_MOCK_PAY") or "").strip().lower() in {"1", "true", "yes", "on"}, "alipay": { "appId": (get_setting_value("ALIPAY_APP_ID") or "").strip(), "gateway": (get_setting_value("ALIPAY_GATEWAY") or "").strip(), "notifyUrl": (get_setting_value("ALIPAY_NOTIFY_URL") or "").strip(), "returnUrl": (get_setting_value("ALIPAY_RETURN_URL") or "").strip(), "hasPrivateKey": bool((get_setting_value("ALIPAY_PRIVATE_KEY") or "").strip()), "hasPublicKey": bool((get_setting_value("ALIPAY_PUBLIC_KEY") or "").strip()), }, }, "llm": { "provider": (get_setting_value("LLM_PROVIDER") or "").strip(), "baseUrl": (get_setting_value("LLM_BASE_URL") or "").strip(), "model": (get_setting_value("LLM_MODEL") or "").strip(), "hasApiKey": bool((get_setting_value("LLM_API_KEY") or "").strip()), }, "cache": {"hasRedisUrl": bool((get_setting_value("REDIS_URL") or "").strip())}, "storage": { "provider": (get_setting_value("STORAGE_PROVIDER") or "AUTO").strip().upper(), "oss": { "endpoint": (get_setting_value("OSS_ENDPOINT") or "").strip().rstrip("/"), "bucket": (get_setting_value("OSS_BUCKET") or "").strip(), "accessKeyId": (get_setting_value("OSS_ACCESS_KEY_ID") or "").strip(), "hasAccessKeySecret": bool((get_setting_value("OSS_ACCESS_KEY_SECRET") or "").strip()), "uploadPrefix": _normalize_upload_prefix(get_setting_value("OSS_UPLOAD_PREFIX") or "uploads/"), "publicBaseUrl": (get_setting_value("OSS_PUBLIC_BASE_URL") or "").strip().rstrip("/"), }, }, "db": db_status(), } audit("ADMIN", admin["id"], "SETTINGS_UPDATE", "AppSettings", "-", before, after) return jsonify({"ok": True}) @app.post("/admin/redis/test") def api_admin_redis_test() -> Response: _ = require_admin() payload = request.get_json(silent=True) or {} url = (payload.get("url") or get_setting_value("REDIS_URL") or os.environ.get("REDIS_URL") or "").strip() if not url: return jsonify({"error": "redis_not_configured"}), 400 try: import redis # type: ignore except Exception: return jsonify({"error": "redis_client_missing"}), 500 try: r = redis.Redis.from_url(url, socket_connect_timeout=1, socket_timeout=2, decode_responses=False) ok = bool(r.ping()) return jsonify({"ok": ok}) except Exception: return jsonify({"error": "connect_failed"}), 502 @app.get("/admin/db/status") def api_admin_db_status() -> Response: _ = require_admin() probe = {"connectOk": True, "effective": None, "error": None} try: _ = fetch_one("SELECT 1 AS one", ()) probe["connectOk"] = True except Exception as e: probe["connectOk"] = False probe["error"] = str(e) or "connect_failed" try: probe["effective"] = get_active_backend() except Exception: probe["effective"] = None return jsonify({"ok": True, "db": db_status(), "probe": probe}) @app.post("/admin/db/switch") def api_admin_db_switch() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} target = (payload.get("target") or "").strip().lower() force = bool(payload.get("force")) mysql = payload.get("mysql") or {} if isinstance(mysql, dict) and mysql: mysql_host = mysql.get("host") mysql_port = mysql.get("port") mysql_user = mysql.get("user") mysql_database = mysql.get("database") mysql_password = mysql.get("password") clear_mysql_password = bool(mysql.get("clearPassword")) if mysql_host is not None: mysql_host = str(mysql_host).strip() if mysql_host: set_setting_value("MYSQL_HOST", mysql_host, category="DB") else: delete_setting_value("MYSQL_HOST") if mysql_port is not None: mysql_port = str(mysql_port).strip() if mysql_port: p = parse_int(mysql_port, 0) if p <= 0 or p > 65535: return jsonify({"error": "invalid_mysql_port"}), 400 set_setting_value("MYSQL_PORT", str(p), category="DB") else: delete_setting_value("MYSQL_PORT") if mysql_user is not None: mysql_user = str(mysql_user).strip() if mysql_user: set_setting_value("MYSQL_USER", mysql_user, category="DB") else: delete_setting_value("MYSQL_USER") if mysql_database is not None: mysql_database = str(mysql_database).strip() if mysql_database: set_setting_value("MYSQL_DATABASE", mysql_database, category="DB") else: delete_setting_value("MYSQL_DATABASE") if clear_mysql_password: delete_setting_value("MYSQL_PASSWORD") elif mysql_password is not None: mysql_password = str(mysql_password).strip() if mysql_password: set_setting_value("MYSQL_PASSWORD", mysql_password, category="DB") try: result = switch_database(target=target, force=force) except RuntimeError as e: code = str(e) or "switch_failed" status = 500 if code in {"invalid_target"}: status = 400 elif code in {"mysql_not_configured"}: status = 400 elif code in {"connect_failed", "db_create_failed"}: status = 502 elif code in {"target_not_empty"}: status = 409 elif code in {"migration_running"}: status = 409 elif code in {"pymysql_required"}: status = 500 elif code in {"verify_failed"}: status = 502 return jsonify({"error": code}), status except Exception: return jsonify({"error": "switch_failed"}), 500 audit("ADMIN", admin["id"], "DB_SWITCH", "Database", "-", {"target": target, "force": force}, result) return jsonify(result) @app.post("/admin/mysql/test") def api_admin_mysql_test() -> Response: _ = require_admin() payload = request.get_json(silent=True) or {} config = get_config() host = (payload.get("host") or get_setting_value("MYSQL_HOST") or config.mysql_host or "").strip() port = parse_int(payload.get("port") or get_setting_value("MYSQL_PORT") or "", config.mysql_port or 3306) user = (payload.get("user") or get_setting_value("MYSQL_USER") or config.mysql_user or "").strip() database = (payload.get("database") or get_setting_value("MYSQL_DATABASE") or config.mysql_database or "").strip() password = payload.get("password") if password is None: password = get_setting_value("MYSQL_PASSWORD") if password is None: password = config.mysql_password password = (str(password) if password is not None else "").strip() if not host or not user or not database: return jsonify({"error": "mysql_params_required"}), 400 if port <= 0 or port > 65535: return jsonify({"error": "invalid_mysql_port"}), 400 try: import pymysql except Exception: return jsonify({"error": "pymysql_required"}), 500 try: created_db = False try: conn = pymysql.connect( host=host, port=port, user=user, password=password, database=database, charset="utf8mb4", connect_timeout=3, read_timeout=3, write_timeout=3, autocommit=True, ) except Exception as e: errno = int(getattr(e, "args", [0])[0] or 0) if getattr(e, "args", None) else 0 if errno != 1049: raise server_conn = pymysql.connect( host=host, port=port, user=user, password=password, charset="utf8mb4", connect_timeout=3, read_timeout=3, write_timeout=3, autocommit=True, ) try: esc = database.replace("`", "``") cur = server_conn.cursor() cur.execute( f"CREATE DATABASE IF NOT EXISTS `{esc}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci" ) created_db = True finally: server_conn.close() conn = pymysql.connect( host=host, port=port, user=user, password=password, database=database, charset="utf8mb4", connect_timeout=3, read_timeout=3, write_timeout=3, autocommit=True, ) try: cur = conn.cursor() cur.execute("SELECT 1") _ = cur.fetchone() finally: conn.close() except Exception as e: errno = int(getattr(e, "args", [0])[0] or 0) if getattr(e, "args", None) else 0 msg = str(getattr(e, "args", [""])[1] if getattr(e, "args", None) and len(e.args) > 1 else str(e) or "") msg = (msg or "").strip()[:200] return jsonify({"ok": False, "error": "connect_failed", "errno": errno, "message": msg}), 502 return jsonify({"ok": True, "createdDatabase": bool(created_db)}) @app.get("/admin/gogs/repo") def api_admin_gogs_repo() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() repo = (request.args.get("repo") or "").strip() if not owner or not repo: return jsonify({"error": "owner_repo_required"}), 400 resp = gogs_repo_info(owner, repo) if resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 data = resp.json() return jsonify( { "owner": owner, "repo": repo, "fullName": data.get("full_name"), "description": data.get("description"), "private": bool(data.get("private")), "defaultBranch": data.get("default_branch"), "htmlUrl": data.get("html_url"), "cloneUrl": data.get("clone_url"), "sshUrl": data.get("ssh_url"), "updatedAt": data.get("updated_at") or data.get("updated_at_unix"), } ) @app.get("/admin/gogs/user-repos") def api_admin_gogs_user_repos() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() q = (request.args.get("q") or "").strip().lower() if not owner: return jsonify({"error": "owner_required"}), 400 resp = gogs_user_repos(owner) if resp.status_code == 404: return jsonify({"error": "user_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 repos = [] for item in resp.json() or []: name = (item.get("name") or "").strip() full_name = (item.get("full_name") or "").strip() if q and (q not in name.lower() and q not in full_name.lower()): continue repos.append( { "id": item.get("id"), "name": name, "fullName": full_name, "private": bool(item.get("private")), "defaultBranch": item.get("default_branch"), "description": item.get("description"), "htmlUrl": item.get("html_url"), "updatedAt": item.get("updated_at") or item.get("updated_at_unix"), } ) return jsonify({"items": repos}) @app.get("/admin/gogs/branches") def api_admin_gogs_branches() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() repo = (request.args.get("repo") or "").strip() if not owner or not repo: return jsonify({"error": "owner_repo_required"}), 400 resp = gogs_branches(owner, repo) if resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 items = [] for b in resp.json() or []: name = (b.get("name") or "").strip() if not name: continue items.append({"name": name, "commit": (b.get("commit") or {}).get("id")}) items.sort(key=lambda x: x["name"]) return jsonify({"items": items}) @app.get("/admin/gogs/tags") def api_admin_gogs_tags() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() repo = (request.args.get("repo") or "").strip() if not owner or not repo: return jsonify({"error": "owner_repo_required"}), 400 resp = gogs_tags(owner, repo) if resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 items = [] for t in resp.json() or []: name = (t.get("name") or "").strip() if not name: continue items.append({"name": name}) items.sort(key=lambda x: x["name"]) return jsonify({"items": items}) @app.get("/admin/gogs/file-text") def api_admin_gogs_file_text() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() repo = (request.args.get("repo") or "").strip() ref = (request.args.get("ref") or "").strip() or "AUTO" path = (request.args.get("path") or "").strip() or "README.md" if not owner or not repo: return jsonify({"error": "owner_repo_required"}), 400 path = path.lstrip("/") if not path: return jsonify({"error": "path_required"}), 400 if ref.upper() == "AUTO": repo_resp = gogs_repo_info(owner, repo) if repo_resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 404 if repo_resp.status_code >= 400: if repo_resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": repo_resp.status_code, "url": repo_resp.url}), 502 return jsonify({"error": "gogs_failed", "status": repo_resp.status_code}), 502 repo_data = repo_resp.json() or {} ref = (repo_data.get("default_branch") or "master").strip() or "master" resp = gogs_contents(owner, repo, path, ref) if resp.status_code == 404: return jsonify({"error": "file_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 data = resp.json() or {} if (data.get("type") or "").strip().lower() != "file": return jsonify({"error": "not_a_file"}), 400 encoding = (data.get("encoding") or "").strip().lower() if encoding != "base64": return jsonify({"error": "unsupported_encoding"}), 502 content_b64 = (data.get("content") or "").strip() if not content_b64: return jsonify({"error": "empty_content"}), 404 try: raw = base64.b64decode(content_b64, validate=False) except Exception: return jsonify({"error": "decode_failed"}), 502 if len(raw) > 200_000: return jsonify({"error": "file_too_large"}), 413 try: text = raw.decode("utf-8") except Exception: text = raw.decode("utf-8", errors="replace") return jsonify({"owner": owner, "repo": repo, "ref": ref, "path": path, "text": text, "sha": data.get("sha")}) @app.get("/admin/gogs/repos") def api_admin_gogs_repos() -> Response: _ = require_admin() owner = (request.args.get("owner") or "").strip() q = (request.args.get("q") or "").strip().lower() config = get_config() gogs_token = get_setting_value("GOGS_TOKEN") if gogs_token is not None: gogs_token = gogs_token.strip() or None if gogs_token is None: gogs_token = config.gogs_token if not owner and not gogs_token: return jsonify({"error": "gogs_token_required"}), 400 resp = gogs_user_repos(owner) if owner else gogs_my_repos() if resp.status_code == 404 and owner: return jsonify({"error": "user_not_found"}), 404 if resp.status_code >= 400: if resp.status_code == 599: return jsonify({"error": "gogs_failed", "status": resp.status_code, "url": resp.url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code}), 502 repos = [] for item in resp.json() or []: name = (item.get("name") or "").strip() full_name = (item.get("full_name") or "").strip() if q and (q not in name.lower() and q not in full_name.lower()): continue repo_owner = ((item.get("owner") or {}).get("username") or "").strip() if not repo_owner and "/" in full_name: repo_owner = full_name.split("/", 1)[0].strip() repos.append( { "id": item.get("id"), "owner": repo_owner, "name": name, "fullName": full_name, "private": bool(item.get("private")), "defaultBranch": item.get("default_branch"), "description": item.get("description"), "htmlUrl": item.get("html_url"), "updatedAt": item.get("updated_at") or item.get("updated_at_unix"), } ) return jsonify({"items": repos}) @app.get("/admin/plans") def api_admin_plans() -> Response: _ = require_admin() rows = fetch_all("SELECT * FROM plans ORDER BY sort DESC, id DESC") return jsonify( [ { "id": row["id"], "name": row["name"], "durationDays": row["duration_days"], "priceCents": row["price_cents"], "enabled": bool(row["enabled"]), "sort": row["sort"], } for row in rows ] ) @app.post("/admin/plans") def api_admin_create_plan() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} name = (payload.get("name") or "").strip() duration_days = parse_int(payload.get("durationDays"), 0) price_cents = parse_int(payload.get("priceCents"), 0) enabled = 1 if payload.get("enabled", True) else 0 sort = parse_int(payload.get("sort"), 0) if not name or duration_days <= 0 or price_cents <= 0: return jsonify({"error": "invalid_payload"}), 400 cur = execute( "INSERT INTO plans (name, duration_days, price_cents, enabled, sort) VALUES (?, ?, ?, ?, ?)", (name, duration_days, price_cents, enabled, sort), ) audit("ADMIN", admin["id"], "PLAN_CREATE", "Plan", str(cur.lastrowid), None, payload) return jsonify({"id": cur.lastrowid}) @app.put("/admin/plans/") def api_admin_update_plan(plan_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM plans WHERE id = ?", (plan_id,)) if before_row is None: abort(404) payload = request.get_json(silent=True) or {} name = (payload.get("name") or before_row["name"]).strip() duration_days = parse_int(payload.get("durationDays", before_row["duration_days"]), before_row["duration_days"]) price_cents = parse_int(payload.get("priceCents", before_row["price_cents"]), before_row["price_cents"]) enabled = 1 if payload.get("enabled", bool(before_row["enabled"])) else 0 sort = parse_int(payload.get("sort", before_row["sort"]), before_row["sort"]) if not name or duration_days <= 0 or price_cents <= 0: return jsonify({"error": "invalid_payload"}), 400 execute( "UPDATE plans SET name = ?, duration_days = ?, price_cents = ?, enabled = ?, sort = ? WHERE id = ?", (name, duration_days, price_cents, enabled, sort, plan_id), ) after_row = fetch_one("SELECT * FROM plans WHERE id = ?", (plan_id,)) audit( "ADMIN", admin["id"], "PLAN_UPDATE", "Plan", str(plan_id), dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.delete("/admin/plans/") def api_admin_delete_plan(plan_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM plans WHERE id = ?", (plan_id,)) if before_row is None: abort(404) execute("DELETE FROM plans WHERE id = ?", (plan_id,)) audit("ADMIN", admin["id"], "PLAN_DELETE", "Plan", str(plan_id), dict(before_row), None) return jsonify({"ok": True}) @app.get("/admin/resources") def api_admin_resources() -> Response: _ = require_admin() q = (request.args.get("q") or "").strip() resource_type = (request.args.get("type") or "").strip().upper() status = (request.args.get("status") or "").strip().upper() page = max(1, parse_int(request.args.get("page"), 1)) page_size = min(100, max(1, parse_int(request.args.get("pageSize"), 20))) where = [] params: list[Any] = [] if q: where.append("(title LIKE ? OR summary LIKE ? OR repo_owner LIKE ? OR repo_name LIKE ?)") like = f"%{q}%" params.extend([like, like, like, like]) if resource_type in {"FREE", "VIP"}: where.append("type = ?") params.append(resource_type) if status in {"DRAFT", "ONLINE", "OFFLINE"}: where.append("status = ?") params.append(status) where_sql = f"WHERE {' AND '.join(where)}" if where else "" total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM resources {where_sql}", tuple(params)) total = int(total_row["cnt"]) if total_row is not None else 0 offset = (page - 1) * page_size rows = fetch_all( f""" SELECT * FROM resources {where_sql} ORDER BY updated_at DESC, id DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: try: tags = json.loads(row["tags_json"] or "[]") except Exception: tags = [] items.append( { "id": row["id"], "title": row["title"], "summary": row["summary"], "type": row["type"], "status": row["status"], "coverUrl": row["cover_url"], "tags": tags, "repoOwner": row["repo_owner"], "repoName": row["repo_name"], "repoPrivate": bool(row["repo_private"]), "repoHtmlUrl": row["repo_html_url"], "defaultRef": row["default_ref"], "updatedAt": row["updated_at"], } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.post("/admin/resources") def api_admin_create_resource() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} title = (payload.get("title") or "").strip() summary = (payload.get("summary") or "").strip() resource_type = (payload.get("type") or "").strip().upper() status = (payload.get("status") or "").strip().upper() cover_url = (payload.get("coverUrl") or "").strip() or None tags = _parse_keywords(payload.get("keywords") if "keywords" in payload else payload.get("tags")) sync_readme = payload.get("syncReadme") if sync_readme is None: sync_readme = True create_repo = payload.get("createRepo") if create_repo is None: create_repo = True repo_owner = (payload.get("repoOwner") or "").strip() repo_name = (payload.get("repoName") or "").strip() repo_private = 1 if payload.get("repoPrivate") else 0 repo_html_url: str | None = None default_ref = (payload.get("defaultRef") or "").strip() requested_ref = default_ref if not title or resource_type not in {"FREE", "VIP"}: return jsonify({"error": "invalid_payload"}), 400 if status not in {"DRAFT", "ONLINE", "OFFLINE"}: status = "DRAFT" base_url, token = _gogs_base_url_and_token() if not base_url: return jsonify({"error": "gogs_base_url_required"}), 400 if not token: return jsonify({"error": "gogs_token_required"}), 400 if create_repo: desired_name = repo_name or _slugify_repo_name(title) desired_owner = repo_owner description = title repo_data: dict[str, Any] | None = None created_resp: requests.Response | None = None for i in range(5): try_name = desired_name if i == 0 else f"{desired_name}-{uuid.uuid4().hex[:6]}" resp = gogs_create_repo(desired_owner, try_name, description, bool(repo_private)) if desired_owner and resp.status_code in {403, 404}: resp = gogs_create_repo("", try_name, description, bool(repo_private)) if resp.status_code == 409: if repo_name: return jsonify({"error": "repo_exists"}), 409 continue if resp.status_code >= 400: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) if resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": resp.status_code, "message": msg, "url": upstream_url}), 400 if resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 created_resp = resp repo_data = resp.json() or {} break if created_resp is None or repo_data is None: return jsonify({"error": "repo_create_failed"}), 502 full_name = (repo_data.get("full_name") or "").strip() if "/" in full_name: owner_part, repo_part = full_name.split("/", 1) repo_owner = owner_part.strip() repo_name = repo_part.strip() else: repo_owner = (repo_data.get("owner", {}) or {}).get("username") or repo_owner repo_name = (repo_data.get("name") or repo_name).strip() repo_html_url = (repo_data.get("html_url") or "").strip() or None if repo_data.get("private") is not None: repo_private = 1 if repo_data.get("private") else 0 default_ref = (repo_data.get("default_branch") or "master").strip() if sync_readme: readme = f"# {title}\n\n{summary}\n" if summary else f"# {title}\n" try: gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "init README", must_create=True) except GogsGitError as e: if e.code != "file_exists": if e.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e.code, "message": e.message}), 400 if e.code == "git_not_found": return jsonify({"error": e.code, "message": e.message}), 501 return jsonify({"error": "readme_sync_failed", "message": e.message}), 502 else: if not repo_owner or not repo_name: return jsonify({"error": "repo_required"}), 400 repo_resp = gogs_repo_info(repo_owner, repo_name) if repo_resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 400 if repo_resp.status_code >= 400: msg = _gogs_error_message(repo_resp) upstream_url = _safe_upstream_url(repo_resp) if repo_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": repo_resp.status_code, "message": msg, "url": upstream_url}), 400 if repo_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": repo_resp.status_code, "message": msg, "url": upstream_url}), 502 return jsonify({"error": "gogs_failed", "status": repo_resp.status_code, "message": msg, "url": upstream_url}), 502 repo_data = repo_resp.json() repo_html_url = (repo_data.get("html_url") or "").strip() or None if repo_data.get("private") is not None: repo_private = 1 if repo_data.get("private") else 0 if not default_ref or default_ref.upper() == "AUTO": default_ref = (repo_data.get("default_branch") or "master").strip() if requested_ref and requested_ref.upper() != "AUTO": branches_resp = gogs_branches(repo_owner, repo_name) tags_resp = gogs_tags(repo_owner, repo_name) if branches_resp.status_code >= 400: msg = _gogs_error_message(branches_resp) upstream_url = _safe_upstream_url(branches_resp) if branches_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": branches_resp.status_code, "message": msg, "url": upstream_url}), 400 if branches_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": branches_resp.status_code, "message": msg, "url": upstream_url}), 502 return jsonify({"error": "gogs_failed", "status": branches_resp.status_code, "message": msg, "url": upstream_url}), 502 if tags_resp.status_code >= 400: msg = _gogs_error_message(tags_resp) upstream_url = _safe_upstream_url(tags_resp) if tags_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": tags_resp.status_code, "message": msg, "url": upstream_url}), 400 if tags_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": tags_resp.status_code, "message": msg, "url": upstream_url}), 502 return jsonify({"error": "gogs_failed", "status": tags_resp.status_code, "message": msg, "url": upstream_url}), 502 exists = False for b in branches_resp.json() or []: if (b.get("name") or "").strip() == requested_ref: exists = True break if not exists: for t in tags_resp.json() or []: if (t.get("name") or "").strip() == requested_ref: exists = True break if not exists: return jsonify({"error": "invalid_ref"}), 400 if sync_readme: readme = f"# {title}\n\n{summary}\n" if summary else f"# {title}\n" try: gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=False) except GogsGitError as e: if e.code == "file_not_found": try: gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=True) except GogsGitError as e2: if e2.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e2.code, "message": e2.message}), 400 if e2.code == "git_not_found": return jsonify({"error": e2.code, "message": e2.message}), 501 return jsonify({"error": "readme_sync_failed", "message": e2.message}), 502 else: if e.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e.code, "message": e.message}), 400 if e.code == "git_not_found": return jsonify({"error": e.code, "message": e.message}), 501 return jsonify({"error": "readme_sync_failed", "message": e.message}), 502 now = isoformat(utcnow()) cur = execute( """ INSERT INTO resources (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (title, summary, resource_type, status, cover_url, json.dumps(tags, ensure_ascii=False), repo_owner, repo_name, repo_private, repo_html_url, default_ref, now, now), ) audit("ADMIN", admin["id"], "RESOURCE_CREATE", "Resource", str(cur.lastrowid), None, payload) return jsonify({"id": cur.lastrowid}) @app.put("/admin/resources/") def api_admin_update_resource(resource_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,)) if before_row is None: abort(404) payload = request.get_json(silent=True) or {} title = (payload.get("title") or before_row["title"]).strip() summary = (payload.get("summary") or before_row["summary"]).strip() resource_type = (payload.get("type") or before_row["type"]).strip().upper() status = (payload.get("status") or before_row["status"]).strip().upper() cover_url = (payload.get("coverUrl") if "coverUrl" in payload else before_row["cover_url"]) or None if cover_url is not None: cover_url = str(cover_url).strip() or None if "keywords" in payload or "tags" in payload: tags = _parse_keywords(payload.get("keywords") if "keywords" in payload else payload.get("tags")) tags_json = json.dumps(tags, ensure_ascii=False) else: tags_json = before_row["tags_json"] repo_owner = (payload.get("repoOwner") or before_row["repo_owner"]).strip() repo_name = (payload.get("repoName") or before_row["repo_name"]).strip() default_ref = (payload.get("defaultRef") or before_row["default_ref"]).strip() requested_ref = (payload.get("defaultRef") or "").strip() if not title or resource_type not in {"FREE", "VIP"}: return jsonify({"error": "invalid_payload"}), 400 if status not in {"DRAFT", "ONLINE", "OFFLINE"}: return jsonify({"error": "invalid_status"}), 400 if not repo_owner or not repo_name: return jsonify({"error": "repo_required"}), 400 base_url, token = _gogs_base_url_and_token() if not base_url: return jsonify({"error": "gogs_base_url_required"}), 400 if not token: return jsonify({"error": "gogs_token_required"}), 400 repo_resp = gogs_repo_info(repo_owner, repo_name) if repo_resp.status_code == 404: return jsonify({"error": "repo_not_found"}), 400 if repo_resp.status_code >= 400: msg = _gogs_error_message(repo_resp) if repo_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": repo_resp.status_code, "message": msg}), 400 if repo_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": repo_resp.status_code, "message": msg}), 502 return jsonify({"error": "gogs_failed", "status": repo_resp.status_code, "message": msg}), 502 repo_data = repo_resp.json() repo_html_url = (repo_data.get("html_url") or "").strip() or None repo_private = 1 if repo_data.get("private") else 0 if not default_ref or default_ref.upper() == "AUTO": default_ref = (repo_data.get("default_branch") or "master").strip() if requested_ref and requested_ref.upper() != "AUTO": branches_resp = gogs_branches(repo_owner, repo_name) tags_resp = gogs_tags(repo_owner, repo_name) if branches_resp.status_code >= 400: msg = _gogs_error_message(branches_resp) if branches_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": branches_resp.status_code, "message": msg}), 400 if branches_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": branches_resp.status_code, "message": msg}), 502 return jsonify({"error": "gogs_failed", "status": branches_resp.status_code, "message": msg}), 502 if tags_resp.status_code >= 400: msg = _gogs_error_message(tags_resp) if tags_resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": tags_resp.status_code, "message": msg}), 400 if tags_resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": tags_resp.status_code, "message": msg}), 502 return jsonify({"error": "gogs_failed", "status": tags_resp.status_code, "message": msg}), 502 exists = False for b in branches_resp.json() or []: if (b.get("name") or "").strip() == requested_ref: exists = True break if not exists: for t in tags_resp.json() or []: if (t.get("name") or "").strip() == requested_ref: exists = True break if not exists: return jsonify({"error": "invalid_ref"}), 400 sync_readme = payload.get("syncReadme") if sync_readme is None: sync_readme = ("title" in payload) or ("summary" in payload) if sync_readme: readme = f"# {title}\n\n{summary}\n" if summary else f"# {title}\n" try: gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=False) except GogsGitError as e: if e.code == "file_not_found": try: gogs_git_write_file(repo_owner, repo_name, default_ref, "README.md", readme, "sync README", must_create=True) except GogsGitError as e2: if e2.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e2.code, "message": e2.message}), 400 if e2.code == "git_not_found": return jsonify({"error": e2.code, "message": e2.message}), 501 return jsonify({"error": "readme_sync_failed", "message": e2.message}), 502 else: if e.code in {"ref_required", "gogs_token_required", "invalid_gogs_base_url"}: return jsonify({"error": e.code, "message": e.message}), 400 if e.code == "git_not_found": return jsonify({"error": e.code, "message": e.message}), 501 return jsonify({"error": "readme_sync_failed", "message": e.message}), 502 execute( """ UPDATE resources SET title = ?, summary = ?, type = ?, status = ?, cover_url = ?, tags_json = ?, repo_owner = ?, repo_name = ?, repo_private = ?, repo_html_url = ?, default_ref = ?, updated_at = ? WHERE id = ? """, ( title, summary, resource_type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, isoformat(utcnow()), resource_id, ), ) after_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,)) audit( "ADMIN", admin["id"], "RESOURCE_UPDATE", "Resource", str(resource_id), dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.delete("/admin/resources/") def api_admin_delete_resource(resource_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM resources WHERE id = ?", (resource_id,)) if before_row is None: abort(404) repo_owner = (before_row["repo_owner"] or "").strip() repo_name = (before_row["repo_name"] or "").strip() if repo_owner and repo_name: resp = gogs_delete_repo(repo_owner, repo_name) if resp.status_code not in {204, 404} and resp.status_code >= 400: msg = _gogs_error_message(resp) upstream_url = _safe_upstream_url(resp) if resp.status_code in {401, 403}: return jsonify({"error": "gogs_unauthorized", "status": resp.status_code, "message": msg, "url": upstream_url}), 400 if resp.status_code == 599: return jsonify({"error": "gogs_unreachable", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 return jsonify({"error": "gogs_failed", "status": resp.status_code, "message": msg, "url": upstream_url}), 502 upload_names: set[str] = set() upload_names |= _extract_upload_names(before_row["cover_url"]) upload_names |= _extract_upload_names(before_row["summary"]) execute("DELETE FROM resources WHERE id = ?", (resource_id,)) audit("ADMIN", admin["id"], "RESOURCE_DELETE", "Resource", str(resource_id), dict(before_row), None) _delete_upload_files(upload_names) return jsonify({"ok": True}) def _download_cache_glob_prefix(*, resource_id: int, owner: str, repo: str) -> str: safe_owner = re.sub(r"[^a-zA-Z0-9._-]+", "_", (owner or "").strip())[:50] or "owner" safe_repo = re.sub(r"[^a-zA-Z0-9._-]+", "_", (repo or "").strip())[:50] or "repo" rid = int(resource_id or 0) return f"res{rid}__{safe_owner}__{safe_repo}__" def _admin_cache_entry_from_path(p: Path) -> dict[str, Any]: try: st = p.stat() except Exception: st = None meta = _read_download_cache_meta(p) ttl_remaining = None if st is not None and _download_cache_ttl_seconds > 0: ttl_remaining = max(0, int(_download_cache_ttl_seconds - (time.time() - float(st.st_mtime)))) commit = None ref = None if isinstance(meta, dict): commit = (meta.get("commit") or "").strip() or None ref = (meta.get("ref") or "").strip() or None return { "fileName": p.name, "path": str(p), "bytes": int(st.st_size) if st is not None else None, "mtime": int(st.st_mtime) if st is not None else None, "ttlRemainingSeconds": ttl_remaining, "ref": ref, "commit": commit, "meta": meta, "ready": bool(st is not None and st.st_size > 0), } def _admin_cache_building_for(*, resource_id: int, owner: str, repo: str) -> list[dict[str, Any]]: rid = int(resource_id or 0) prefix = f"res{rid}:{owner}/{repo}@" out: list[dict[str, Any]] = [] with _download_lock: for k, v in _download_jobs.items(): if not isinstance(k, str) or not k.startswith(prefix): continue if not isinstance(v, dict): continue out.append( { "key": k, "state": v.get("state"), "updatedAt": v.get("updatedAt"), "error": v.get("error"), "ref": v.get("ref"), "commit": v.get("commit"), "cacheKey": v.get("cacheKey"), } ) out.sort(key=lambda x: float(x.get("updatedAt") or 0), reverse=True) return out[:20] @app.get("/admin/resources//download-cache/summary") def api_admin_resource_download_cache_summary(resource_id: int) -> Response: _ = require_admin() row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] cache_dir = _download_cache_dir() prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo) files = list(cache_dir.glob(prefix + "*.zip")) latest = None latest_mtime = -1 for p in files: try: st = p.stat() except Exception: continue if st.st_size <= 0: continue if float(st.st_mtime) > float(latest_mtime): latest_mtime = float(st.st_mtime) latest = p latest_entry = _admin_cache_entry_from_path(latest) if latest else None building = _admin_cache_building_for(resource_id=resource_id, owner=owner, repo=repo) return jsonify( { "ok": True, "resourceId": int(resource_id), "owner": owner, "repo": repo, "defaultRef": default_ref, "count": len(files), "latest": latest_entry, "jobs": building, } ) @app.get("/admin/resources//download-cache/list") def api_admin_resource_download_cache_list(resource_id: int) -> Response: _ = require_admin() row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) owner, repo = row["repo_owner"], row["repo_name"] cache_dir = _download_cache_dir() prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo) files = list(cache_dir.glob(prefix + "*.zip")) items: list[dict[str, Any]] = [] for p in files: items.append(_admin_cache_entry_from_path(p)) items.sort(key=lambda x: float(x.get("mtime") or 0), reverse=True) return jsonify({"ok": True, "items": items[:50], "total": len(items)}) @app.get("/admin/resources//download-cache/status") def api_admin_resource_download_cache_status(resource_id: int) -> Response: _ = require_admin() row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] ref = (request.args.get("ref") or "").strip() or default_ref resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref) commit = resolved.get("commit") if resolved.get("ok") else None resolved_kind = resolved.get("kind") or "unknown" cache_key = commit or ref cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=cache_key) if _download_cache_ready(cache_path): return jsonify({"ok": True, "ready": True, "state": "ready", "ref": ref, "commit": commit, "cacheKey": cache_key}) key = f"res{int(resource_id or 0)}:{owner}/{repo}@{cache_key}" with _download_lock: job = _download_jobs.get(key) or {} return jsonify( { "ok": True, "ready": False, "state": job.get("state") or "building", "error": job.get("error"), "ref": ref, "commit": commit, "cacheKey": cache_key, "refKind": resolved_kind, } ) @app.post("/admin/resources//download-cache/refresh") def api_admin_resource_download_cache_refresh(resource_id: int) -> Response: admin = require_admin() row = fetch_one("SELECT repo_owner, repo_name, default_ref FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) payload = request.get_json(silent=True) or {} owner, repo, default_ref = row["repo_owner"], row["repo_name"], row["default_ref"] ref = (payload.get("ref") or "").strip() or default_ref resolved = _resolve_download_commit(owner=owner, repo=repo, ref=ref) commit = resolved.get("commit") if resolved.get("ok") else None resolved_kind = resolved.get("kind") or "unknown" st = _ensure_download_ready( resource_id=resource_id, owner=owner, repo=repo, ref=ref, commit=commit, resolved_kind=resolved_kind, force=True, ) audit("ADMIN", admin["id"], "DOWNLOAD_CACHE_REFRESH", "Resource", str(resource_id), None, {"ref": ref, "commit": commit}) return jsonify( { "ok": True, "ready": bool(st.get("ready")), "state": st.get("state") or ("ready" if st.get("ready") else "building"), "error": st.get("error"), "ref": ref, "commit": commit, "cacheKey": st.get("cacheKey") or (commit or ref), } ) @app.delete("/admin/resources//download-cache") def api_admin_resource_download_cache_clear(resource_id: int) -> Response: admin = require_admin() row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) owner, repo = row["repo_owner"], row["repo_name"] commit = (request.args.get("commit") or "").strip() or None clear_all = (request.args.get("all") or "").strip() in {"1", "true", "True"} cache_dir = _download_cache_dir() prefix = _download_cache_glob_prefix(resource_id=resource_id, owner=owner, repo=repo) files = list(cache_dir.glob(prefix + "*.zip")) removed = 0 for p in files: if not clear_all and commit: meta = _read_download_cache_meta(p) or {} if (meta.get("commit") or "").strip().lower() != commit.lower(): continue try: p.unlink() removed += 1 except Exception: pass try: mp = _download_cache_meta_path(p) if mp.exists(): mp.unlink() except Exception: pass audit("ADMIN", admin["id"], "DOWNLOAD_CACHE_CLEAR", "Resource", str(resource_id), None, {"commit": commit, "all": clear_all, "removed": removed}) return jsonify({"ok": True, "removed": removed}) @app.get("/admin/resources//download-cache/file") def api_admin_resource_download_cache_file(resource_id: int) -> Response: _ = require_admin() row = fetch_one("SELECT repo_owner, repo_name FROM resources WHERE id = ?", (resource_id,)) if row is None: abort(404) owner, repo = row["repo_owner"], row["repo_name"] commit = (request.args.get("commit") or "").strip() if not commit or not _looks_like_commit(commit): return jsonify({"error": "commit_required"}), 400 cache_path = _download_cache_path(resource_id=resource_id, owner=owner, repo=repo, cache_key=commit.lower()) if not cache_path.exists(): abort(404) filename = f"{owner}-{repo}-{commit[:12]}.zip".replace("/", "-") f = open(cache_path, "rb") resp = send_file( f, mimetype="application/zip", as_attachment=True, download_name=filename, conditional=True, max_age=0, ) resp.call_on_close(f.close) resp.direct_passthrough = False return resp @app.get("/admin/orders") def api_admin_orders() -> Response: _ = require_admin() q = (request.args.get("q") or "").strip() status = (request.args.get("status") or "").strip().upper() page = max(1, parse_int(request.args.get("page"), 1)) page_size = min(100, max(1, parse_int(request.args.get("pageSize"), 20))) where = [] params: list[Any] = [] if q: where.append("(o.id LIKE ? OR u.phone LIKE ?)") like = f"%{q}%" params.extend([like, like]) if status in {"PENDING", "PAID", "CLOSED", "FAILED"}: where.append("o.status = ?") params.append(status) where_sql = f"WHERE {' AND '.join(where)}" if where else "" total_row = fetch_one( f""" SELECT COUNT(1) AS cnt FROM orders o JOIN users u ON u.id = o.user_id {where_sql} """, tuple(params), ) total = int(total_row["cnt"]) if total_row is not None else 0 offset = (page - 1) * page_size rows = fetch_all( f""" SELECT o.*, u.phone as user_phone FROM orders o JOIN users u ON u.id = o.user_id {where_sql} ORDER BY o.created_at DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: items.append( { "id": row["id"], "status": row["status"], "amountCents": row["amount_cents"], "userId": row["user_id"], "userPhone": row["user_phone"], "createdAt": row["created_at"], "paidAt": row["paid_at"], "planSnapshot": json.loads(row["plan_snapshot_json"]), } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.get("/admin/orders/") def api_admin_get_order(order_id: str) -> Response: _ = require_admin() row = fetch_one( """ SELECT o.*, u.phone as user_phone FROM orders o JOIN users u ON u.id = o.user_id WHERE o.id = ? """, (order_id,), ) if row is None: abort(404) return jsonify( { "id": row["id"], "status": row["status"], "amountCents": row["amount_cents"], "userId": row["user_id"], "userPhone": row["user_phone"], "planId": row["plan_id"], "payChannel": row["pay_channel"], "payTradeNo": row["pay_trade_no"], "createdAt": row["created_at"], "paidAt": row["paid_at"], "planSnapshot": json.loads(row["plan_snapshot_json"]), } ) @app.post("/admin/orders") def api_admin_create_order() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} user_id = parse_int(payload.get("userId"), 0) user_phone = (payload.get("userPhone") or "").strip() plan_id = parse_int(payload.get("planId"), 0) status = (payload.get("status") or "PENDING").strip().upper() if status not in {"PENDING", "PAID", "CLOSED", "FAILED"}: return jsonify({"error": "invalid_status"}), 400 if user_id <= 0 and not user_phone: return jsonify({"error": "user_required"}), 400 if plan_id <= 0: return jsonify({"error": "plan_required"}), 400 user_row = None if user_id > 0: user_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) else: user_row = fetch_one("SELECT * FROM users WHERE phone = ?", (user_phone,)) if user_row is None: return jsonify({"error": "user_not_found"}), 404 plan = fetch_one("SELECT * FROM plans WHERE id = ? AND enabled = 1", (plan_id,)) if plan is None: return jsonify({"error": "plan_not_found"}), 404 order_id = uuid.uuid4().hex snapshot = { "name": plan["name"], "durationDays": plan["duration_days"], "priceCents": plan["price_cents"], } created_at = isoformat(utcnow()) paid_at = isoformat(utcnow()) if status == "PAID" else None execute( """ INSERT INTO orders (id, user_id, plan_id, amount_cents, status, created_at, paid_at, plan_snapshot_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( order_id, user_row["id"], plan["id"], plan["price_cents"], status, created_at, paid_at, json.dumps(snapshot, ensure_ascii=False), ), ) if status == "PAID": extend_vip(int(user_row["id"]), int(snapshot["durationDays"])) after_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,)) audit( "ADMIN", admin["id"], "ORDER_CREATE", "Order", order_id, None, dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True, "id": order_id}) @app.put("/admin/orders/") def api_admin_update_order(order_id: str) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,)) if before_row is None: abort(404) payload = request.get_json(silent=True) or {} status = (payload.get("status") or "").strip().upper() if status not in {"PENDING", "PAID", "CLOSED", "FAILED"}: return jsonify({"error": "invalid_status"}), 400 before_status = before_row["status"] if before_status == "PAID" and status != "PAID": return jsonify({"error": "cannot_change_paid_order"}), 409 if before_status == "CLOSED" and status != "CLOSED": return jsonify({"error": "cannot_change_closed_order"}), 409 allowed = False if before_status == status: allowed = True elif before_status == "PENDING" and status in {"PAID", "CLOSED", "FAILED"}: allowed = True elif before_status == "FAILED" and status in {"PAID", "CLOSED"}: allowed = True if not allowed: return jsonify({"error": "invalid_transition"}), 409 paid_at = before_row["paid_at"] if status == "PAID": if before_status != "PAID": paid_at = isoformat(utcnow()) snapshot = json.loads(before_row["plan_snapshot_json"]) extend_vip(int(before_row["user_id"]), int(snapshot.get("durationDays") or 0)) else: paid_at = None execute("UPDATE orders SET status = ?, paid_at = ? WHERE id = ?", (status, paid_at, order_id)) after_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,)) audit( "ADMIN", admin["id"], "ORDER_UPDATE", "Order", order_id, dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.delete("/admin/orders/") def api_admin_delete_order(order_id: str) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,)) if before_row is None: abort(404) if before_row["status"] == "PAID": return jsonify({"error": "cannot_delete_paid_order"}), 409 execute("DELETE FROM orders WHERE id = ?", (order_id,)) audit("ADMIN", admin["id"], "ORDER_DELETE", "Order", order_id, dict(before_row), None) return jsonify({"ok": True}) @app.get("/admin/users") def api_admin_users() -> Response: _ = require_admin() q = (request.args.get("q") or "").strip() status = (request.args.get("status") or "").strip().upper() vip = (request.args.get("vip") or "").strip().upper() page = max(1, parse_int(request.args.get("page"), 1)) page_size = min(100, max(1, parse_int(request.args.get("pageSize"), 20))) now_dt = utcnow() now_iso = isoformat(now_dt) where = [] params: list[Any] = [] if q: where.append("(phone LIKE ?)") params.append(f"%{q}%") if status in {"ACTIVE", "DISABLED"}: where.append("status = ?") params.append(status) if vip in {"VIP", "ACTIVE"}: where.append("(vip_expire_at IS NOT NULL AND vip_expire_at > ?)") params.append(now_iso) elif vip in {"NONVIP", "NOVIP", "INACTIVE"}: where.append("(vip_expire_at IS NULL OR vip_expire_at <= ?)") params.append(now_iso) where_sql = f"WHERE {' AND '.join(where)}" if where else "" total_row = fetch_one(f"SELECT COUNT(1) AS cnt FROM users {where_sql}", tuple(params)) total = int(total_row["cnt"]) if total_row is not None else 0 offset = (page - 1) * page_size rows = fetch_all( f""" SELECT * FROM users {where_sql} ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: expire_dt = parse_datetime(row["vip_expire_at"]) if row["vip_expire_at"] else None vip_active = bool(expire_dt is not None and expire_dt > now_dt) vip_remaining_days = None if vip_active and expire_dt is not None: seconds = (expire_dt - now_dt).total_seconds() vip_remaining_days = max(0, int((seconds + 86399) // 86400)) items.append( { "id": row["id"], "phone": row["phone"], "status": row["status"], "vipActive": vip_active, "vipRemainingDays": vip_remaining_days, "vipExpireAt": row["vip_expire_at"], "createdAt": row["created_at"], } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.get("/admin/download-logs") def api_admin_download_logs() -> Response: _ = require_admin() q = (request.args.get("q") or "").strip() typ = (request.args.get("type") or "").strip().upper() state = (request.args.get("state") or "").strip().upper() page = max(1, parse_int(request.args.get("page"), 1)) page_size = min(100, max(1, parse_int(request.args.get("pageSize"), 20))) where = [] params: list[Any] = [] if q: like = f"%{q}%" where.append("(u.phone LIKE ? OR dl.resource_title_snapshot LIKE ? OR dl.ref_snapshot LIKE ? OR dl.ip LIKE ?)") params.extend([like, like, like, like]) if typ in {"FREE", "VIP"}: where.append("dl.resource_type_snapshot = ?") params.append(typ) if state == "DELETED": where.append("r.id IS NULL") elif state == "OFFLINE": where.append("(r.id IS NOT NULL AND r.status != 'ONLINE')") elif state == "ONLINE": where.append("r.status = 'ONLINE'") where_sql = f"WHERE {' AND '.join(where)}" if where else "" total_row = fetch_one( f""" SELECT COUNT(1) AS cnt FROM download_logs dl JOIN users u ON u.id = dl.user_id LEFT JOIN resources r ON r.id = dl.resource_id {where_sql} """, tuple(params), ) total = int(total_row["cnt"]) if total_row is not None else 0 offset = (page - 1) * page_size rows = fetch_all( f""" SELECT dl.id, dl.user_id, u.phone AS user_phone, dl.resource_id, dl.resource_title_snapshot, dl.resource_type_snapshot, dl.ref_snapshot, dl.downloaded_at, dl.ip, dl.user_agent, r.id AS r_id, r.status AS r_status, r.type AS r_type FROM download_logs dl JOIN users u ON u.id = dl.user_id LEFT JOIN resources r ON r.id = dl.resource_id {where_sql} ORDER BY dl.downloaded_at DESC, dl.id DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: if row["r_id"] is None: resource_state = "DELETED" elif row["r_status"] != "ONLINE": resource_state = "OFFLINE" else: resource_state = "ONLINE" items.append( { "id": row["id"], "userId": row["user_id"], "userPhone": row["user_phone"], "resourceId": row["resource_id"], "resourceTitle": row["resource_title_snapshot"], "resourceType": row["resource_type_snapshot"], "currentResourceType": row["r_type"] if row["r_id"] is not None else None, "ref": row["ref_snapshot"], "downloadedAt": row["downloaded_at"], "resourceState": resource_state, "ip": row["ip"], "userAgent": row["user_agent"], } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.put("/admin/users/") def api_admin_update_user(user_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) if before_row is None: abort(404) payload = request.get_json(silent=True) or {} status = (payload.get("status") or before_row["status"]).strip().upper() if status not in {"ACTIVE", "DISABLED"}: return jsonify({"error": "invalid_status"}), 400 execute("UPDATE users SET status = ? WHERE id = ?", (status, user_id)) after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) audit( "ADMIN", admin["id"], "USER_UPDATE", "User", str(user_id), dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.post("/admin/users//password-reset") def api_admin_reset_user_password(user_id: int) -> Response: admin = require_admin() before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) if before_row is None: abort(404) payload = request.get_json(silent=True) or {} password = payload.get("password") or "" if not password: return jsonify({"error": "password_required"}), 400 if len(password) < 6: return jsonify({"error": "password_too_short"}), 400 execute("UPDATE users SET password_hash = ? WHERE id = ?", (generate_password_hash(password), user_id)) after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) audit( "ADMIN", admin["id"], "USER_PASSWORD_RESET", "User", str(user_id), dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.post("/admin/users//vip-adjust") def api_admin_vip_adjust(user_id: int) -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} duration_days = parse_int(payload.get("addDays"), 0) if duration_days == 0: return jsonify({"error": "addDays_required"}), 400 before_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) if before_row is None: abort(404) extend_vip(user_id, duration_days) after_row = fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) before_expire = before_row["vip_expire_at"] or "" after_expire = after_row["vip_expire_at"] if after_row is not None else "" delta = f"{duration_days:+d} 天" create_user_message( user_id, "会员期限变更", "\n".join( [ f"管理员已调整你的会员天数:{delta}", f"调整前到期:{before_expire or '无'}", f"调整后到期:{after_expire or '无'}", ] ), sender_type="ADMIN", sender_id=int(admin["id"]), ) audit( "ADMIN", admin["id"], "VIP_ADJUST", "User", str(user_id), dict(before_row), dict(after_row) if after_row is not None else None, ) return jsonify({"ok": True}) @app.get("/admin/messages") def api_admin_messages() -> Response: admin = require_admin() _ = admin q = (request.args.get("q") or "").strip() sender_type = (request.args.get("senderType") or "").strip().upper() read_raw = (request.args.get("read") or "").strip().lower() user_id = parse_int(request.args.get("user_id"), 0) page = max(parse_int(request.args.get("page"), 1), 1) page_size = min(max(parse_int(request.args.get("pageSize"), 20), 1), 50) offset = (page - 1) * page_size where: list[str] = [] params: list[Any] = [] if user_id > 0: where.append("m.user_id = ?") params.append(user_id) if q: where.append("(u.phone LIKE ? OR m.title LIKE ? OR m.content LIKE ?)") like = f"%{q}%" params.extend([like, like, like]) if sender_type in {"SYSTEM", "ADMIN"}: where.append("m.sender_type = ?") params.append(sender_type) if read_raw in {"1", "true", "yes", "on", "read"}: where.append("m.read_at IS NOT NULL") elif read_raw in {"0", "false", "no", "off", "unread"}: where.append("m.read_at IS NULL") where_sql = f"WHERE {' AND '.join(where)}" if where else "" total_row = fetch_one( f""" SELECT COUNT(1) AS cnt FROM user_messages m JOIN users u ON u.id = m.user_id {where_sql} """, tuple(params), ) total = int(total_row["cnt"] if total_row is not None else 0) rows = fetch_all( f""" SELECT m.id, m.user_id, m.title, m.content, m.created_at, m.read_at, m.sender_type, m.sender_id, u.phone AS user_phone FROM user_messages m JOIN users u ON u.id = m.user_id {where_sql} ORDER BY m.created_at DESC, m.id DESC LIMIT ? OFFSET ? """, tuple(params + [page_size, offset]), ) items = [] for row in rows: items.append( { "id": row["id"], "userId": row["user_id"], "userPhone": row["user_phone"], "title": row["title"], "content": row["content"], "createdAt": row["created_at"], "readAt": row["read_at"], "read": bool(row["read_at"]), "senderType": row["sender_type"] or "SYSTEM", "senderId": row["sender_id"], } ) return jsonify({"items": items, "total": total, "page": page, "pageSize": page_size}) @app.post("/admin/messages/send") def api_admin_message_send() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} user_id = parse_int(payload.get("userId"), 0) phone = (payload.get("phone") or "").strip() title = (payload.get("title") or "").strip() content = (payload.get("content") or "").strip() if not title or not content: return jsonify({"error": "title_and_content_required"}), 400 if user_id <= 0 and not phone: return jsonify({"error": "user_required"}), 400 if user_id <= 0 and phone: row = fetch_one("SELECT id FROM users WHERE phone = ?", (phone,)) if row is None: return jsonify({"error": "user_not_found"}), 404 user_id = int(row["id"]) msg_id = create_user_message(user_id, title, content, sender_type="ADMIN", sender_id=int(admin["id"])) audit( "ADMIN", admin["id"], "MESSAGE_SEND", "User", str(user_id), None, {"title": title[:120], "contentLen": len(content)}, ) return jsonify({"ok": True, "id": msg_id}) @app.post("/admin/messages/broadcast") def api_admin_message_broadcast() -> Response: admin = require_admin() payload = request.get_json(silent=True) or {} audience = (payload.get("audience") or "ALL").strip().upper() title = (payload.get("title") or "").strip() content = (payload.get("content") or "").strip() if not title or not content: return jsonify({"error": "title_and_content_required"}), 400 now_iso = isoformat(utcnow()) or "" where = ["status = 'ACTIVE'"] params: list[Any] = [] if audience == "VIP": where.append("vip_expire_at IS NOT NULL AND vip_expire_at > ?") params.append(now_iso) elif audience == "NONVIP": where.append("(vip_expire_at IS NULL OR vip_expire_at <= ?)") params.append(now_iso) where_sql = f"WHERE {' AND '.join(where)}" rows = fetch_all(f"SELECT id FROM users {where_sql}", tuple(params)) user_ids = [int(r["id"]) for r in rows] for uid in user_ids: create_user_message(uid, title, content, sender_type="ADMIN", sender_id=int(admin["id"])) audit( "ADMIN", admin["id"], "MESSAGE_BROADCAST", "Users", audience, None, {"title": title[:120], "contentLen": len(content), "count": len(user_ids)}, ) return jsonify({"ok": True, "count": len(user_ids)}) @app.delete("/admin/messages/") def api_admin_message_delete(message_id: int) -> Response: admin = require_admin() before = fetch_one("SELECT * FROM user_messages WHERE id = ?", (message_id,)) if before is None: abort(404) execute("DELETE FROM user_messages WHERE id = ?", (message_id,)) audit( "ADMIN", admin["id"], "MESSAGE_DELETE", "UserMessage", str(message_id), dict(before), None, ) return jsonify({"ok": True})