| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491 |
- import os
- import tempfile
- import unittest
- class AdminApiTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
    """Create a temporary database directory, set the environment and build the app.

    The previous value of every environment variable changed here is
    remembered so that ``tearDownClass`` can put it back; otherwise the
    overrides would leak into whatever else runs in the same process.
    """
    cls._tmpdir = tempfile.TemporaryDirectory()
    overrides = {
        "DATABASE_PATH": os.path.join(cls._tmpdir.name, "test.db"),
        "SECRET_KEY": "test-secret",
        "GOGS_BASE_URL": "http://127.0.0.1:9",
    }
    cls._saved_env = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)
    try:
        # Imported only after the environment has been prepared.
        # NOTE(review): presumably ``app`` reads these variables - confirm.
        import app as appmod

        cls.appmod = appmod
        cls.app = appmod.create_app()
    except Exception:
        # unittest does not call tearDownClass when setUpClass raises, so
        # undo the environment changes and remove the directory here.
        cls.tearDownClass()
        raise


@classmethod
def tearDownClass(cls) -> None:
    """Restore the environment changed by ``setUpClass`` and delete the temp directory."""
    for name, previous in getattr(cls, "_saved_env", {}).items():
        if previous is None:
            # The variable did not exist before the tests ran.
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous
    cls._saved_env = {}
    cls._tmpdir.cleanup()
def test_admin_login_and_list_endpoints(self) -> None:
    """Log in as admin, then check the keys returned by the stats and list endpoints."""
    http = self.app.test_client()
    login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    stats = http.get("/admin/stats")
    self.assertEqual(200, stats.status_code)
    for key in ("users", "resources", "orders", "revenue", "downloads", "messages", "backend"):
        self.assertIn(key, stats.json)

    # The three paginated listings are expected to share one envelope.
    list_urls = (
        "/admin/resources?page=1&pageSize=10",
        "/admin/users?page=1&pageSize=10",
        "/admin/orders?page=1&pageSize=10",
    )
    for url in list_urls:
        listing = http.get(url)
        self.assertEqual(200, listing.status_code)
        for key in ("items", "total", "page", "pageSize"):
            self.assertIn(key, listing.json)
def test_admin_shortcut_pages_redirect(self) -> None:
    """``/admin`` and ``/admin/login`` answer with a redirect to their ``/ui`` pages."""
    http = self.app.test_client()
    redirect_codes = {301, 302, 308}
    expectations = (
        ("/admin", "/ui/admin"),
        ("/admin/login", "/ui/admin/login"),
    )
    for path, target in expectations:
        resp = http.get(path)
        self.assertIn(resp.status_code, redirect_codes)
        self.assertIn(target, resp.headers.get("Location") or "")
def test_admin_stats_requires_login(self) -> None:
    """A client that never logged in receives 401 from ``/admin/stats``."""
    resp = self.app.test_client().get("/admin/stats")
    self.assertEqual(401, resp.status_code)
def test_admin_users_vip_filters_and_remaining_days(self) -> None:
    """Check the ``vip`` filter of ``/admin/users`` and the VIP fields of a listed user.

    Inserts one user whose ``vip_expire_at`` lies five days ahead and one
    without an expiry, then lists users with ``vip=VIP`` and ``vip=NONVIP``.
    """
    http = self.app.test_client()
    login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    from datetime import timedelta
    from server.core import isoformat, utcnow
    from server.db import execute
    from werkzeug.security import generate_password_hash

    insert_user_sql = """
        INSERT INTO users
        (phone, password_hash, status, vip_expire_at, created_at)
        VALUES
        (?, ?, ?, ?, ?)
        """

    with self.app.app_context():
        now = utcnow()
        vip_row = execute(
            insert_user_sql,
            ("13900008881", generate_password_hash("p"), "ACTIVE", isoformat(now + timedelta(days=5)), isoformat(now)),
        )
        vip_user_id = int(vip_row.lastrowid)
        plain_row = execute(
            insert_user_sql,
            ("13900008882", generate_password_hash("p"), "ACTIVE", None, isoformat(now)),
        )
        nonvip_user_id = int(plain_row.lastrowid)

    def listed(rows, user_id):
        # True when any returned row carries the given user id.
        return any(row.get("id") == user_id for row in rows)

    resp = http.get("/admin/users?page=1&pageSize=50&vip=VIP")
    self.assertEqual(200, resp.status_code)
    vip_rows = resp.json.get("items") or []
    self.assertTrue(listed(vip_rows, vip_user_id))
    self.assertFalse(listed(vip_rows, nonvip_user_id))
    vip_item = next(row for row in vip_rows if row.get("id") == vip_user_id)
    self.assertEqual(True, vip_item.get("vipActive"))
    self.assertGreaterEqual(int(vip_item.get("vipRemainingDays") or 0), 1)

    resp = http.get("/admin/users?page=1&pageSize=50&vip=NONVIP")
    self.assertEqual(200, resp.status_code)
    nonvip_rows = resp.json.get("items") or []
    self.assertFalse(listed(nonvip_rows, vip_user_id))
    self.assertTrue(listed(nonvip_rows, nonvip_user_id))
def test_admin_can_reset_user_password(self) -> None:
    """After an admin password reset the old password is rejected and the new one works."""
    phone = "13900007771"
    old_password = "oldpass123"
    new_password = "newpass123"

    registration = self.app.test_client().post(
        "/auth/register", json={"phone": phone, "password": old_password}
    )
    self.assertEqual(200, registration.status_code)
    user_id = int(registration.json.get("id"))

    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)
    reset = admin.post(f"/admin/users/{user_id}/password-reset", json={"password": new_password})
    self.assertEqual(200, reset.status_code)

    # A fresh client so no earlier session is reused for the login attempts.
    visitor = self.app.test_client()
    rejected = visitor.post("/auth/login", json={"phone": phone, "password": old_password})
    self.assertEqual(401, rejected.status_code)
    accepted = visitor.post("/auth/login", json={"phone": phone, "password": new_password})
    self.assertEqual(200, accepted.status_code)
def test_vip_adjust_creates_user_message_and_can_mark_read(self) -> None:
    """An admin VIP adjustment produces an unread user message that can be marked read."""
    member = self.app.test_client()
    phone = "13900006661"
    password = "pass1234"
    registration = member.post("/auth/register", json={"phone": phone, "password": password})
    self.assertEqual(200, registration.status_code)
    user_id = int(registration.json.get("id"))

    admin = self.app.test_client()
    admin_login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, admin_login.status_code)
    adjust = admin.post(f"/admin/users/{user_id}/vip-adjust", json={"addDays": 7})
    self.assertEqual(200, adjust.status_code)

    member_login = member.post("/auth/login", json={"phone": phone, "password": password})
    self.assertEqual(200, member_login.status_code)

    inbox_url = "/me/messages?page=1&pageSize=10"
    inbox = member.get(inbox_url)
    self.assertEqual(200, inbox.status_code)
    self.assertGreaterEqual(int(inbox.json.get("unreadCount") or 0), 1)
    messages = inbox.json.get("items") or []
    self.assertGreaterEqual(len(messages), 1)
    first = messages[0]
    msg_id = int(first.get("id"))
    self.assertIn("会员", first.get("title") or "")

    marked = member.put(f"/me/messages/{msg_id}/read")
    self.assertEqual(200, marked.status_code)

    inbox = member.get(inbox_url)
    self.assertEqual(200, inbox.status_code)
    self.assertEqual(0, int(inbox.json.get("unreadCount") or 0))
def test_ui_messages_page_renders(self) -> None:
    """``/ui/messages`` returns 200 and its body contains the messages page marker."""
    page = self.app.test_client().get("/ui/messages")
    self.assertEqual(200, page.status_code)
    self.assertIn(b'data-page="messages"', page.data)
