test_admin_api.py 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491
  1. import os
  2. import tempfile
  3. import unittest
  4. class AdminApiTests(unittest.TestCase):
  @classmethod
  def setUpClass(cls) -> None:
      """Prepare a temporary database location, configure the environment and build the app.

      The environment variables are written before ``app`` is imported, so they
      are already set when the module is loaded and ``create_app()`` is called.
      """
      cls._tmpdir = tempfile.TemporaryDirectory()
      database_file = os.path.join(cls._tmpdir.name, "test.db")
      os.environ.update(
          {
              "DATABASE_PATH": database_file,
              "SECRET_KEY": "test-secret",
              # NOTE(review): port 9 is presumably chosen so Gogs calls fail fast - confirm.
              "GOGS_BASE_URL": "http://127.0.0.1:9",
          }
      )

      import app as appmod

      # The module is kept so individual tests can build additional app instances.
      cls.appmod = appmod
      cls.app = appmod.create_app()
  14. @classmethod
  15. def tearDownClass(cls) -> None:
  16. cls._tmpdir.cleanup()
  def test_admin_login_and_list_endpoints(self) -> None:
      """After an admin login, the stats and paginated list endpoints expose their expected keys."""
      client = self.app.test_client()
      login = client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      stats = client.get("/admin/stats")
      self.assertEqual(200, stats.status_code)
      for key in ("users", "resources", "orders", "revenue", "downloads", "messages", "backend"):
          self.assertIn(key, stats.json)

      # The three list endpoints are checked for the same pagination keys.
      for endpoint in ("/admin/resources", "/admin/users", "/admin/orders"):
          listing = client.get(f"{endpoint}?page=1&pageSize=10")
          self.assertEqual(200, listing.status_code)
          for key in ("items", "total", "page", "pageSize"):
              self.assertIn(key, listing.json)
  def test_admin_shortcut_pages_redirect(self) -> None:
      """``/admin`` and ``/admin/login`` answer with a redirect into the ``/ui/admin`` pages."""
      client = self.app.test_client()
      redirect_codes = {301, 302, 308}
      expectations = (
          ("/admin", "/ui/admin"),
          ("/admin/login", "/ui/admin/login"),
      )
      for path, target in expectations:
          resp = client.get(path)
          self.assertIn(resp.status_code, redirect_codes)
          # A missing Location header is treated as an empty string so the assertion fails cleanly.
          location = resp.headers.get("Location") or ""
          self.assertIn(target, location)
  def test_admin_stats_requires_login(self) -> None:
      """A client that has not logged in receives 401 from ``/admin/stats``."""
      anonymous = self.app.test_client()
      resp = anonymous.get("/admin/stats")
      self.assertEqual(401, resp.status_code)
  def test_admin_users_vip_filters_and_remaining_days(self) -> None:
      """The ``vip`` filter of ``/admin/users`` separates VIP from non-VIP users.

      One user with a VIP expiry five days ahead and one without any expiry are
      inserted directly; the VIP entry must also report ``vipActive`` and at
      least one remaining day.
      """
      client = self.app.test_client()
      login = client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      from datetime import timedelta
      from server.core import isoformat, utcnow
      from server.db import execute
      from werkzeug.security import generate_password_hash

      def insert_user(phone, vip_expire_at, created_at):
          # Insert one ACTIVE user row and return its new primary key.
          cur = execute(
              """
              INSERT INTO users
              (phone, password_hash, status, vip_expire_at, created_at)
              VALUES
              (?, ?, ?, ?, ?)
              """,
              (phone, generate_password_hash("p"), "ACTIVE", vip_expire_at, created_at),
          )
          return int(cur.lastrowid)

      with self.app.app_context():
          now = utcnow()
          vip_user_id = insert_user("13900008881", isoformat(now + timedelta(days=5)), isoformat(now))
          nonvip_user_id = insert_user("13900008882", None, isoformat(now))

      vip_listing = client.get("/admin/users?page=1&pageSize=50&vip=VIP")
      self.assertEqual(200, vip_listing.status_code)
      vip_rows = vip_listing.json.get("items") or []
      self.assertTrue(any(row.get("id") == vip_user_id for row in vip_rows))
      self.assertFalse(any(row.get("id") == nonvip_user_id for row in vip_rows))
      vip_row = next(row for row in vip_rows if row.get("id") == vip_user_id)
      self.assertEqual(True, vip_row.get("vipActive"))
      remaining_days = int(vip_row.get("vipRemainingDays") or 0)
      self.assertTrue(remaining_days >= 1)

      nonvip_listing = client.get("/admin/users?page=1&pageSize=50&vip=NONVIP")
      self.assertEqual(200, nonvip_listing.status_code)
      nonvip_rows = nonvip_listing.json.get("items") or []
      self.assertFalse(any(row.get("id") == vip_user_id for row in nonvip_rows))
      self.assertTrue(any(row.get("id") == nonvip_user_id for row in nonvip_rows))
  def test_admin_can_reset_user_password(self) -> None:
      """After an admin password reset the old password is rejected and the new one accepted."""
      phone = "13900007771"
      old_password = "oldpass123"
      new_password = "newpass123"

      user_client = self.app.test_client()
      registration = user_client.post("/auth/register", json={"phone": phone, "password": old_password})
      self.assertEqual(200, registration.status_code)
      user_id = int(registration.json.get("id"))

      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)
      reset = admin_client.post(f"/admin/users/{user_id}/password-reset", json={"password": new_password})
      self.assertEqual(200, reset.status_code)

      # A fresh client is used so no earlier session state is involved in the login attempts.
      login_client = self.app.test_client()
      for password, expected_status in ((old_password, 401), (new_password, 200)):
          attempt = login_client.post("/auth/login", json={"phone": phone, "password": password})
          self.assertEqual(expected_status, attempt.status_code)
  def test_vip_adjust_creates_user_message_and_can_mark_read(self) -> None:
      """An admin VIP adjustment produces an unread user message that can then be marked read."""
      credentials = {"phone": "13900006661", "password": "pass1234"}
      messages_url = "/me/messages?page=1&pageSize=10"

      user_client = self.app.test_client()
      registration = user_client.post("/auth/register", json=credentials)
      self.assertEqual(200, registration.status_code)
      user_id = int(registration.json.get("id"))

      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)
      adjust = admin_client.post(f"/admin/users/{user_id}/vip-adjust", json={"addDays": 7})
      self.assertEqual(200, adjust.status_code)

      user_login = user_client.post("/auth/login", json=credentials)
      self.assertEqual(200, user_login.status_code)

      inbox = user_client.get(messages_url)
      self.assertEqual(200, inbox.status_code)
      unread_before = int(inbox.json.get("unreadCount") or 0)
      self.assertTrue(unread_before >= 1)
      messages = inbox.json.get("items") or []
      self.assertTrue(len(messages) >= 1)
      first_message = messages[0]
      msg_id = int(first_message.get("id"))
      self.assertIn("会员", first_message.get("title") or "")

      marked = user_client.put(f"/me/messages/{msg_id}/read")
      self.assertEqual(200, marked.status_code)

      inbox = user_client.get(messages_url)
      self.assertEqual(200, inbox.status_code)
      self.assertEqual(0, int(inbox.json.get("unreadCount") or 0))
  def test_ui_messages_page_renders(self) -> None:
      """``/ui/messages`` responds with 200 and a body containing the messages page marker."""
      page = self.app.test_client().get("/ui/messages")
      self.assertEqual(200, page.status_code)
      marker = b'data-page="messages"'
      self.assertIn(marker, page.data)
  def test_admin_can_send_list_and_delete_messages(self) -> None:
      """A message sent by the admin appears in the admin listing and disappears once deleted."""
      user_client = self.app.test_client()
      registration = user_client.post("/auth/register", json={"phone": "13900005551", "password": "p123456"})
      self.assertEqual(200, registration.status_code)
      user_id = int(registration.json.get("id"))

      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)

      sent = admin_client.post(
          "/admin/messages/send",
          json={"userId": user_id, "title": "测试消息", "content": "内容A"},
      )
      self.assertEqual(200, sent.status_code)
      msg_id = int(sent.json.get("id"))

      def message_is_listed():
          # Fetch the admin listing for this user and report whether msg_id is in it.
          listing = admin_client.get(f"/admin/messages?page=1&pageSize=50&user_id={user_id}")
          self.assertEqual(200, listing.status_code)
          rows = listing.json.get("items") or []
          return any(int(row.get("id")) == msg_id for row in rows)

      self.assertTrue(message_is_listed())

      deleted = admin_client.delete(f"/admin/messages/{msg_id}")
      self.assertEqual(200, deleted.status_code)

      self.assertFalse(message_is_listed())
  def test_admin_can_broadcast_messages_by_audience(self) -> None:
      """A broadcast to the ``VIP`` audience is delivered to the VIP user and not to the non-VIP user."""
      from datetime import timedelta
      from server.core import isoformat, utcnow
      from server.db import execute

      def register(phone):
          # Register a user through the public API and return the new user id.
          resp = self.app.test_client().post("/auth/register", json={"phone": phone, "password": "p123456"})
          self.assertEqual(200, resp.status_code)
          return int(resp.json.get("id"))

      vip_user_id = register("13900005561")
      nonvip_user_id = register("13900005562")

      with self.app.app_context():
          vip_until = isoformat(utcnow() + timedelta(days=3))
          execute("UPDATE users SET vip_expire_at = ? WHERE id = ?", (vip_until, vip_user_id))
          execute("UPDATE users SET vip_expire_at = NULL WHERE id = ?", (nonvip_user_id,))

      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)

      broadcast = admin_client.post(
          "/admin/messages/broadcast",
          json={"audience": "VIP", "title": "VIP通知", "content": "内容B"},
      )
      self.assertEqual(200, broadcast.status_code)
      # NOTE(review): an exact count of 1 presumably relies on no other active VIP user
      # existing in the class-wide database when this test runs - confirm test ordering.
      self.assertEqual(1, int(broadcast.json.get("count") or 0))

      def received_vip_notice(target_user_id):
          # Report whether the admin listing for the user contains the broadcast title.
          listing = admin_client.get(
              f"/admin/messages?page=1&pageSize=50&user_id={target_user_id}&senderType=ADMIN"
          )
          self.assertEqual(200, listing.status_code)
          rows = listing.json.get("items") or []
          return any((row.get("title") or "") == "VIP通知" for row in rows)

      self.assertTrue(received_vip_notice(vip_user_id))
      self.assertFalse(received_vip_notice(nonvip_user_id))
  def test_admin_db_status_and_switch_endpoints(self) -> None:
      """The DB status endpoint reports a working probe; switching to sqlite succeeds and to mysql is rejected."""
      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)

      status = admin_client.get("/admin/db/status")
      self.assertEqual(200, status.status_code)
      body = status.json
      self.assertEqual(True, bool(body.get("ok")))
      self.assertIn("db", body)
      self.assertIn("active", body["db"])
      self.assertIn("probe", body)
      probe = body["probe"]
      self.assertIn("connectOk", probe)
      self.assertEqual(True, bool(probe.get("connectOk")))
      self.assertIn("effective", probe)

      to_sqlite = admin_client.post("/admin/db/switch", json={"target": "sqlite"})
      self.assertEqual(200, to_sqlite.status_code)
      self.assertEqual(True, bool(to_sqlite.json.get("ok")))

      # The test expects mysql to be unconfigured in this environment.
      to_mysql = admin_client.post("/admin/db/switch", json={"target": "mysql"})
      self.assertEqual(400, to_mysql.status_code)
      self.assertEqual("mysql_not_configured", to_mysql.json.get("error"))
  def test_gogs_endpoints_fail_gracefully(self) -> None:
      """Gogs-backed admin endpoints return structured errors rather than crashing.

      Phase one removes ``GOGS_TOKEN`` from the environment and clears the stored
      token: requests naming an owner must yield 502 ``gogs_failed``, and a repo
      search without an owner must yield 400 ``gogs_token_required``. Phase two
      builds a second app with ``GOGS_TOKEN`` set and expects the same search to
      yield 502 ``gogs_failed``.
      """

      def put_back(name, previous):
          # Restore an environment variable to its earlier value, or remove it if it was unset.
          if previous is None:
              os.environ.pop(name, None)
          else:
              os.environ[name] = previous

      admin_credentials = {"username": "admin", "password": "admin123"}

      client = self.app.test_client()
      login = client.post("/admin/auth/login", json=admin_credentials)
      self.assertEqual(200, login.status_code)

      saved_token = os.environ.pop("GOGS_TOKEN", None)
      saved_redis_url = os.environ.get("REDIS_URL")
      os.environ["REDIS_URL"] = "redis://127.0.0.1:0/0"
      try:
          cleared = client.put("/admin/settings", json={"clearGogsToken": True})
          self.assertEqual(200, cleared.status_code)

          failing_urls = (
              "/admin/gogs/repo?owner=aa&repo=bb",
              "/admin/gogs/branches?owner=aa&repo=bb",
              "/admin/gogs/tags?owner=aa&repo=bb",
              "/admin/gogs/repos?owner=aa&q=bb",
          )
          for url in failing_urls:
              resp = client.get(url)
              self.assertEqual(502, resp.status_code)
              self.assertEqual("gogs_failed", resp.json.get("error"))

          resp = client.get("/admin/gogs/repos?q=bb")
          self.assertEqual(400, resp.status_code)
          self.assertEqual("gogs_token_required", resp.json.get("error"))
      finally:
          put_back("GOGS_TOKEN", saved_token)
          put_back("REDIS_URL", saved_redis_url)

      token_before = os.environ.get("GOGS_TOKEN")
      os.environ["GOGS_TOKEN"] = "test-token"
      try:
          # A second app is created while the token is present in the environment.
          app2 = self.appmod.create_app()
          client2 = app2.test_client()
          login = client2.post("/admin/auth/login", json=admin_credentials)
          self.assertEqual(200, login.status_code)
          resp = client2.get("/admin/gogs/repos?q=bb")
          self.assertEqual(502, resp.status_code)
          self.assertEqual("gogs_failed", resp.json.get("error"))
      finally:
          put_back("GOGS_TOKEN", token_before)
  def test_alipay_callback_can_mark_order_paid_idempotently(self) -> None:
      """A signed Alipay callback marks the order PAID, and repeating the callback still succeeds.

      A fresh RSA key pair is generated and stored through ``/admin/settings``;
      the callback parameters are signed with the private key of that pair.
      """
      from decimal import Decimal

      credentials = {"phone": "13900004441", "password": "p123456"}
      user_client = self.app.test_client()
      registration = user_client.post("/auth/register", json=credentials)
      self.assertEqual(200, registration.status_code)
      user_login = user_client.post("/auth/login", json=credentials)
      self.assertEqual(200, user_login.status_code)

      plans = user_client.get("/plans").json
      self.assertTrue(isinstance(plans, list) and len(plans) >= 1)
      plan_id = int(plans[0]["id"])
      order = user_client.post("/orders", json={"planId": plan_id}).json
      order_id = order["id"]
      amount_cents = int(order["amountCents"])
      # Convert cents into a two-decimal amount string source.
      total_amount = (Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01"))

      admin_client = self.app.test_client()
      admin_login = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, admin_login.status_code)

      from Crypto.Hash import SHA256
      from Crypto.PublicKey import RSA
      from Crypto.Signature import pkcs1_15

      key = RSA.generate(2048)
      private_pem = key.export_key(format="PEM").decode("utf-8")
      public_pem = key.publickey().export_key(format="PEM").decode("utf-8")

      alipay_settings = {
          "appId": "2021000123456789",
          "gateway": "https://openapi.alipay.com/gateway.do",
          "notifyUrl": "https://example.com/pay/callback",
          "returnUrl": "https://example.com/ui/me",
          "privateKey": private_pem,
          "publicKey": public_pem,
      }
      payment_settings = {
          "provider": "ALIPAY",
          "enableMockPay": False,
          "alipay": alipay_settings,
      }
      saved = admin_client.put("/admin/settings", json={"payment": payment_settings})
      self.assertEqual(200, saved.status_code)

      params = {
          "app_id": "2021000123456789",
          "out_trade_no": order_id,
          "trade_no": "202603210000000000001",
          "trade_status": "TRADE_SUCCESS",
          "total_amount": str(total_amount),
      }
      # The signed text is built from the key-sorted parameters before sign_type/sign are added.
      sign_content = "&".join(f"{name}={value}" for name, value in sorted(params.items()))
      digest = SHA256.new(sign_content.encode("utf-8"))
      signature = pkcs1_15.new(RSA.import_key(private_pem)).sign(digest)

      import base64

      params["sign_type"] = "RSA2"
      params["sign"] = base64.b64encode(signature).decode("utf-8")

      # The identical callback is posted twice; both deliveries must be acknowledged.
      for _ in range(2):
          callback = user_client.post("/pay/callback", data=params)
          self.assertEqual(200, callback.status_code)
          self.assertEqual(b"success", (callback.data or b"").strip())

      from server.db import fetch_one

      with self.app.app_context():
          row = fetch_one("SELECT * FROM orders WHERE id = ?", (order_id,))
      self.assertEqual("PAID", row["status"])
      self.assertEqual("ALIPAY", row["pay_channel"])
      self.assertEqual("202603210000000000001", row["pay_trade_no"])
  def test_repo_tree_guest_allowed_and_file_preview_protected(self) -> None:
      """Guests may browse ``docs`` and ``README.md`` only; a session user sees every entry as allowed.

      ``server.routes.gogs_contents`` is replaced by a stub for the duration of
      the test so no Gogs server is contacted, and is restored afterwards.
      """
      client = self.app.test_client()

      from server.core import isoformat, utcnow
      from server.db import execute
      from werkzeug.security import generate_password_hash

      with self.app.app_context():
          now = isoformat(utcnow())
          resource_cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
          )
          resource_id = int(resource_cursor.lastrowid)
          user_cursor = execute(
              """
              INSERT INTO users
              (phone, password_hash, status, vip_expire_at, created_at)
              VALUES
              (?, ?, ?, ?, ?)
              """,
              ("13800000000", generate_password_hash("p"), "ACTIVE", None, now),
          )
          user_id = int(user_cursor.lastrowid)

      import server.routes as routesmod
      from requests import Response
      import base64
      import json

      original = routesmod.gogs_contents

      def stub_gogs_contents(owner, repo, path, ref):
          # An empty path returns a directory listing; any other path returns a base64 file payload.
          if path:
              encoded = base64.b64encode(("content:" + str(path)).encode("utf-8")).decode("ascii")
              payload = {"type": "file", "encoding": "base64", "size": 10, "content": encoded}
          else:
              payload = [
                  {"name": "docs", "path": "docs", "type": "dir", "size": 0},
                  {"name": "README.md", "path": "README.md", "type": "file", "size": 1},
                  {"name": "main.py", "path": "main.py", "type": "file", "size": 1},
                  {"name": ".env", "path": ".env", "type": "file", "size": 1},
              ]
          resp = Response()
          resp.status_code = 200
          resp.url = "http://stub.local"
          resp._content = json.dumps(payload).encode("utf-8")
          return resp

      tree_url = f"/resources/{resource_id}/repo/tree?ref=master&path="
      try:
          routesmod.gogs_contents = stub_gogs_contents

          tree = client.get(tree_url)
          self.assertEqual(200, tree.status_code)
          by_path = {entry.get("path"): entry for entry in (tree.json.get("items") or [])}
          for allowed_path in ("docs", "README.md"):
              self.assertTrue(bool(by_path.get(allowed_path, {}).get("guestAllowed")))
          for blocked_path in ("main.py", ".env"):
              self.assertFalse(bool(by_path.get(blocked_path, {}).get("guestAllowed")))

          blocked = client.get(f"/resources/{resource_id}/repo/file?ref=master&path=main.py")
          self.assertEqual(401, blocked.status_code)
          self.assertEqual("login_required", blocked.json.get("error"))

          readme = client.get(f"/resources/{resource_id}/repo/file?ref=master&path=README.md")
          self.assertEqual(200, readme.status_code)
          self.assertIn("content", readme.json)

          # A second client gets a user id written straight into its session.
          client2 = self.app.test_client()
          with client2.session_transaction() as session:
              session["user_id"] = user_id
          tree = client2.get(tree_url)
          self.assertEqual(200, tree.status_code)
          for entry in tree.json.get("items") or []:
              self.assertTrue(bool(entry.get("guestAllowed")))
      finally:
          routesmod.gogs_contents = original
  def test_my_download_logs_paginated_and_mark_deleted(self) -> None:
      """``/me/downloads`` lists a user's download and reports ``DELETED`` once the resource row is removed.

      Three ``server.routes`` helpers are replaced for the duration of the test
      and restored in ``finally``: ``gogs_archive_get`` returns an object whose
      ``status_code`` is 500, ``gogs_git_archive_zip`` returns the path of a
      local temporary file, and ``gogs_resolve_ref_commit`` reports ``ok: False``.
      """
      from server.core import isoformat, utcnow
      from server.db import execute
      from werkzeug.security import generate_password_hash

      with self.app.app_context():
          now = isoformat(utcnow())
          cur = execute(
              """
              INSERT INTO users
              (phone, password_hash, status, vip_expire_at, created_at)
              VALUES
              (?, ?, ?, ?, ?)
              """,
              ("13900000000", generate_password_hash("p"), "ACTIVE", None, now),
          )
          user_id = int(cur.lastrowid)
          cur = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              ("res", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
          )
          resource_id = int(cur.lastrowid)

      import server.routes as routesmod

      class DummyUpstream:
          # Stand-in for an upstream response; only the status code is provided.
          status_code = 500

      original_archive_get = routesmod.gogs_archive_get
      original_git_zip = routesmod.gogs_git_archive_zip
      original_resolve = routesmod.gogs_resolve_ref_commit

      # delete=False keeps the file on disk after close() so it can be handed out by path.
      tmp = tempfile.NamedTemporaryFile(delete=False)
      tmp.write(b"ZIP")
      tmp.close()
      try:
          routesmod.gogs_archive_get = lambda owner, repo, ref: DummyUpstream()
          routesmod.gogs_git_archive_zip = lambda owner, repo, ref: tmp.name
          routesmod.gogs_resolve_ref_commit = lambda owner, repo, ref: {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}

          client = self.app.test_client()
          with client.session_transaction() as s:
              s["user_id"] = user_id

          r = client.post(f"/resources/{resource_id}/download", json={"ref": "master"})
          self.assertEqual(200, r.status_code)

          r = client.get("/me/downloads?page=1&pageSize=10")
          self.assertEqual(200, r.status_code)
          self.assertEqual(1, int(r.json.get("total") or 0))
          self.assertEqual(1, len(r.json.get("items") or []))
          it = (r.json.get("items") or [None])[0]
          self.assertEqual(resource_id, it.get("resourceId"))
          self.assertEqual("res", it.get("resourceTitle"))
          self.assertEqual("FREE", it.get("resourceType"))
          self.assertEqual("FREE", it.get("currentResourceType"))
          self.assertEqual("ONLINE", it.get("resourceState"))

          # Remove the resource row and check how the existing log entry is reported.
          with self.app.app_context():
              execute("DELETE FROM resources WHERE id = ?", (resource_id,))

          r = client.get("/me/downloads?page=1&pageSize=10")
          self.assertEqual(200, r.status_code)
          it = (r.json.get("items") or [None])[0]
          self.assertEqual("DELETED", it.get("resourceState"))
          self.assertIsNone(it.get("currentResourceType"))
      finally:
          routesmod.gogs_archive_get = original_archive_get
          routesmod.gogs_git_archive_zip = original_git_zip
          routesmod.gogs_resolve_ref_commit = original_resolve
          try:
              os.unlink(tmp.name)
          except OSError:
              # Best-effort cleanup: the file may already be gone or still be in use.
              # Only filesystem errors are ignored, so unrelated bugs are not hidden.
              pass
  def test_admin_download_logs_list_and_filters(self) -> None:
      """``/admin/download-logs`` lists a user's download and supports the ``q``, ``type`` and ``state`` filters.

      Three ``server.routes`` helpers are replaced for the duration of the test
      and restored in ``finally``: ``gogs_archive_get`` returns an object whose
      ``status_code`` is 500, ``gogs_git_archive_zip`` returns the path of a
      local temporary file, and ``gogs_resolve_ref_commit`` reports ``ok: False``.
      """
      from server.core import isoformat, utcnow
      from server.db import execute
      from werkzeug.security import generate_password_hash

      with self.app.app_context():
          now = isoformat(utcnow())
          cur = execute(
              """
              INSERT INTO users
              (phone, password_hash, status, vip_expire_at, created_at)
              VALUES
              (?, ?, ?, ?, ?)
              """,
              ("13900009901", generate_password_hash("p"), "ACTIVE", None, now),
          )
          user_id = int(cur.lastrowid)
          cur = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              ("res2", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
          )
          resource_id = int(cur.lastrowid)

      import server.routes as routesmod

      class DummyUpstream:
          # Stand-in for an upstream response; only the status code is provided.
          status_code = 500

      original_archive_get = routesmod.gogs_archive_get
      original_git_zip = routesmod.gogs_git_archive_zip
      original_resolve = routesmod.gogs_resolve_ref_commit

      # delete=False keeps the file on disk after close() so it can be handed out by path.
      tmp = tempfile.NamedTemporaryFile(delete=False)
      tmp.write(b"ZIP")
      tmp.close()
      try:
          routesmod.gogs_archive_get = lambda owner, repo, ref: DummyUpstream()
          routesmod.gogs_git_archive_zip = lambda owner, repo, ref: tmp.name
          routesmod.gogs_resolve_ref_commit = lambda owner, repo, ref: {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}

          user_client = self.app.test_client()
          with user_client.session_transaction() as s:
              s["user_id"] = user_id
          r = user_client.post(f"/resources/{resource_id}/download", json={"ref": "master"})
          self.assertEqual(200, r.status_code)

          admin_client = self.app.test_client()
          r = admin_client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
          self.assertEqual(200, r.status_code)

          # Searching by the user's phone number narrows the log to this test's single download.
          r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901")
          self.assertEqual(200, r.status_code)
          self.assertEqual(1, int(r.json.get("total") or 0))
          it = (r.json.get("items") or [None])[0]
          self.assertEqual("13900009901", it.get("userPhone"))
          self.assertEqual(resource_id, it.get("resourceId"))
          self.assertEqual("res2", it.get("resourceTitle"))
          self.assertEqual("FREE", it.get("resourceType"))
          self.assertEqual("FREE", it.get("currentResourceType"))
          self.assertEqual("ONLINE", it.get("resourceState"))

          r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901&type=FREE")
          self.assertEqual(200, r.status_code)
          self.assertEqual(1, int(r.json.get("total") or 0))

          # Remove the resource row and check that the log entry is found under state=DELETED.
          with self.app.app_context():
              execute("DELETE FROM resources WHERE id = ?", (resource_id,))

          r = admin_client.get("/admin/download-logs?page=1&pageSize=10&q=13900009901&state=DELETED")
          self.assertEqual(200, r.status_code)
          self.assertEqual(1, int(r.json.get("total") or 0))
          it = (r.json.get("items") or [None])[0]
          self.assertEqual("DELETED", it.get("resourceState"))
      finally:
          routesmod.gogs_archive_get = original_archive_get
          routesmod.gogs_git_archive_zip = original_git_zip
          routesmod.gogs_resolve_ref_commit = original_resolve
          try:
              os.unlink(tmp.name)
          except OSError:
              # Best-effort cleanup: the file may already be gone or still be in use.
              # Only filesystem errors are ignored, so unrelated bugs are not hidden.
              pass
  def test_resources_cover_url_defaults_when_missing(self) -> None:
      """A resource stored with a NULL ``cover_url`` is served with the default cover image path."""
      default_cover = "/static/images/resources/default.png"
      client = self.app.test_client()

      from server.core import isoformat, utcnow
      from server.db import execute

      with self.app.app_context():
          now = isoformat(utcnow())
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", now, now),
          )
          resource_id = int(cursor.lastrowid)

      listing = client.get("/resources?page=1&pageSize=10")
      self.assertEqual(200, listing.status_code)
      rows = listing.json.get("items") or []
      self.assertTrue(any(row.get("id") == resource_id for row in rows))
      matching_rows = [row for row in rows if row.get("id") == resource_id]
      for row in matching_rows:
          self.assertEqual(default_cover, row.get("coverUrl"))

      detail = client.get(f"/resources/{resource_id}")
      self.assertEqual(200, detail.status_code)
      self.assertEqual(default_cover, detail.json.get("coverUrl"))
  def test_admin_settings_saved_to_db(self) -> None:
      """Round-trip admin settings through PUT/GET and check the reported flags.

      Covers clearing the Gogs token, saving Gogs/payment/LLM/cache values,
      clearing the Redis URL, and finally the 502 ``gogs_failed`` response of
      the repo search once the Gogs base URL points at ``http://127.0.0.1:9``.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      # Start from a state without a stored Gogs token.
      cleared = http.put("/admin/settings", json={"clearGogsToken": True})
      self.assertEqual(200, cleared.status_code)

      fetched = http.get("/admin/settings")
      self.assertEqual(200, fetched.status_code)
      for expected_key in ("gogsBaseUrl", "hasGogsToken", "payment", "llm", "cache"):
          self.assertIn(expected_key, fetched.json)
      self.assertFalse(bool(fetched.json.get("cache", {}).get("hasRedisUrl")))
      self.assertFalse(bool(fetched.json.get("hasGogsToken")))

      # Save a full set of settings, including secrets.
      new_settings = {
          "gogsBaseUrl": "http://127.0.0.1:9",
          "gogsToken": "db-test-token",
          "clearGogsToken": False,
          "payment": {"provider": "MOCK", "enableMockPay": True, "apiKey": "pay-key", "clearApiKey": False},
          "llm": {"provider": "DeepSeek", "baseUrl": "https://api.example.com", "model": "deepseek-chat", "apiKey": "llm-key", "clearApiKey": False},
          "cache": {"redisUrl": "redis://:pwd@127.0.0.1:6379/0", "clearRedisUrl": False},
      }
      saved = http.put("/admin/settings", json=new_settings)
      self.assertEqual(200, saved.status_code)

      fetched = http.get("/admin/settings")
      self.assertEqual(200, fetched.status_code)
      body = fetched.json
      self.assertTrue(bool(body.get("hasGogsToken")))

      payment = body.get("payment", {})
      self.assertEqual("MOCK", payment.get("provider"))
      self.assertTrue(bool(payment.get("hasApiKey")))
      self.assertTrue(bool(payment.get("enableMockPay")))

      llm = body.get("llm", {})
      self.assertEqual("DeepSeek", llm.get("provider"))
      self.assertEqual("https://api.example.com", llm.get("baseUrl"))
      self.assertEqual("deepseek-chat", llm.get("model"))
      self.assertTrue(bool(llm.get("hasApiKey")))

      cache = body.get("cache", {})
      self.assertTrue(bool(cache.get("hasRedisUrl")))
      reported_redis_url = str(cache.get("redisUrl") or "")
      self.assertTrue(reported_redis_url.endswith("127.0.0.1:6379/0"))

      # Clearing the Redis URL must flip the flag back.
      cleared = http.put("/admin/settings", json={"cache": {"clearRedisUrl": True}})
      self.assertEqual(200, cleared.status_code)
      fetched = http.get("/admin/settings")
      self.assertEqual(200, fetched.status_code)
      self.assertFalse(bool(fetched.json.get("cache", {}).get("hasRedisUrl")))

      # The repo search is expected to fail with the Gogs base URL saved above.
      search = http.get("/admin/gogs/repos?q=bb")
      self.assertEqual(502, search.status_code)
      self.assertEqual("gogs_failed", search.json.get("error"))
  def test_admin_settings_storage_oss_fields(self) -> None:
      """Save OSS storage settings and check what GET /admin/settings reports back.

      The secret itself must not appear in the response; only the
      ``hasAccessKeySecret`` flag is expected, and it must turn false after
      ``clearAccessKeySecret`` is sent.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      oss_settings = {
          "endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
          "bucket": "bkt",
          "accessKeyId": "id",
          "accessKeySecret": "sec",
          "clearAccessKeySecret": False,
          "uploadPrefix": "uploads/",
          "publicBaseUrl": "https://cdn.example.com",
      }
      saved = http.put("/admin/settings", json={"storage": {"provider": "AUTO", "oss": oss_settings}})
      self.assertEqual(200, saved.status_code)

      fetched = http.get("/admin/settings")
      self.assertEqual(200, fetched.status_code)
      storage_section = fetched.json.get("storage") or {}
      oss_section = storage_section.get("oss") or {}
      self.assertEqual("AUTO", storage_section.get("provider"))
      self.assertEqual("https://oss-cn-hangzhou.aliyuncs.com", oss_section.get("endpoint"))
      self.assertEqual("bkt", oss_section.get("bucket"))
      self.assertEqual("id", oss_section.get("accessKeyId"))
      self.assertEqual("uploads/", oss_section.get("uploadPrefix"))
      self.assertEqual("https://cdn.example.com", oss_section.get("publicBaseUrl"))
      # The secret is only exposed as a presence flag.
      self.assertTrue(bool(oss_section.get("hasAccessKeySecret")))
      self.assertNotIn("accessKeySecret", oss_section)

      cleared = http.put("/admin/settings", json={"storage": {"oss": {"clearAccessKeySecret": True}}})
      self.assertEqual(200, cleared.status_code)

      fetched = http.get("/admin/settings")
      self.assertEqual(200, fetched.status_code)
      storage_section = fetched.json.get("storage") or {}
      oss_section = storage_section.get("oss") or {}
      self.assertFalse(bool(oss_section.get("hasAccessKeySecret")))
  def test_admin_uploads_used_detection_accepts_oss_urls(self) -> None:
      """Check that an upload referenced through a CDN/OSS URL is reported as used.

      Two fixture files are written into ``static/uploads``. One is referenced
      from a resource summary via ``https://cdn.example.com/uploads/<name>``,
      the other is not referenced at all. The upload listing must flag them
      accordingly, and the cleanup endpoint must delete only the unreferenced
      file.

      The fixtures live in the real project tree, so both are removed in a
      ``finally`` block; previously the "used" file was left behind after
      every run.
      """
      client = self.app.test_client()
      r = client.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, r.status_code)
      from pathlib import Path
      from server.core import isoformat, utcnow
      from server.db import execute

      project_root = Path(__file__).resolve().parents[1]
      uploads_dir = project_root / "static" / "uploads"
      uploads_dir.mkdir(parents=True, exist_ok=True)
      used_name = "cccccccccccccccccccccccccccccccc.png"
      unused_name = "dddddddddddddddddddddddddddddddd.webp"
      used_path = uploads_dir / used_name
      unused_path = uploads_dir / unused_name
      try:
          used_path.write_bytes(b"x")
          unused_path.write_bytes(b"y")

          # Reference only the "used" file, and only through an external CDN URL.
          with self.app.app_context():
              now = isoformat(utcnow())
              execute(
                  """
                  INSERT INTO resources
                  (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
                  VALUES
                  (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                  """,
                  (
                      "t",
                      f"img: https://cdn.example.com/uploads/{used_name}",
                      "FREE",
                      "DRAFT",
                      None,
                      "[]",
                      "root",
                      "r",
                      0,
                      "http://gogs.local/root/r",
                      "master",
                      now,
                      now,
                  ),
              )

          r = client.get("/admin/uploads")
          self.assertEqual(200, r.status_code)
          items = r.json.get("items") or []
          used_item = next((it for it in items if it.get("name") == used_name), None)
          unused_item = next((it for it in items if it.get("name") == unused_name), None)
          self.assertIsNotNone(used_item)
          self.assertIsNotNone(unused_item)
          self.assertTrue(bool(used_item.get("used")))
          self.assertFalse(bool(unused_item.get("used")))

          r = client.post("/admin/uploads/cleanup-unused", json={})
          self.assertEqual(200, r.status_code)
          self.assertTrue(int((r.json or {}).get("deletedCount") or 0) >= 1)
          self.assertTrue(used_path.exists())
          self.assertFalse(unused_path.exists())
      finally:
          # Remove whatever fixture files are still on disk, whether the test passed or failed.
          for leftover in (used_path, unused_path):
              try:
                  leftover.unlink()
              except FileNotFoundError:
                  pass
  def test_admin_create_resource_requires_gogs_token(self) -> None:
      """Creating a resource with no Gogs token available must return 400 ``gogs_token_required``.

      The ``GOGS_TOKEN`` environment variable is removed and the stored token
      is cleared for the duration of the test; the environment is restored
      afterwards.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      # Remember the environment so it can be put back in the finally block.
      saved_token = os.environ.pop("GOGS_TOKEN", None)
      saved_base_url = os.environ.get("GOGS_BASE_URL")
      os.environ["GOGS_BASE_URL"] = "http://127.0.0.1:9"
      try:
          cleared = http.put("/admin/settings", json={"clearGogsToken": True})
          self.assertEqual(200, cleared.status_code)

          new_resource = {"title": "t", "summary": "s", "type": "FREE", "status": "DRAFT"}
          created = http.post("/admin/resources", json=new_resource)
          self.assertEqual(400, created.status_code)
          self.assertEqual("gogs_token_required", created.json.get("error"))
      finally:
          if saved_token is not None:
              os.environ["GOGS_TOKEN"] = saved_token
          if saved_base_url is not None:
              os.environ["GOGS_BASE_URL"] = saved_base_url
          else:
              os.environ.pop("GOGS_BASE_URL", None)
  def test_gogs_create_repo_request_shape(self) -> None:
      """Inspect the HTTP calls made to Gogs when a resource is created with ``createRepo``.

      ``requests.request`` (as seen from ``server.gogs``) is replaced by a
      recorder that always answers 500. The first call must be a POST to the
      user-repos API authenticated by an ``Authorization: token t`` header with
      no ``token`` query parameter; the second call must carry the token as a
      query parameter.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)
      configured = http.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
      self.assertEqual(200, configured.status_code)

      import server.gogs as gogsmod
      from requests import Response

      recorded = []
      real_request = gogsmod.requests.request

      def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
          # Record every argument, then answer with a generic server error.
          recorded.append({"method": method, "url": url, "params": params, "json": json, "headers": headers, "timeout": timeout})
          failure = Response()
          failure.status_code = 500
          failure.url = url
          failure._content = b'{"message":"boom"}'
          return failure

      try:
          gogsmod.requests.request = stub_request
          payload = {"title": "t", "summary": "s", "type": "FREE", "status": "DRAFT", "createRepo": True}
          created = http.post("/admin/resources", json=payload)
          self.assertEqual(502, created.status_code)
          self.assertEqual("gogs_failed", created.json.get("error"))
          self.assertTrue(len(recorded) >= 2)

          first_call = recorded[0]
          self.assertEqual("POST", first_call["method"])
          self.assertEqual("http://gogs.local/api/v1/user/repos", first_call["url"])
          self.assertFalse(bool((first_call["params"] or {}).get("token")))
          self.assertEqual("token t", (first_call["headers"] or {}).get("Authorization"))

          second_call = recorded[1]
          self.assertTrue(bool((second_call["params"] or {}).get("token")))
      finally:
          gogsmod.requests.request = real_request
  def test_create_resource_skips_readme_sync_when_contents_api_missing(self) -> None:
      """Resource creation must succeed even though the README contents API answers 404.

      The stubbed Gogs accepts the repo creation (201), answers 404 to every
      ``/contents/README.md`` request and 500 to anything else.
      ``gogs_git_write_file`` in ``server.routes`` is replaced by a lambda that
      reports a fake commit.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)
      configured = http.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
      self.assertEqual(200, configured.status_code)

      import server.routes as routesmod
      import server.gogs as gogsmod
      from requests import Response

      real_request = gogsmod.requests.request
      real_git_write = routesmod.gogs_git_write_file

      def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
          fake = Response()
          # Mirror the token query parameter into the reported URL when one was sent.
          token_sent = bool(params) and "token" in params
          fake.url = f"{url}?token={params.get('token')}" if token_sent else url

          verb = method.upper()
          readme_call = "/contents/README.md" in url
          if verb == "POST" and url.endswith("/api/v1/user/repos"):
              # Repository creation succeeds.
              fake.status_code = 201
              fake._content = b'{"full_name":"root/110","name":"110","owner":{"username":"root"},"default_branch":"master","private":false,"html_url":"http://gogs.local/root/110"}'
          elif readme_call and verb in {"POST", "PUT"}:
              # Writing the README hits an HTML 404 page.
              fake.status_code = 404
              fake.headers["Content-Type"] = "text/html; charset=utf-8"
              fake._content = b"<!DOCTYPE html><html><head></head><body>Not Found</body></html>"
          elif readme_call and verb == "GET":
              # Reading the README gets a JSON 404.
              fake.status_code = 404
              fake.headers["Content-Type"] = "application/json; charset=utf-8"
              fake._content = b'{"message":"not found"}'
          else:
              fake.status_code = 500
              fake._content = b'{"message":"unexpected"}'
          return fake

      try:
          gogsmod.requests.request = stub_request
          routesmod.gogs_git_write_file = lambda owner, repo, branch, path, content_text, message, must_create: {"branch": branch, "commit": "c" * 40}
          payload = {"title": "t", "summary": "s", "type": "FREE", "status": "DRAFT", "createRepo": True, "syncReadme": True}
          created = http.post("/admin/resources", json=payload)
          self.assertEqual(200, created.status_code)
          self.assertTrue(int(created.json.get("id") or 0) > 0)
      finally:
          gogsmod.requests.request = real_request
          routesmod.gogs_git_write_file = real_git_write
  def test_repo_file_crud_error_mapping(self) -> None:
      """Check the HTTP status each ``GogsGitError`` code is mapped to by the repo file endpoints.

      Expected mapping: ``file_exists`` -> 409 (POST), ``file_not_found`` ->
      404 (PUT), ``git_not_found`` -> 501 (DELETE). The response ``error``
      field must echo the code.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      import server.routes as routesmod
      import server.gogs as gogsmod
      from server.core import isoformat, utcnow
      from server.db import execute

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      def raising(code, detail):
          """Build a stand-in that ignores its arguments and raises GogsGitError(code, detail)."""
          def _stub(*_a, **_k):
              raise gogsmod.GogsGitError(code, detail)
          return _stub

      file_url = f"/resources/{resource_id}/repo/file"
      real_write = routesmod.gogs_git_write_file
      real_delete = routesmod.gogs_git_delete_path
      try:
          # Create: the file already exists.
          routesmod.gogs_git_write_file = raising("file_exists", "exists")
          outcome = http.post(file_url, json={"ref": "master", "path": "a.txt", "content": "x", "message": "m"})
          self.assertEqual(409, outcome.status_code)
          self.assertEqual("file_exists", outcome.json.get("error"))

          # Update: the file is missing.
          routesmod.gogs_git_write_file = raising("file_not_found", "missing")
          outcome = http.put(file_url, json={"ref": "master", "path": "a.txt", "content": "x", "message": "m"})
          self.assertEqual(404, outcome.status_code)
          self.assertEqual("file_not_found", outcome.json.get("error"))

          # Delete: git is reported as not found.
          routesmod.gogs_git_delete_path = raising("git_not_found", "git required")
          outcome = http.delete(file_url, json={"ref": "master", "path": "a.txt", "message": "m"})
          self.assertEqual(501, outcome.status_code)
          self.assertEqual("git_not_found", outcome.json.get("error"))
      finally:
          routesmod.gogs_git_write_file = real_write
          routesmod.gogs_git_delete_path = real_delete
  def test_repo_commits_endpoint_works(self) -> None:
      """The commits endpoint must return the single commit served by a stubbed ``gogs_commits``.

      A fresh app is created with ``REQUIRE_LOGIN_TO_VIEW_REPO=0`` so that the
      request is made without logging in; the variable is restored afterwards.
      """
      env_key = "REQUIRE_LOGIN_TO_VIEW_REPO"
      previous_flag = os.environ.get(env_key)
      os.environ[env_key] = "0"
      try:
          fresh_app = self.appmod.create_app()
          http = fresh_app.test_client()
          import server.routes as routesmod
          from server.core import isoformat, utcnow
          from server.db import execute

          with fresh_app.app_context():
              stamp = isoformat(utcnow())
              seed_row = ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
              cursor = execute(
                  """
                  INSERT INTO resources
                  (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
                  VALUES
                  (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                  """,
                  seed_row,
              )
              resource_id = int(cursor.lastrowid)

          real_commits = routesmod.gogs_commits
          try:
              from requests import Response

              def stub_commits(owner, repo, *, ref, path, limit):
                  # One commit with a 40-character sha, served as JSON.
                  fake = Response()
                  fake.status_code = 200
                  fake.url = f"http://gogs.local/api/v1/repos/{owner}/{repo}/commits"
                  fake.headers["Content-Type"] = "application/json"
                  fake._content = b'[{"sha":"' + (b"a" * 40) + b'","commit":{"author":{"name":"u","date":"2026-01-01T00:00:00Z"},"message":"init\\n"}}]'
                  return fake

              routesmod.gogs_commits = stub_commits
              listing = http.get(f"/resources/{resource_id}/repo/commits?ref=master&limit=10")
              self.assertEqual(200, listing.status_code)
              self.assertEqual("master", listing.json.get("ref"))
              self.assertTrue(isinstance(listing.json.get("items"), list))
              self.assertEqual(1, len(listing.json.get("items") or []))
          finally:
              routesmod.gogs_commits = real_commits
      finally:
          if previous_flag is not None:
              os.environ[env_key] = previous_flag
          else:
              os.environ.pop(env_key, None)
  def test_repo_refs_falls_back_to_git_when_gogs_api_fails(self) -> None:
      """When the Gogs branches/tags APIs answer 500, the refs endpoint must use ``gogs_git_list_refs``.

      A fresh app is created with ``REQUIRE_LOGIN_TO_VIEW_REPO=0``; the three
      ``server.routes`` functions involved are patched and restored afterwards.
      """
      env_key = "REQUIRE_LOGIN_TO_VIEW_REPO"
      previous_flag = os.environ.get(env_key)
      os.environ[env_key] = "0"
      try:
          fresh_app = self.appmod.create_app()
          http = fresh_app.test_client()
          import server.routes as routesmod
          from requests import Response
          from server.core import isoformat, utcnow
          from server.db import execute

          with fresh_app.app_context():
              stamp = isoformat(utcnow())
              seed_row = ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
              cursor = execute(
                  """
                  INSERT INTO resources
                  (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
                  VALUES
                  (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                  """,
                  seed_row,
              )
              resource_id = int(cursor.lastrowid)

          def failing_api(kind):
              """Build a (owner, repo) stub that answers 500 for the given API segment."""
              def _stub(owner, repo):
                  broken = Response()
                  broken.status_code = 500
                  broken.url = f"http://gogs.local/api/v1/repos/{owner}/{repo}/{kind}"
                  broken._content = b'{"message":"boom"}'
                  broken.headers["Content-Type"] = "application/json"
                  return broken
              return _stub

          real_branches = routesmod.gogs_branches
          real_tags = routesmod.gogs_tags
          real_git_refs = routesmod.gogs_git_list_refs
          try:
              routesmod.gogs_branches = failing_api("branches")
              routesmod.gogs_tags = failing_api("tags")
              routesmod.gogs_git_list_refs = lambda owner, repo: {"branches": [{"name": "master"}], "tags": []}

              refs = http.get(f"/resources/{resource_id}/repo/refs")
              self.assertEqual(200, refs.status_code)
              self.assertEqual([{"name": "master"}], refs.json.get("branches"))
          finally:
              routesmod.gogs_branches = real_branches
              routesmod.gogs_tags = real_tags
              routesmod.gogs_git_list_refs = real_git_refs
      finally:
          if previous_flag is not None:
              os.environ[env_key] = previous_flag
          else:
              os.environ.pop(env_key, None)
  def test_download_requires_login(self) -> None:
      """An anonymous download request for a FREE, ONLINE resource must be rejected with 401."""
      http = self.app.test_client()
      from server.core import isoformat, utcnow
      from server.db import execute

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      # No login was performed on this client.
      attempt = http.post(f"/resources/{resource_id}/download", json={"ref": "master"})
      self.assertEqual(401, attempt.status_code)
  def test_download_vip_requires_vip(self) -> None:
      """A freshly registered user downloading a VIP resource must get 403 ``vip_required``."""
      http = self.app.test_client()
      credentials = {"phone": "13900000002", "password": "abc12345"}

      # Registration may answer 400 (accepted by this test alongside 200); login must succeed.
      registered = http.post("/auth/register", json={"phone": "13900000002", "password": "abc12345"})
      self.assertIn(registered.status_code, {200, 400})
      logged_in = http.post("/auth/login", json=credentials)
      self.assertEqual(200, logged_in.status_code)

      from server.core import isoformat, utcnow
      from server.db import execute

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "VIP", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      attempt = http.post(f"/resources/{resource_id}/download", json={"ref": "master"})
      self.assertEqual(403, attempt.status_code)
      self.assertEqual("vip_required", (attempt.json or {}).get("error"))
  def test_download_uses_git_archive_when_gogs_archive_fails(self) -> None:
      """With the Gogs archive API answering 500, the download must be served from the git archive stub.

      ``gogs_resolve_ref_commit`` is patched to resolve ``master`` to a commit of
      forty ``a`` characters, and ``gogs_git_archive_zip_commit`` to write a temp
      file containing ``ZIPDATA``. The test polls the status URL until the
      build is reported ready, then fetches the download URL.
      """
      http = self.app.test_client()
      registered = http.post("/auth/register", json={"phone": "13900000001", "password": "abc12345"})
      self.assertIn(registered.status_code, {200, 400})
      logged_in = http.post("/auth/login", json={"phone": "13900000001", "password": "abc12345"})
      self.assertEqual(200, logged_in.status_code)

      import tempfile
      import server.routes as routesmod
      from requests import Response
      from server.core import isoformat, utcnow
      from server.db import execute

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      # Keep the real functions so they can be restored in the finally block.
      real_archive_get = routesmod.gogs_archive_get
      real_git_archive = routesmod.gogs_git_archive_zip
      real_git_archive_commit = routesmod.gogs_git_archive_zip_commit
      real_resolve = routesmod.gogs_resolve_ref_commit
      try:
          def stub_archive_get(owner, repo, ref):
              # The Gogs archive API always fails.
              broken = Response()
              broken.status_code = 500
              broken.url = "http://gogs.local/api/v1/repos/root/r/archive/master.zip"
              broken._content = b'{"message":"boom"}'
              return broken

          def stub_git_archive_commit(owner, repo, commit):
              # Produce a placeholder archive on disk and hand back its path.
              archive = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
              archive.write(b"ZIPDATA")
              archive.close()
              return archive.name

          routesmod.gogs_archive_get = stub_archive_get
          routesmod.gogs_resolve_ref_commit = lambda owner, repo, ref: {"ok": True, "ref": ref, "commit": "a" * 40, "kind": "branch"}
          routesmod.gogs_git_archive_zip_commit = stub_git_archive_commit

          started = http.post(f"/resources/{resource_id}/download", json={"ref": "master"})
          self.assertEqual(200, started.status_code)
          started_body = started.json or {}
          self.assertTrue(bool(started_body.get("ok")))
          self.assertEqual("a" * 40, started_body.get("cacheKey"))
          download_url = started_body.get("downloadUrl")
          self.assertTrue(bool(download_url))
          status_url = started_body.get("statusUrl")
          self.assertTrue(bool(status_url))

          import time

          # Poll at most 200 times, 10 ms apart, until the build reports ready.
          attempts_left = 200
          while attempts_left > 0:
              attempts_left -= 1
              status = http.get(status_url)
              self.assertEqual(200, status.status_code)
              status_body = status.json or {}
              if status_body.get("ready"):
                  break
              if status_body.get("state") == "error":
                  self.fail(status_body.get("error") or "build_failed")
              time.sleep(0.01)

          fetched = http.get(download_url)
          self.assertEqual(200, fetched.status_code)
          self.assertEqual("application/zip", fetched.headers.get("Content-Type"))
          self.assertTrue((fetched.data or b"").startswith(b"ZIPDATA"))
          # Best-effort close of the response; errors here are ignored.
          try:
              fetched.close()
          except Exception:
              pass
      finally:
          routesmod.gogs_archive_get = real_archive_get
          routesmod.gogs_git_archive_zip = real_git_archive
          routesmod.gogs_git_archive_zip_commit = real_git_archive_commit
          routesmod.gogs_resolve_ref_commit = real_resolve
  def test_admin_delete_resource_deletes_gogs_repo(self) -> None:
      """Deleting a resource must issue a DELETE to the Gogs repo API and remove the database row.

      ``requests.request`` (as seen from ``server.gogs``) is replaced by a
      recorder that answers 204 to the repo DELETE and 500 to anything else.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)
      configured = http.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
      self.assertEqual(200, configured.status_code)

      from server.core import isoformat, utcnow
      from server.db import execute, fetch_one

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r", 0, "http://gogs.local/root/r", "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      import server.gogs as gogsmod
      from requests import Response

      recorded = []
      real_request = gogsmod.requests.request

      def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
          recorded.append({"method": method, "url": url, "params": params, "json": json, "headers": headers, "timeout": timeout})
          fake = Response()
          fake.url = url
          is_repo_delete = method.upper() == "DELETE" and url == "http://gogs.local/api/v1/repos/root/r"
          if is_repo_delete:
              fake.status_code = 204
              fake._content = b""
          else:
              fake.status_code = 500
              fake._content = b'{"message":"unexpected"}'
          return fake

      try:
          gogsmod.requests.request = stub_request
          removed = http.delete(f"/admin/resources/{resource_id}")
          self.assertEqual(200, removed.status_code)
          self.assertTrue(bool(removed.json.get("ok")))
      finally:
          gogsmod.requests.request = real_request

      # The repo DELETE must have been sent, and the row must be gone.
      repo_delete_seen = any(
          call["method"] == "DELETE" and call["url"].endswith("/api/v1/repos/root/r")
          for call in recorded
      )
      self.assertTrue(repo_delete_seen)
      with self.app.app_context():
          remaining = fetch_one("SELECT id FROM resources WHERE id = ?", (resource_id,))
          self.assertIsNone(remaining)
  def test_admin_download_cache_clear_all(self) -> None:
      """Clearing a resource's download cache with ``all=1`` must remove the cached zip and its metadata.

      A zip and a ``.meta.json`` sidecar are written by hand into the
      ``download_cache`` directory next to the configured database. The summary
      endpoint must count one entry before the clear and zero after it.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)

      from server.core import isoformat, utcnow
      from server.db import execute
      from server.context import get_config
      from pathlib import Path
      import json
      import re

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "ONLINE", None, "[]", "root", "r", 0, None, "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

          # Build the cache file name from the sanitised owner/repo and a commit prefix.
          config = get_config()
          cache_dir = config.database_path.parent / "download_cache"
          cache_dir.mkdir(parents=True, exist_ok=True)
          unsafe_chars = r"[^a-zA-Z0-9._-]+"
          safe_owner = re.sub(unsafe_chars, "_", "root")[:50] or "owner"
          safe_repo = re.sub(unsafe_chars, "_", "r")[:50] or "repo"
          commit = "b" * 40
          zip_name = f"res{resource_id}__{safe_owner}__{safe_repo}__{commit[:24]}.zip"
          zip_path = Path(cache_dir) / zip_name
          zip_path.write_bytes(b"ZIPDATA")
          meta_path = zip_path.with_suffix(zip_path.suffix + ".meta.json")
          meta_text = json.dumps({"owner": "root", "repo": "r", "ref": "master", "commit": commit}, ensure_ascii=False)
          meta_path.write_text(meta_text, encoding="utf-8")

      summary_url = f"/admin/resources/{resource_id}/download-cache/summary"

      before = http.get(summary_url)
      self.assertEqual(200, before.status_code)
      self.assertEqual(1, int((before.json or {}).get("count") or 0))

      cleared = http.delete(f"/admin/resources/{resource_id}/download-cache?all=1")
      self.assertEqual(200, cleared.status_code)
      self.assertEqual(1, int((cleared.json or {}).get("removed") or 0))

      with self.app.app_context():
          self.assertFalse(zip_path.exists())
          self.assertFalse(meta_path.exists())

      after = http.get(summary_url)
      self.assertEqual(200, after.status_code)
      self.assertEqual(0, int((after.json or {}).get("count") or 0))
  def test_admin_delete_resource_deletes_uploaded_files(self) -> None:
      """Deleting a resource must also delete the upload files it references.

      One file is referenced as the resource's ``cover_url`` and another from
      within its summary text; both must be gone from ``static/uploads`` after
      the delete. The Gogs repo DELETE is stubbed to answer 204.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)
      configured = http.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
      self.assertEqual(200, configured.status_code)

      from pathlib import Path
      from server.core import isoformat, utcnow
      from server.db import execute

      project_root = Path(__file__).resolve().parents[1]
      uploads_dir = project_root / "static" / "uploads"
      uploads_dir.mkdir(parents=True, exist_ok=True)
      cover_name = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.png"
      inline_name = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.webp"
      cover_file = uploads_dir / cover_name
      inline_file = uploads_dir / inline_name
      cover_file.write_bytes(b"x")
      inline_file.write_bytes(b"y")

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = (
              "t",
              f"img1: /static/uploads/{inline_name}",
              "FREE",
              "DRAFT",
              f"/static/uploads/{cover_name}",
              "[]",
              "root",
              "r",
              0,
              "http://gogs.local/root/r",
              "master",
              stamp,
              stamp,
          )
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      import server.gogs as gogsmod
      from requests import Response

      real_request = gogsmod.requests.request

      def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
          fake = Response()
          fake.url = url
          is_repo_delete = method.upper() == "DELETE" and url.endswith("/api/v1/repos/root/r")
          if is_repo_delete:
              fake.status_code = 204
              fake._content = b""
          else:
              fake.status_code = 500
              fake._content = b'{"message":"unexpected"}'
          return fake

      try:
          gogsmod.requests.request = stub_request
          removed = http.delete(f"/admin/resources/{resource_id}")
          self.assertEqual(200, removed.status_code)
          self.assertTrue(bool(removed.json.get("ok")))
      finally:
          gogsmod.requests.request = real_request

      self.assertFalse(cover_file.exists())
      self.assertFalse(inline_file.exists())
  def test_admin_delete_resource_aborts_when_gogs_delete_fails(self) -> None:
      """If Gogs answers 403 to the repo DELETE, the resource must not be deleted.

      The endpoint is expected to return 400 with ``gogs_unauthorized`` and the
      database row must still exist afterwards.
      """
      http = self.app.test_client()
      login = http.post("/admin/auth/login", json={"username": "admin", "password": "admin123"})
      self.assertEqual(200, login.status_code)
      configured = http.put("/admin/settings", json={"gogsBaseUrl": "http://gogs.local", "gogsToken": "t"})
      self.assertEqual(200, configured.status_code)

      from server.core import isoformat, utcnow
      from server.db import execute, fetch_one

      with self.app.app_context():
          stamp = isoformat(utcnow())
          seed_row = ("t", "s", "FREE", "DRAFT", None, "[]", "root", "r2", 0, "http://gogs.local/root/r2", "master", stamp, stamp)
          cursor = execute(
              """
              INSERT INTO resources
              (title, summary, type, status, cover_url, tags_json, repo_owner, repo_name, repo_private, repo_html_url, default_ref, created_at, updated_at)
              VALUES
              (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
              """,
              seed_row,
          )
          resource_id = int(cursor.lastrowid)

      import server.gogs as gogsmod
      from requests import Response

      real_request = gogsmod.requests.request

      def stub_request(method, url, params=None, json=None, headers=None, timeout=None):
          fake = Response()
          fake.url = url
          is_repo_delete = method.upper() == "DELETE" and url == "http://gogs.local/api/v1/repos/root/r2"
          if is_repo_delete:
              # Gogs refuses the deletion.
              fake.status_code = 403
              fake._content = b'{"message":"forbidden"}'
              fake.headers["Content-Type"] = "application/json"
          else:
              fake.status_code = 500
              fake._content = b'{"message":"unexpected"}'
          return fake

      try:
          gogsmod.requests.request = stub_request
          refused = http.delete(f"/admin/resources/{resource_id}")
          self.assertEqual(400, refused.status_code)
          self.assertEqual("gogs_unauthorized", refused.json.get("error"))
      finally:
          gogsmod.requests.request = real_request

      # The resource row must have survived the failed delete.
      with self.app.app_context():
          surviving = fetch_one("SELECT id FROM resources WHERE id = ?", (resource_id,))
          self.assertIsNotNone(surviving)
  1277. if __name__ == "__main__":
  1278. unittest.main()