def test_admin_can_send_list_and_delete_messages(self) -> None:
    """An admin can send a message to a user, see it in the listing, and delete it."""
    registration = self.app.test_client().post(
        "/auth/register", json={"phone": "13900005551", "password": "p123456"}
    )
    self.assertEqual(200, registration.status_code)
    user_id = int(registration.json.get("id"))

    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    sent = admin.post(
        "/admin/messages/send",
        json={"userId": user_id, "title": "测试消息", "content": "内容A"},
    )
    self.assertEqual(200, sent.status_code)
    msg_id = int(sent.json.get("id"))

    def message_is_listed():
        # Query the admin listing for this user and look for the sent id.
        listing = admin.get(f"/admin/messages?page=1&pageSize=50&user_id={user_id}")
        self.assertEqual(200, listing.status_code)
        rows = listing.json.get("items") or []
        return any(int(row.get("id")) == msg_id for row in rows)

    self.assertTrue(message_is_listed())

    deleted = admin.delete(f"/admin/messages/{msg_id}")
    self.assertEqual(200, deleted.status_code)

    self.assertFalse(message_is_listed())
def test_admin_can_broadcast_messages_by_audience(self) -> None:
    """A broadcast to audience ``VIP`` reaches the VIP user and not the non-VIP user.

    Both users are registered through the API; their ``vip_expire_at``
    columns are then written directly so only one of them has an expiry.
    """
    from datetime import timedelta
    from server.core import isoformat, utcnow
    from server.db import execute

    def register(phone):
        # Register a user with the shared test password and return the new id.
        resp = self.app.test_client().post("/auth/register", json={"phone": phone, "password": "p123456"})
        self.assertEqual(200, resp.status_code)
        return int(resp.json.get("id"))

    vip_user_id = register("13900005561")
    nonvip_user_id = register("13900005562")

    with self.app.app_context():
        now = utcnow()
        execute("UPDATE users SET vip_expire_at = ? WHERE id = ?", (isoformat(now + timedelta(days=3)), vip_user_id))
        execute("UPDATE users SET vip_expire_at = NULL WHERE id = ?", (nonvip_user_id,))

    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    broadcast = admin.post(
        "/admin/messages/broadcast",
        json={"audience": "VIP", "title": "VIP通知", "content": "内容B"},
    )
    self.assertEqual(200, broadcast.status_code)
    self.assertEqual(1, int(broadcast.json.get("count") or 0))

    def received_broadcast(user_id):
        # Look for the broadcast title among admin-sent messages of the user.
        listing = admin.get(f"/admin/messages?page=1&pageSize=50&user_id={user_id}&senderType=ADMIN")
        self.assertEqual(200, listing.status_code)
        rows = listing.json.get("items") or []
        return any((row.get("title") or "") == "VIP通知" for row in rows)

    self.assertTrue(received_broadcast(vip_user_id))
    self.assertFalse(received_broadcast(nonvip_user_id))
def test_admin_db_status_and_switch_endpoints(self) -> None:
    """Check the DB status payload, a switch to sqlite, and a rejected switch to mysql."""
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    status = admin.get("/admin/db/status")
    self.assertEqual(200, status.status_code)
    body = status.json
    self.assertTrue(body.get("ok"))
    self.assertIn("db", body)
    self.assertIn("active", body["db"])
    self.assertIn("probe", body)
    probe = body["probe"]
    self.assertIn("connectOk", probe)
    self.assertTrue(probe.get("connectOk"))
    self.assertIn("effective", probe)

    to_sqlite = admin.post("/admin/db/switch", json={"target": "sqlite"})
    self.assertEqual(200, to_sqlite.status_code)
    self.assertTrue(to_sqlite.json.get("ok"))

    # No MySQL is configured in this test setup, so the switch must be refused.
    to_mysql = admin.post("/admin/db/switch", json={"target": "mysql"})
    self.assertEqual(400, to_mysql.status_code)
    self.assertEqual("mysql_not_configured", to_mysql.json.get("error"))
def test_gogs_endpoints_fail_gracefully(self) -> None:
    """Gogs proxy endpoints report errors as JSON instead of crashing.

    First phase: with ``GOGS_TOKEN`` removed from the environment and the
    stored token cleared, owner-scoped lookups return 502 ``gogs_failed``
    and the owner-less search returns 400 ``gogs_token_required``.
    Second phase: with ``GOGS_TOKEN`` set and a freshly created app, the
    owner-less search returns 502 ``gogs_failed``.
    """
    http = self.app.test_client()
    login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    saved_token = os.environ.pop("GOGS_TOKEN", None)
    saved_redis_url = os.environ.get("REDIS_URL")
    os.environ["REDIS_URL"] = "redis://127.0.0.1:0/0"
    try:
        cleared = http.put("/admin/settings", json={"clearGogsToken": True})
        self.assertEqual(200, cleared.status_code)

        failing_urls = (
            "/admin/gogs/repo?owner=aa&repo=bb",
            "/admin/gogs/branches?owner=aa&repo=bb",
            "/admin/gogs/tags?owner=aa&repo=bb",
            "/admin/gogs/repos?owner=aa&q=bb",
        )
        for url in failing_urls:
            resp = http.get(url)
            self.assertEqual(502, resp.status_code)
            self.assertEqual("gogs_failed", resp.json.get("error"))

        resp = http.get("/admin/gogs/repos?q=bb")
        self.assertEqual(400, resp.status_code)
        self.assertEqual("gogs_token_required", resp.json.get("error"))
    finally:
        if saved_token is None:
            os.environ.pop("GOGS_TOKEN", None)
        else:
            os.environ["GOGS_TOKEN"] = saved_token
        if saved_redis_url is not None:
            os.environ["REDIS_URL"] = saved_redis_url
        else:
            os.environ.pop("REDIS_URL", None)

    previous_token = os.environ.get("GOGS_TOKEN")
    os.environ["GOGS_TOKEN"] = "test-token"
    try:
        second_app = self.appmod.create_app()
        second_http = second_app.test_client()
        login = second_http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
        self.assertEqual(200, login.status_code)
        resp = second_http.get("/admin/gogs/repos?q=bb")
        self.assertEqual(502, resp.status_code)
        self.assertEqual("gogs_failed", resp.json.get("error"))
    finally:
        if previous_token is not None:
            os.environ["GOGS_TOKEN"] = previous_token
        else:
            os.environ.pop("GOGS_TOKEN", None)
def test_alipay_callback_can_mark_order_paid_idempotently(self) -> None:
    """Post the same signed payment callback twice and check the order row is PAID.

    A throwaway RSA key pair is generated, stored through ``/admin/settings``
    as the Alipay keys, and used to sign the callback parameters
    (SHA-256 with PKCS#1 v1.5, base64-encoded).
    """
    import base64
    from decimal import Decimal

    buyer = self.app.test_client()
    resp = buyer.post("/auth/register", json={"phone": "13900004441", "password": "p123456"})
    self.assertEqual(200, resp.status_code)
    resp = buyer.post("/auth/login", json={"phone": "13900004441", "password": "p123456"})
    self.assertEqual(200, resp.status_code)

    plans = buyer.get("/plans").json
    self.assertTrue(isinstance(plans, list) and len(plans) >= 1)
    plan_id = int(plans[0]["id"])

    order = buyer.post("/orders", json={"planId": plan_id}).json
    order_id = order["id"]
    amount_cents = int(order["amountCents"])
    # The callback carries the amount as a two-decimal string derived from cents.
    total_amount = (Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01"))

    admin = self.app.test_client()
    resp = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, resp.status_code)

    from Crypto.Hash import SHA256
    from Crypto.PublicKey import RSA
    from Crypto.Signature import pkcs1_15

    key = RSA.generate(2048)
    private_pem = key.export_key(format="PEM").decode("utf-8")
    public_pem = key.publickey().export_key(format="PEM").decode("utf-8")

    app_id = "2021000123456789"
    trade_no = "202603210000000000001"

    alipay_settings = {
        "appId": app_id,
        "gateway": "https://openapi.alipay.com/gateway.do",
        "notifyUrl": "https://example.com/pay/callback",
        "returnUrl": "https://example.com/ui/me",
        "privateKey": private_pem,
        "publicKey": public_pem,
    }
    resp = admin.put(
        "/admin/settings",
        json={"payment": {"provider": "ALIPAY", "enableMockPay": False, "alipay": alipay_settings}},
    )
    self.assertEqual(200, resp.status_code)

    params = {
        "app_id": app_id,
        "out_trade_no": order_id,
        "trade_no": trade_no,
        "trade_status": "TRADE_SUCCESS",
        "total_amount": str(total_amount),
    }
    # Sign the key-sorted "k=v" pairs before the signature fields are added.
    sign_content = "&".join(f"{name}={value}" for name, value in sorted(params.items()))
    digest = SHA256.new(sign_content.encode("utf-8"))
    signature = pkcs1_15.new(RSA.import_key(private_pem)).sign(digest)
    params["sign_type"] = "RSA2"
    params["sign"] = base64.b64encode(signature).decode("utf-8")

    # Delivering the identical callback twice must succeed both times.
    for _ in range(2):
        resp = buyer.post("/pay/callback", data=params)
        self.assertEqual(200, resp.status_code)
        self.assertEqual(b"success", (resp.data or b"").strip())

    from server.db import fetch_one

    with self.app.app_context():
        row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
    self.assertEqual("PAID", row["status"])
    self.assertEqual("ALIPAY", row["pay_channel"])
    self.assertEqual(trade_no, row["pay_trade_no"])
def test_repo_tree_guest_allowed_and_file_preview_protected(self) -> None:
    """Check ``guestAllowed`` flags in the repo tree and the login gate on file preview.

    ``server.routes.gogs_contents`` is replaced by a stub that returns a
    fixed root listing for an empty path and a base64 file payload for any
    other path; the original function is restored afterwards.
    """
    guest = self.app.test_client()
    from server.core import isoformat, utcnow
    from server.db import execute
    from werkzeug.security import generate_password_hash

    with self.app.app_context():
        now = isoformat(utcnow())
        resource_row = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
        )
        resource_id = int(resource_row.lastrowid)
        user_row = execute(
            """
            INSERT INTO users
            (phone, password_hash, status, vip_expire_at, created_at)
            VALUES
            (?, ?, ?, ?, ?)
            """,
            ("13800000000", generate_password_hash("p"), "ACTIVE", None, now),
        )
        user_id = int(user_row.lastrowid)

    import server.routes as routesmod
    from requests import Response
    import base64
    import json

    root_listing = [
        {"name": "docs", "path": "docs", "type": "dir", "size": 0},
        {"name": "README.md", "path": "README.md", "type": "file", "size": 1},
        {"name": "main.py", "path": "main.py", "type": "file", "size": 1},
        {"name": ".env", "path": ".env", "type": "file", "size": 1},
    ]

    def stub_gogs_contents(owner, repo, path, ref):
        # Build a ``requests.Response`` by hand so no network call is made.
        fake = Response()
        fake.status_code = 200
        fake.url = "http://stub.local"
        if path:
            encoded = base64.b64encode(("content:" + str(path)).encode("utf-8")).decode("ascii")
            payload = {"type": "file", "encoding": "base64", "size": 10, "content": encoded}
        else:
            payload = root_listing
        fake._content = json.dumps(payload).encode("utf-8")
        return fake

    tree_url = f"/resources/{resource_id}/repo/tree?ref=master&path="
    original = routesmod.gogs_contents
    try:
        routesmod.gogs_contents = stub_gogs_contents

        tree = guest.get(tree_url)
        self.assertEqual(200, tree.status_code)
        by_path = {entry.get("path"): entry for entry in (tree.json.get("items") or [])}
        for allowed in ("docs", "README.md"):
            self.assertTrue(bool(by_path.get(allowed, {}).get("guestAllowed")))
        for denied in ("main.py", ".env"):
            self.assertFalse(bool(by_path.get(denied, {}).get("guestAllowed")))

        preview = guest.get(f"/resources/{resource_id}/repo/file?ref=master&path=main.py")
        self.assertEqual(401, preview.status_code)
        self.assertEqual("login_required", preview.json.get("error"))

        preview = guest.get(f"/resources/{resource_id}/repo/file?ref=master&path=README.md")
        self.assertEqual(200, preview.status_code)
        self.assertIn("content", preview.json)

        # With a user id in the session every tree entry is flagged as allowed.
        member = self.app.test_client()
        with member.session_transaction() as session:
            session["user_id"] = user_id
        tree = member.get(tree_url)
        self.assertEqual(200, tree.status_code)
        for entry in tree.json.get("items") or []:
            self.assertTrue(bool(entry.get("guestAllowed")))
    finally:
        routesmod.gogs_contents = original
def test_my_download_logs_paginated_and_mark_deleted(self) -> None:
    """Download a resource, then check ``/me/downloads`` before and after deleting it.

    Three ``server.routes`` helpers are stubbed for the duration of the
    test: ``gogs_archive_get`` returns an object whose ``status_code`` is
    500, ``gogs_git_archive_zip`` returns the path of a small temp file, and
    ``gogs_resolve_ref_commit`` returns a dict with ``ok`` set to False.
    NOTE(review): presumably the 500 upstream makes the route use the git
    archive helper instead - confirm in ``server.routes``.
    """
    from server.core import isoformat, utcnow
    from server.db import execute
    from werkzeug.security import generate_password_hash
    import tempfile

    with self.app.app_context():
        now = isoformat(utcnow())
        cur = execute(
            """
            INSERT INTO users
            (phone, password_hash, status, vip_expire_at, created_at)
            VALUES
            (?, ?, ?, ?, ?)
            """,
            ("13900000000", generate_password_hash("p"), "ACTIVE", None, now),
        )
        user_id = int(cur.lastrowid)
        cur = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("res", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
        )
        resource_id = int(cur.lastrowid)

    import server.routes as routesmod

    class DummyUpstream:
        status_code = 500

    original_archive_get = routesmod.gogs_archive_get
    original_git_zip = routesmod.gogs_git_archive_zip
    original_resolve = routesmod.gogs_resolve_ref_commit

    # The context manager closes the handle even if the write fails;
    # delete=False keeps the file on disk for the stubbed archive helper.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"ZIP")
    zip_path = tmp.name

    try:
        routesmod.gogs_archive_get = lambda owner, repo, ref: DummyUpstream()
        routesmod.gogs_git_archive_zip = lambda owner, repo, ref: zip_path
        routesmod.gogs_resolve_ref_commit = lambda owner, repo, ref: {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}

        client = self.app.test_client()
        with client.session_transaction() as s:
            s["user_id"] = user_id

        r = client.post(f"/resources/{resource_id}/download", json={"ref": "master"})
        self.assertEqual(200, r.status_code)

        r = client.get("/me/downloads?page=1&pageSize=10")
        self.assertEqual(200, r.status_code)
        self.assertEqual(1, int(r.json.get("total") or 0))
        items = r.json.get("items") or []
        self.assertEqual(1, len(items))
        it = items[0]
        self.assertEqual(resource_id, it.get("resourceId"))
        self.assertEqual("res", it.get("resourceTitle"))
        self.assertEqual("FREE", it.get("resourceType"))
        self.assertEqual("FREE", it.get("currentResourceType"))
        self.assertEqual("ONLINE", it.get("resourceState"))

        with self.app.app_context():
            execute("DELETE FROM resources WHERE id = ?", (resource_id,))

        r = client.get("/me/downloads?page=1&pageSize=10")
        self.assertEqual(200, r.status_code)
        items = r.json.get("items") or []
        # Fail with a clear assertion rather than an AttributeError on None
        # if the log entry vanished together with the resource.
        self.assertTrue(items, "download log entry missing after resource deletion")
        it = items[0]
        self.assertEqual("DELETED", it.get("resourceState"))
        self.assertIsNone(it.get("currentResourceType"))
    finally:
        routesmod.gogs_archive_get = original_archive_get
        routesmod.gogs_git_archive_zip = original_git_zip
        routesmod.gogs_resolve_ref_commit = original_resolve
        try:
            os.unlink(zip_path)
        except OSError:
            # Best-effort cleanup: the file may no longer exist or may still be in use.
            pass
def test_admin_download_logs_list_and_filters(self) -> None:
    """Download a resource as a user, then query ``/admin/download-logs`` with filters.

    The listing is checked with ``q`` (the user's phone), with ``type=FREE``
    and, after the resource row is deleted, with ``state=DELETED``.
    ``gogs_archive_get``, ``gogs_git_archive_zip`` and
    ``gogs_resolve_ref_commit`` in ``server.routes`` are stubbed for the
    duration of the test and restored afterwards.
    """
    from server.core import isoformat, utcnow
    from server.db import execute
    from werkzeug.security import generate_password_hash
    import tempfile

    with self.app.app_context():
        now = isoformat(utcnow())
        cur = execute(
            """
            INSERT INTO users
            (phone, password_hash, status, vip_expire_at, created_at)
            VALUES
            (?, ?, ?, ?, ?)
            """,
            ("13900009901", generate_password_hash("p"), "ACTIVE", None, now),
        )
        user_id = int(cur.lastrowid)
        cur = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("res2", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
        )
        resource_id = int(cur.lastrowid)

    import server.routes as routesmod

    class DummyUpstream:
        status_code = 500

    original_archive_get = routesmod.gogs_archive_get
    original_git_zip = routesmod.gogs_git_archive_zip
    original_resolve = routesmod.gogs_resolve_ref_commit

    # The context manager closes the handle even if the write fails;
    # delete=False keeps the file on disk for the stubbed archive helper.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"ZIP")
    zip_path = tmp.name

    try:
        routesmod.gogs_archive_get = lambda owner, repo, ref: DummyUpstream()
        routesmod.gogs_git_archive_zip = lambda owner, repo, ref: zip_path
        routesmod.gogs_resolve_ref_commit = lambda owner, repo, ref: {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}

        user_client = self.app.test_client()
        with user_client.session_transaction() as s:
            s["user_id"] = user_id
        r = user_client.post(f"/resources/{resource_id}/download", json={"ref": "master"})
        self.assertEqual(200, r.status_code)

        admin_client = self.app.test_client()
        r = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
        self.assertEqual(200, r.status_code)

        r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901")
        self.assertEqual(200, r.status_code)
        self.assertEqual(1, int(r.json.get("total") or 0))
        items = r.json.get("items") or []
        # Fail with a clear assertion rather than an AttributeError on None.
        self.assertTrue(items, "download log listing returned no items")
        it = items[0]
        self.assertEqual("13900009901", it.get("userPhone"))
        self.assertEqual(resource_id, it.get("resourceId"))
        self.assertEqual("res2", it.get("resourceTitle"))
        self.assertEqual("FREE", it.get("resourceType"))
        self.assertEqual("FREE", it.get("currentResourceType"))
        self.assertEqual("ONLINE", it.get("resourceState"))

        r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901&type=FREE")
        self.assertEqual(200, r.status_code)
        self.assertEqual(1, int(r.json.get("total") or 0))

        with self.app.app_context():
            execute("DELETE FROM resources WHERE id = ?", (resource_id,))

        r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901&state=DELETED")
        self.assertEqual(200, r.status_code)
        self.assertEqual(1, int(r.json.get("total") or 0))
        items = r.json.get("items") or []
        self.assertTrue(items, "download log entry missing after resource deletion")
        it = items[0]
        self.assertEqual("DELETED", it.get("resourceState"))
    finally:
        routesmod.gogs_archive_get = original_archive_get
        routesmod.gogs_git_archive_zip = original_git_zip
        routesmod.gogs_resolve_ref_commit = original_resolve
        try:
            os.unlink(zip_path)
        except OSError:
            # Best-effort cleanup: the file may no longer exist or may still be in use.
            pass
def test_resources_cover_url_defaults_when_missing(self) -> None:
    """A resource stored with a NULL ``cover_url`` is served with the default cover path."""
    http = self.app.test_client()
    from server.core import isoformat, utcnow
    from server.db import execute

    default_cover = "/static/images/resources/default.png"

    with self.app.app_context():
        now = isoformat(utcnow())
        inserted = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
        )
        resource_id = int(inserted.lastrowid)

    listing = http.get("/resources?page=1&pageSize=10")
    self.assertEqual(200, listing.status_code)
    rows = listing.json.get("items") or []
    matching = [row for row in rows if row.get("id") == resource_id]
    # The new resource must appear in the listing at least once.
    self.assertTrue(matching)
    for row in matching:
        self.assertEqual(default_cover, row.get("coverUrl"))

    detail = http.get(f"/resources/{resource_id}")
    self.assertEqual(200, detail.status_code)
    self.assertEqual(default_cover, detail.json.get("coverUrl"))
def test_admin_settings_saved_to_db(self) -> None:
    """Write admin settings through the API and read them back.

    Covers clearing the Gogs token, saving Gogs/payment/LLM/cache values,
    clearing the Redis URL again, and finally a Gogs search that is expected
    to return 502 ``gogs_failed`` with the saved token.
    """
    http = self.app.test_client()
    login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    def read_settings():
        # Fetch the current settings and return the decoded JSON body.
        resp = http.get("/admin/settings")
        self.assertEqual(200, resp.status_code)
        return resp.json

    cleared = http.put("/admin/settings", json={"clearGogsToken": True})
    self.assertEqual(200, cleared.status_code)

    settings = read_settings()
    for key in ("gogsBaseUrl", "hasGogsToken", "payment", "llm", "cache"):
        self.assertIn(key, settings)
    self.assertFalse(bool(settings.get("cache", {}).get("hasRedisUrl")))
    self.assertFalse(bool(settings.get("hasGogsToken")))

    new_values = {
        "gogsBaseUrl": "http://127.0.0.1:9",
        "gogsToken": "db-test-token",
        "clearGogsToken": False,
        "payment": {"provider": "MOCK", "enableMockPay": True, "apiKey": "pay-key", "clearApiKey": False},
        "llm": {"provider": "DeepSeek", "baseUrl": "https://api.example.com", "model": "deepseek-chat", "apiKey": "llm-key", "clearApiKey": False},
        "cache": {"redisUrl": "redis://:pwd@127.0.0.1:6379/0", "clearRedisUrl": False},
    }
    saved = http.put("/admin/settings", json=new_values)
    self.assertEqual(200, saved.status_code)

    settings = read_settings()
    self.assertTrue(bool(settings.get("hasGogsToken")))
    payment = settings.get("payment", {})
    self.assertEqual("MOCK", payment.get("provider"))
    self.assertTrue(bool(payment.get("hasApiKey")))
    self.assertTrue(bool(payment.get("enableMockPay")))
    llm = settings.get("llm", {})
    self.assertEqual("DeepSeek", llm.get("provider"))
    self.assertEqual("https://api.example.com", llm.get("baseUrl"))
    self.assertEqual("deepseek-chat", llm.get("model"))
    self.assertTrue(bool(llm.get("hasApiKey")))
    cache = settings.get("cache", {})
    self.assertTrue(bool(cache.get("hasRedisUrl")))
    # Only the tail of the URL is compared.
    self.assertTrue(str(cache.get("redisUrl") or "").endswith("127.0.0.1:6379/0"))

    cleared = http.put("/admin/settings", json={"cache": {"clearRedisUrl": True}})
    self.assertEqual(200, cleared.status_code)
    settings = read_settings()
    self.assertFalse(bool(settings.get("cache", {}).get("hasRedisUrl")))

    search = http.get("/admin/gogs/repos?q=bb")
    self.assertEqual(502, search.status_code)
    self.assertEqual("gogs_failed", search.json.get("error"))
def test_admin_settings_storage_oss_fields(self) -> None:
    """Storage/OSS settings round-trip through the admin settings API.

    Saves a full OSS configuration, reads it back, and checks that the
    access-key secret is only reported through ``hasAccessKeySecret`` (the
    ``accessKeySecret`` key itself must be absent). Then clears the secret
    and confirms the flag turns off.
    """
    http = self.app.test_client()
    login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    oss_payload = {
        "endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
        "bucket": "bkt",
        "accessKeyId": "id",
        "accessKeySecret": "sec",
        "clearAccessKeySecret": False,
        "uploadPrefix": "uploads/",
        "publicBaseUrl": "https://cdn.example.com",
    }
    saved = http.put(
        "/admin/settings",
        json={"storage": {"provider": "AUTO", "oss": oss_payload}},
    )
    self.assertEqual(200, saved.status_code)

    fetched = http.get("/admin/settings")
    self.assertEqual(200, fetched.status_code)
    storage_view = fetched.json.get("storage") or {}
    oss_view = storage_view.get("oss") or {}
    self.assertEqual("AUTO", storage_view.get("provider"))
    # Non-secret fields must come back exactly as they were written.
    for field in ("endpoint", "bucket", "accessKeyId", "uploadPrefix", "publicBaseUrl"):
        self.assertEqual(oss_payload[field], oss_view.get(field))
    self.assertTrue(bool(oss_view.get("hasAccessKeySecret")))
    self.assertNotIn("accessKeySecret", oss_view)

    cleared = http.put(
        "/admin/settings",
        json={"storage": {"oss": {"clearAccessKeySecret": True}}},
    )
    self.assertEqual(200, cleared.status_code)

    fetched = http.get("/admin/settings")
    self.assertEqual(200, fetched.status_code)
    storage_view = fetched.json.get("storage") or {}
    oss_view = storage_view.get("oss") or {}
    self.assertFalse(bool(oss_view.get("hasAccessKeySecret")))
def test_admin_uploads_used_detection_accepts_oss_urls(self) -> None:
    """Uploads referenced through an absolute CDN/OSS URL count as "used".

    Two files are written to the project's ``static/uploads`` directory and
    only one is referenced, via a ``https://cdn.example.com/uploads/...``
    URL inside a resource summary. The listing must flag that file as used
    and the other as unused, and ``cleanup-unused`` must remove the
    unreferenced file while keeping the referenced one.
    """
    client = self.app.test_client()
    r = client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, r.status_code)

    from pathlib import Path

    from server.core import isoformat, utcnow
    from server.db import execute

    def discard(path) -> None:
        # The endpoint under test may already have deleted the file.
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    project_root = Path(__file__).resolve().parents[1]
    uploads_dir = project_root / "static" / "uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)
    used_name = "cccccccccccccccccccccccccccccccc.png"
    unused_name = "dddddddddddddddddddddddddddddddd.webp"
    used_path = uploads_dir / used_name
    unused_path = uploads_dir / unused_name
    # Register cleanup before writing so the fixture files never outlive the
    # test, whether it passes or an assertion below fails.
    self.addCleanup(discard, used_path)
    self.addCleanup(discard, unused_path)
    used_path.write_bytes(b"x")
    unused_path.write_bytes(b"y")

    with self.app.app_context():
        now = isoformat(utcnow())
        execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                "t",
                # The only reference to the used file is an absolute CDN URL.
                f"img: https://cdn.example.com/uploads/{used_name}",
                "FREE",
                "DRAFT",
                None,
                "[]",
                "root",
                "r",
                0,
                "http://gogs.local/root/r",
                "master",
                now,
                now,
            ),
        )

    r = client.get("/admin/uploads")
    self.assertEqual(200, r.status_code)
    items = r.json.get("items") or []
    used_item = next((it for it in items if it.get("name") == used_name), None)
    unused_item = next((it for it in items if it.get("name") == unused_name), None)
    self.assertIsNotNone(used_item)
    self.assertIsNotNone(unused_item)
    self.assertTrue(bool(used_item.get("used")))
    self.assertFalse(bool(unused_item.get("used")))

    r = client.post("/admin/uploads/cleanup-unused", json={})
    self.assertEqual(200, r.status_code)
    # At least our unused fixture must have been removed; other stray files
    # in the directory may raise the count further.
    self.assertGreaterEqual(int((r.json or {}).get("deletedCount") or 0), 1)
    self.assertTrue(used_path.exists())
    self.assertFalse(unused_path.exists())
def test_admin_create_resource_requires_gogs_token(self) -> None:
    """Creating a resource with no Gogs token yields ``gogs_token_required``.

    The ``GOGS_TOKEN`` environment variable is removed and the stored token
    is cleared through the settings API before the create request is sent;
    the request must then fail with HTTP 400. The environment is restored
    afterwards regardless of the outcome.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    # Snapshot the environment values this test overrides.
    saved_token = os.environ.pop("GOGS_TOKEN", None)
    saved_base_url = os.environ.get("GOGS_BASE_URL")
    os.environ["GOGS_BASE_URL"] = "http://127.0.0.1:9"
    try:
        cleared = admin.put("/admin/settings", json={"clearGogsToken": True})
        self.assertEqual(200, cleared.status_code)

        new_resource = {"title": "t", "summary": "s", "type": "FREE", "status": "DRAFT"}
        created = admin.post("/admin/resources", json=new_resource)
        self.assertEqual(400, created.status_code)
        self.assertEqual("gogs_token_required", created.json.get("error"))
    finally:
        if saved_token is not None:
            os.environ["GOGS_TOKEN"] = saved_token
        if saved_base_url is not None:
            os.environ["GOGS_BASE_URL"] = saved_base_url
        else:
            os.environ.pop("GOGS_BASE_URL", None)
def test_gogs_create_repo_request_shape(self) -> None:
    """Check how repo-creation requests are sent to Gogs.

    ``requests.request`` (as seen through ``server.gogs``) is replaced by a
    recorder that always answers HTTP 500. The endpoint must report
    ``gogs_failed`` with 502, and at least two upstream calls must have been
    made: the first a POST to ``/api/v1/user/repos`` authenticated by an
    ``Authorization: token ...`` header with no ``token`` query parameter,
    the second carrying a ``token`` query parameter.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)
    configured = admin.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
    self.assertEqual(200, configured.status_code)

    import server.gogs as gogsmod
    from requests import Response

    calls = []
    real_request = gogsmod.requests.request

    def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
        # Record every outgoing call, then fail it.
        calls.append(
            {
                "method": method,
                "url": url,
                "params": params,
                "json": json,
                "headers": headers,
                "timeout": timeout,
            }
        )
        failure = Response()
        failure.status_code = 500
        failure.url = url
        failure._content = b'{"message":"boom"}'
        return failure

    try:
        gogsmod.requests.request = stub_request
        payload = {"title": "t", "summary": "s", "type": "FREE", "status": "DRAFT", "createRepo": True}
        created = admin.post("/admin/resources", json=payload)
        self.assertEqual(502, created.status_code)
        self.assertEqual("gogs_failed", created.json.get("error"))

        self.assertTrue(len(calls) >= 2)
        first_call = calls[0]
        second_call = calls[1]
        self.assertEqual("POST", first_call["method"])
        self.assertEqual("http://gogs.local/api/v1/user/repos", first_call["url"])
        self.assertFalse(bool((first_call["params"] or {}).get("token")))
        self.assertEqual("token t", (first_call["headers"] or {}).get("Authorization"))
        self.assertTrue(bool((second_call["params"] or {}).get("token")))
    finally:
        gogsmod.requests.request = real_request
def test_create_resource_skips_readme_sync_when_contents_api_missing(self) -> None:
    """Resource creation succeeds even if the Gogs contents API returns 404.

    The stubbed Gogs accepts repo creation (201) but answers 404 for every
    ``/contents/README.md`` request; the git-based file writer in
    ``server.routes`` is replaced by a stub that reports a commit. The
    create request must still return 200 with a positive resource id.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)
    configured = admin.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
    self.assertEqual(200, configured.status_code)

    import server.routes as routesmod
    import server.gogs as gogsmod
    from requests import Response

    real_request = gogsmod.requests.request
    real_git_write = routesmod.gogs_git_write_file

    def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
        verb = method.upper()
        touches_readme = "/contents/README.md" in url
        resp = Response()
        resp.url = f"{url}?token={params.get('token')}" if params and "token" in params else url
        if url.endswith("/api/v1/user/repos") and verb == "POST":
            # Repo creation succeeds.
            resp.status_code = 201
            resp._content = b'{"full_name":"root/110","name":"110","owner":{"username":"root"},"default_branch":"master","private":false,"html_url":"http://gogs.local/root/110"}'
        elif touches_readme and verb in {"POST", "PUT"}:
            # Writing the README through the contents API: HTML 404 page.
            resp.status_code = 404
            resp.headers["Content-Type"] = "text/html; charset=utf-8"
            resp._content = b"<!DOCTYPE html><html><head></head><body>Not Found</body></html>"
        elif touches_readme and verb == "GET":
            # Reading the README through the contents API: JSON 404.
            resp.status_code = 404
            resp.headers["Content-Type"] = "application/json; charset=utf-8"
            resp._content = b'{"message":"not found"}'
        else:
            resp.status_code = 500
            resp._content = b'{"message":"unexpected"}'
        return resp

    def stub_git_write(owner, repo, branch, path, content_text, message, must_create):
        return {"branch": branch, "commit": "c" * 40}

    try:
        gogsmod.requests.request = stub_request
        routesmod.gogs_git_write_file = stub_git_write
        payload = {
            "title": "t",
            "summary": "s",
            "type": "FREE",
            "status": "DRAFT",
            "createRepo": True,
            "syncReadme": True,
        }
        created = admin.post("/admin/resources", json=payload)
        self.assertEqual(200, created.status_code)
        self.assertTrue(int(created.json.get("id") or 0) > 0)
    finally:
        gogsmod.requests.request = real_request
        routesmod.gogs_git_write_file = real_git_write
def test_repo_file_crud_error_mapping(self) -> None:
    """``GogsGitError`` codes map to the expected HTTP statuses.

    With the git helpers in ``server.routes`` replaced by stubs that raise,
    the repo-file endpoints must answer:

    * POST, ``file_exists``     -> 409
    * PUT, ``file_not_found``   -> 404
    * DELETE, ``git_not_found`` -> 501

    and echo the error code in the JSON ``error`` field.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    import server.routes as routesmod
    import server.gogs as gogsmod
    from server.core import isoformat, utcnow
    from server.db import execute

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    def raising(code, detail):
        # Build a stub that fails with the given GogsGitError.
        def _stub(*_a, **_k):
            raise gogsmod.GogsGitError(code, detail)

        return _stub

    file_url = f"/resources/{resource_id}/repo/file"
    real_write = routesmod.gogs_git_write_file
    real_delete = routesmod.gogs_git_delete_path
    try:
        routesmod.gogs_git_write_file = raising("file_exists", "exists")
        resp = admin.post(file_url, json={"ref": "master", "path": "a.txt", "content": "x", "message": "m"})
        self.assertEqual(409, resp.status_code)
        self.assertEqual("file_exists", resp.json.get("error"))

        routesmod.gogs_git_write_file = raising("file_not_found", "missing")
        resp = admin.put(file_url, json={"ref": "master", "path": "a.txt", "content": "x", "message": "m"})
        self.assertEqual(404, resp.status_code)
        self.assertEqual("file_not_found", resp.json.get("error"))

        routesmod.gogs_git_delete_path = raising("git_not_found", "git required")
        resp = admin.delete(file_url, json={"ref": "master", "path": "a.txt", "message": "m"})
        self.assertEqual(501, resp.status_code)
        self.assertEqual("git_not_found", resp.json.get("error"))
    finally:
        routesmod.gogs_git_write_file = real_write
        routesmod.gogs_git_delete_path = real_delete
def test_repo_commits_endpoint_works(self) -> None:
    """The commits endpoint returns the requested ref and a one-item list.

    A fresh app is created with ``REQUIRE_LOGIN_TO_VIEW_REPO=0`` and
    ``server.routes.gogs_commits`` is replaced by a stub that returns a
    single commit as JSON. The environment variable and the stubbed
    function are both restored afterwards.
    """
    previous_flag = os.environ.get("REQUIRE_LOGIN_TO_VIEW_REPO")
    os.environ["REQUIRE_LOGIN_TO_VIEW_REPO"] = "0"
    try:
        open_app = self.appmod.create_app()
        browser = open_app.test_client()

        import server.routes as routesmod
        from server.core import isoformat, utcnow
        from server.db import execute

        with open_app.app_context():
            stamp = isoformat(utcnow())
            cursor = execute(
                """
                INSERT INTO resources
                (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
                VALUES
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
            )
            resource_id = int(cursor.lastrowid)

        real_commits = routesmod.gogs_commits
        try:
            from requests import Response

            def stub_commits(owner, repo, *, ref, path, limit):
                # One commit with a 40-character sha, served as JSON.
                reply = Response()
                reply.status_code = 200
                reply.url = f"http://gogs.local/api/v1/repos/{owner}/{repo}/commits"
                reply.headers["Content-Type"] = "application/json"
                reply._content = b'[{"sha":"' + (b"a" * 40) + b'","commit":{"author":{"name":"u","date":"2026-01-01T00:00:00Z"},"message":"init\\n"}}]'
                return reply

            routesmod.gogs_commits = stub_commits
            listing = browser.get(f"/resources/{resource_id}/repo/commits?ref=master&limit=10")
            self.assertEqual(200, listing.status_code)
            self.assertEqual("master", listing.json.get("ref"))
            self.assertTrue(isinstance(listing.json.get("items"), list))
            self.assertEqual(1, len(listing.json.get("items") or []))
        finally:
            routesmod.gogs_commits = real_commits
    finally:
        if previous_flag is not None:
            os.environ["REQUIRE_LOGIN_TO_VIEW_REPO"] = previous_flag
        else:
            os.environ.pop("REQUIRE_LOGIN_TO_VIEW_REPO", None)
def test_repo_refs_falls_back_to_git_when_gogs_api_fails(self) -> None:
    """Refs are still listed when the Gogs branch/tag helpers return 500.

    ``gogs_branches`` and ``gogs_tags`` in ``server.routes`` are stubbed to
    return HTTP 500 responses, while ``gogs_git_list_refs`` is stubbed to
    return a single ``master`` branch. The refs endpoint must answer 200
    with exactly that branch list.
    """
    previous_flag = os.environ.get("REQUIRE_LOGIN_TO_VIEW_REPO")
    os.environ["REQUIRE_LOGIN_TO_VIEW_REPO"] = "0"
    try:
        open_app = self.appmod.create_app()
        browser = open_app.test_client()

        import server.routes as routesmod
        from requests import Response
        from server.core import isoformat, utcnow
        from server.db import execute

        with open_app.app_context():
            stamp = isoformat(utcnow())
            cursor = execute(
                """
                INSERT INTO resources
                (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
                VALUES
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
            )
            resource_id = int(cursor.lastrowid)

        real_branches = routesmod.gogs_branches
        real_tags = routesmod.gogs_tags
        real_git_refs = routesmod.gogs_git_list_refs
        try:

            def server_error(url):
                # Shared shape of the failing upstream replies.
                reply = Response()
                reply.status_code = 500
                reply.url = url
                reply._content = b'{"message":"boom"}'
                reply.headers["Content-Type"] = "application/json"
                return reply

            def stub_branches(owner, repo):
                return server_error(f"http://gogs.local/api/v1/repos/{owner}/{repo}/branches")

            def stub_tags(owner, repo):
                return server_error(f"http://gogs.local/api/v1/repos/{owner}/{repo}/tags")

            def stub_git_refs(owner, repo):
                return {"branches": [{"name": "master"}], "tags": []}

            routesmod.gogs_branches = stub_branches
            routesmod.gogs_tags = stub_tags
            routesmod.gogs_git_list_refs = stub_git_refs

            refs = browser.get(f"/resources/{resource_id}/repo/refs")
            self.assertEqual(200, refs.status_code)
            self.assertEqual([{"name": "master"}], refs.json.get("branches"))
        finally:
            routesmod.gogs_branches = real_branches
            routesmod.gogs_tags = real_tags
            routesmod.gogs_git_list_refs = real_git_refs
    finally:
        if previous_flag is not None:
            os.environ["REQUIRE_LOGIN_TO_VIEW_REPO"] = previous_flag
        else:
            os.environ.pop("REQUIRE_LOGIN_TO_VIEW_REPO", None)
def test_download_requires_login(self) -> None:
    """An unauthenticated download request is rejected with HTTP 401."""
    anonymous = self.app.test_client()

    from server.core import isoformat, utcnow
    from server.db import execute

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    # No login was performed on this client.
    attempt = anonymous.post(f"/resources/{resource_id}/download", json={"ref": "master"})
    self.assertEqual(401, attempt.status_code)
def test_download_vip_requires_vip(self) -> None:
    """A freshly registered user cannot download a ``VIP`` resource.

    The download request must return 403 with error ``vip_required``.
    """
    member = self.app.test_client()
    credentials = {"phone": "13900000002", "password": "abc12345"}
    signup = member.post("/auth/register", json=credentials)
    # 400 is tolerated so the test still passes when registration is refused
    # (presumably because the phone number is already registered).
    self.assertIn(signup.status_code, {200, 400})
    login = member.post("/auth/login", json=credentials)
    self.assertEqual(200, login.status_code)

    from server.core import isoformat, utcnow
    from server.db import execute

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "VIP", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    attempt = member.post(f"/resources/{resource_id}/download", json={"ref": "master"})
    self.assertEqual(403, attempt.status_code)
    self.assertEqual("vip_required", (attempt.json or {}).get("error"))
def test_download_uses_git_archive_when_gogs_archive_fails(self) -> None:
    """A download is served from the git archive when the Gogs archive fails.

    ``gogs_archive_get`` is stubbed to return HTTP 500, ``gogs_resolve_ref_commit``
    to resolve ``master`` to a fixed 40-character commit, and
    ``gogs_git_archive_zip_commit`` to produce a temporary zip file. The
    test starts a download, polls the status URL until it reports ready,
    then fetches the download URL and checks the zip payload.
    """
    member = self.app.test_client()
    credentials = {"phone": "13900000001", "password": "abc12345"}
    signup = member.post("/auth/register", json=credentials)
    self.assertIn(signup.status_code, {200, 400})
    login = member.post("/auth/login", json=credentials)
    self.assertEqual(200, login.status_code)

    import tempfile
    import server.routes as routesmod
    from requests import Response
    from server.core import isoformat, utcnow
    from server.db import execute

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    # Remember every routes-level helper this test may replace.
    real_archive_get = routesmod.gogs_archive_get
    real_git_archive = routesmod.gogs_git_archive_zip
    real_git_archive_commit = routesmod.gogs_git_archive_zip_commit
    real_resolve = routesmod.gogs_resolve_ref_commit
    try:

        def stub_archive_get(owner, repo, ref):
            failure = Response()
            failure.status_code = 500
            failure.url = "http://gogs.local/api/v1/repos/root/r/archive/master.zip"
            failure._content = b'{"message":"boom"}'
            return failure

        def stub_git_archive_commit(owner, repo, commit):
            # delete=False: the file must survive closing so the caller can read it.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as archive:
                archive.write(b"ZIPDATA")
            return archive.name

        def stub_resolve(owner, repo, ref):
            return {"ok": True, "ref": ref, "commit": "a" * 40, "kind": "branch"}

        routesmod.gogs_archive_get = stub_archive_get
        routesmod.gogs_resolve_ref_commit = stub_resolve
        routesmod.gogs_git_archive_zip_commit = stub_git_archive_commit

        started = member.post(f"/resources/{resource_id}/download", json={"ref": "master"})
        self.assertEqual(200, started.status_code)
        ticket = started.json or {}
        self.assertTrue(bool(ticket.get("ok")))
        self.assertEqual("a" * 40, ticket.get("cacheKey"))
        download_url = ticket.get("downloadUrl")
        self.assertTrue(bool(download_url))
        status_url = ticket.get("statusUrl")
        self.assertTrue(bool(status_url))

        import time

        # Poll the status URL (up to 200 tries, 10 ms apart) until ready.
        for _attempt in range(200):
            status = member.get(status_url)
            self.assertEqual(200, status.status_code)
            progress = status.json or {}
            if progress.get("ready"):
                break
            if progress.get("state") == "error":
                self.fail(progress.get("error") or "build_failed")
            time.sleep(0.01)

        served = member.get(download_url)
        self.assertEqual(200, served.status_code)
        self.assertEqual("application/zip", served.headers.get("Content-Type"))
        self.assertTrue((served.data or b"").startswith(b"ZIPDATA"))
        # Best-effort release of the response; failures here are ignored.
        try:
            served.close()
        except Exception:
            pass
    finally:
        routesmod.gogs_archive_get = real_archive_get
        routesmod.gogs_git_archive_zip = real_git_archive
        routesmod.gogs_git_archive_zip_commit = real_git_archive_commit
        routesmod.gogs_resolve_ref_commit = real_resolve
def test_admin_delete_resource_deletes_gogs_repo(self) -> None:
    """Deleting a resource issues a Gogs repo DELETE and removes the DB row.

    ``requests.request`` (as seen through ``server.gogs``) is replaced by a
    recorder that answers 204 for ``DELETE /api/v1/repos/root/r`` and 500
    for anything else.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)
    configured = admin.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
    self.assertEqual(200, configured.status_code)

    from server.core import isoformat, utcnow
    from server.db import execute, fetch_one

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r", 0, "http://gogs.local/root/r", "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    import server.gogs as gogsmod
    from requests import Response

    calls = []
    real_request = gogsmod.requests.request

    def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
        calls.append(
            {
                "method": method,
                "url": url,
                "params": params,
                "json": json,
                "headers": headers,
                "timeout": timeout,
            }
        )
        reply = Response()
        reply.url = url
        if method.upper() == "DELETE" and url == "http://gogs.local/api/v1/repos/root/r":
            reply.status_code, reply._content = 204, b""
        else:
            reply.status_code, reply._content = 500, b'{"message":"unexpected"}'
        return reply

    try:
        gogsmod.requests.request = stub_request
        removed = admin.delete(f"/admin/resources/{resource_id}")
        self.assertEqual(200, removed.status_code)
        self.assertTrue(bool(removed.json.get("ok")))
    finally:
        gogsmod.requests.request = real_request

    # The upstream repo deletion must have been attempted.
    self.assertTrue(any(c["method"] == "DELETE" and c["url"].endswith("/api/v1/repos/root/r") for c in calls))
    with self.app.app_context():
        leftover = fetch_one("SELECT id FROM resources WHERE id = ?", (resource_id,))
        self.assertIsNone(leftover)
def test_admin_download_cache_clear_all(self) -> None:
    """``download-cache?all=1`` removes every cached archive of a resource.

    One zip file and its ``.meta.json`` sidecar are seeded in the
    ``download_cache`` directory next to the configured database. The
    summary must report 1 entry, the delete must report 1 removed and take
    both files off disk, and the summary must then report 0.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)

    from server.core import isoformat, utcnow
    from server.db import execute
    from server.context import get_config
    from pathlib import Path
    import json
    import re

    def sanitize(text, fallback):
        # Same file-name sanitising expression the original test applied
        # to the owner and repo names.
        return re.sub(r"[^a-zA-Z0-9._-]+", "_", text)[:50] or fallback

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

        cache_dir = get_config().database_path.parent / "download_cache"
        cache_dir.mkdir(parents=True, exist_ok=True)
        safe_owner = sanitize("root", "owner")
        safe_repo = sanitize("r", "repo")
        commit = "b" * 40
        zip_path = Path(cache_dir) / f"res{resource_id}__{safe_owner}__{safe_repo}__{commit[:24]}.zip"
        zip_path.write_bytes(b"ZIPDATA")
        meta_path = zip_path.with_suffix(zip_path.suffix + ".meta.json")
        meta = {"owner": "root", "repo": "r", "ref": "master", "commit": commit}
        meta_path.write_text(json.dumps(meta, ensure_ascii=False), encoding="utf-8")

    summary_url = f"/admin/resources/{resource_id}/download-cache/summary"

    summary = admin.get(summary_url)
    self.assertEqual(200, summary.status_code)
    self.assertEqual(1, int((summary.json or {}).get("count") or 0))

    purge = admin.delete(f"/admin/resources/{resource_id}/download-cache?all=1")
    self.assertEqual(200, purge.status_code)
    self.assertEqual(1, int((purge.json or {}).get("removed") or 0))
    with self.app.app_context():
        self.assertFalse(zip_path.exists())
        self.assertFalse(meta_path.exists())

    summary = admin.get(summary_url)
    self.assertEqual(200, summary.status_code)
    self.assertEqual(0, int((summary.json or {}).get("count") or 0))
def test_admin_delete_resource_deletes_uploaded_files(self) -> None:
    """Deleting a resource also deletes the local uploads it references.

    Two files are written to the project's ``static/uploads`` directory;
    one is referenced as the resource's ``cover_url`` and the other inside
    its summary text. After a successful admin delete (with the Gogs repo
    deletion stubbed to return 204) both files must be gone.
    """
    client = self.app.test_client()
    r = client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, r.status_code)
    r = client.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
    self.assertEqual(200, r.status_code)

    from pathlib import Path

    from server.core import isoformat, utcnow
    from server.db import execute

    def discard(path) -> None:
        # On the success path the endpoint has already removed the file.
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    project_root = Path(__file__).resolve().parents[1]
    uploads_dir = project_root / "static" / "uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)
    name1 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.png"
    name2 = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.webp"
    p1 = uploads_dir / name1
    p2 = uploads_dir / name2
    # Register cleanup before writing so a failing request or assertion
    # does not leave fixture files behind in the uploads directory.
    self.addCleanup(discard, p1)
    self.addCleanup(discard, p2)
    p1.write_bytes(b"x")
    p2.write_bytes(b"y")

    with self.app.app_context():
        now = isoformat(utcnow())
        cur = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                "t",
                f"img1: /static/uploads/{name2}",  # referenced from the summary
                "FREE",
                "DRAFT",
                f"/static/uploads/{name1}",  # referenced as the cover
                "[]",
                "root",
                "r",
                0,
                "http://gogs.local/root/r",
                "master",
                now,
                now,
            ),
        )
        resource_id = int(cur.lastrowid)

    import server.gogs as gogsmod
    from requests import Response

    original = gogsmod.requests.request

    def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
        resp = Response()
        resp.url = url
        if method.upper() == "DELETE" and url.endswith("/api/v1/repos/root/r"):
            resp.status_code = 204
            resp._content = b""
            return resp
        resp.status_code = 500
        resp._content = b'{"message":"unexpected"}'
        return resp

    try:
        gogsmod.requests.request = stub_request
        r = client.delete(f"/admin/resources/{resource_id}")
        self.assertEqual(200, r.status_code)
        self.assertTrue(bool(r.json.get("ok")))
    finally:
        gogsmod.requests.request = original

    self.assertFalse(p1.exists())
    self.assertFalse(p2.exists())
def test_admin_delete_resource_aborts_when_gogs_delete_fails(self) -> None:
    """A refused Gogs repo deletion aborts the resource deletion.

    The stubbed Gogs answers 403 to ``DELETE /api/v1/repos/root/r2``. The
    admin delete must return 400 with error ``gogs_unauthorized`` and the
    resource row must still exist afterwards.
    """
    admin = self.app.test_client()
    login = admin.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
    self.assertEqual(200, login.status_code)
    configured = admin.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
    self.assertEqual(200, configured.status_code)

    from server.core import isoformat, utcnow
    from server.db import execute, fetch_one

    with self.app.app_context():
        stamp = isoformat(utcnow())
        cursor = execute(
            """
            INSERT INTO resources
            (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r2", 0, "http://gogs.local/root/r2", "master", stamp, stamp),
        )
        resource_id = int(cursor.lastrowid)

    import server.gogs as gogsmod
    from requests import Response

    real_request = gogsmod.requests.request

    def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
        reply = Response()
        reply.url = url
        if method.upper() == "DELETE" and url == "http://gogs.local/api/v1/repos/root/r2":
            # Upstream refuses the deletion.
            reply.status_code = 403
            reply._content = b'{"message":"forbidden"}'
            reply.headers["Content-Type"] = "application/json"
        else:
            reply.status_code = 500
            reply._content = b'{"message":"unexpected"}'
        return reply

    try:
        gogsmod.requests.request = stub_request
        refused = admin.delete(f"/admin/resources/{resource_id}")
        self.assertEqual(400, refused.status_code)
        self.assertEqual("gogs_unauthorized", refused.json.get("error"))
    finally:
        gogsmod.requests.request = real_request

    with self.app.app_context():
        survivor = fetch_one("SELECT id FROM resources WHERE id = ?", (resource_id,))
        self.assertIsNotNone(survivor)
# Run the unittest runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
|