gogs.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. import base64
  2. import hashlib
  3. import inspect
  4. import os
  5. import re
  6. import shutil
  7. import subprocess
  8. import tempfile
  9. import threading
  10. import time
  11. from pathlib import Path
  12. from typing import Any, Optional
  13. from urllib.parse import quote, urlsplit, urlunsplit
  14. import requests
  15. from .context import get_config
  16. from .settings import get_setting_value
# Guards every read and write of the in-process response cache below.
_cache_lock = threading.Lock()
# In-process cache: cache key -> (expiry as a time.time() timestamp, payload bytes).
_mem_cache: dict[str, tuple[float, bytes]] = {}
# Guards lazy creation of the shared Redis client.
_redis_lock = threading.Lock()
# Lazily created Redis client; None until one exists. _get_redis_client() also
# stores False here when no Redis URL is configured.
_redis_client = None
  21. def _get_redis_url() -> str:
  22. v = get_setting_value("REDIS_URL")
  23. if v is not None and v.strip():
  24. return v.strip()
  25. return (os.environ.get("REDIS_URL") or "").strip()
  26. def _get_redis_client():
  27. global _redis_client
  28. with _redis_lock:
  29. if _redis_client is False:
  30. _redis_client = None
  31. if _redis_client is not None:
  32. return _redis_client
  33. url = _get_redis_url()
  34. if not url:
  35. _redis_client = False
  36. return None
  37. try:
  38. import redis # type: ignore
  39. except Exception:
  40. return None
  41. try:
  42. _redis_client = redis.Redis.from_url(
  43. url,
  44. socket_connect_timeout=1,
  45. socket_timeout=2,
  46. decode_responses=False,
  47. )
  48. except Exception:
  49. return None
  50. return _redis_client
  51. def _cache_ttl_seconds(path: str) -> int:
  52. p = (path or "").lower()
  53. if "/branches" in p or "/tags" in p:
  54. return 30
  55. if "/commits" in p:
  56. return 10
  57. if "/contents" in p:
  58. return 10
  59. if p.startswith("/api/v1/repos/"):
  60. return 60
  61. return 15
  62. def _make_cache_key(*, method: str, base_url: str, path: str, params: dict[str, Any] | None, token: str) -> str:
  63. items = []
  64. for k, v in sorted((params or {}).items(), key=lambda kv: kv[0]):
  65. items.append(f"{k}={v}")
  66. qs = "&".join(items)
  67. token_tag = hashlib.sha256((token or "").encode("utf-8")).hexdigest()[:12] if token else "-"
  68. return f"gogs:{method.upper()}:{base_url}:{path}?{qs}:tk={token_tag}"
  69. def _cache_get(key: str) -> bytes | None:
  70. now = time.time()
  71. with _cache_lock:
  72. hit = _mem_cache.get(key)
  73. if hit is not None:
  74. exp, payload = hit
  75. if exp > now:
  76. return payload
  77. _mem_cache.pop(key, None)
  78. r = _get_redis_client()
  79. if r is None:
  80. return None
  81. try:
  82. raw = r.get(key)
  83. return raw if isinstance(raw, (bytes, bytearray)) else None
  84. except Exception:
  85. return None
  86. def _cache_set(key: str, payload: bytes, ttl_seconds: int) -> None:
  87. now = time.time()
  88. exp = now + max(1, int(ttl_seconds or 0))
  89. with _cache_lock:
  90. _mem_cache[key] = (exp, payload)
  91. if len(_mem_cache) > 2048:
  92. for k in [k for k, (e, _p) in _mem_cache.items() if e <= now][:256]:
  93. _mem_cache.pop(k, None)
  94. while len(_mem_cache) > 2048:
  95. _mem_cache.pop(next(iter(_mem_cache)), None)
  96. r = _get_redis_client()
  97. if r is None:
  98. return
  99. try:
  100. r.setex(key, max(1, int(ttl_seconds or 0)), payload)
  101. except Exception:
  102. return
  103. def _gogs_base_url_and_token() -> tuple[str, str]:
  104. config = get_config()
  105. base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/")
  106. token = get_setting_value("GOGS_TOKEN")
  107. if token is not None:
  108. token = token.strip() or None
  109. if token is None:
  110. token = config.gogs_token
  111. return base_url, (token or "").strip()
  112. def gogs_api_request(
  113. method: str,
  114. path: str,
  115. *,
  116. params: Optional[dict[str, Any]] = None,
  117. json: Optional[dict[str, Any]] = None,
  118. timeout: int = 6,
  119. ) -> requests.Response:
  120. base_url, token = _gogs_base_url_and_token()
  121. parts = urlsplit(base_url)
  122. if parts.scheme not in {"http", "https"} or not parts.netloc or parts.username or parts.password:
  123. resp = requests.Response()
  124. resp.status_code = 599
  125. resp.url = base_url
  126. resp._content = b""
  127. return resp
  128. url = f"{base_url}{path}"
  129. merged = dict(params or {})
  130. headers: dict[str, str] = {}
  131. if token:
  132. headers["Authorization"] = f"token {token}"
  133. if method.upper() == "GET":
  134. merged.setdefault("token", token)
  135. is_cacheable_get = method.upper() == "GET"
  136. cache_key = None
  137. ttl = 0
  138. if is_cacheable_get:
  139. cache_key = _make_cache_key(method="GET", base_url=base_url, path=path, params=merged, token=token)
  140. ttl = _cache_ttl_seconds(path)
  141. raw = _cache_get(cache_key)
  142. if raw:
  143. nl = raw.find(b"\n")
  144. code = 200
  145. body = raw
  146. if nl > 0:
  147. try:
  148. code = int(raw[:nl].decode("ascii", errors="ignore") or "200")
  149. body = raw[nl + 1 :]
  150. except Exception:
  151. code = 200
  152. body = raw
  153. cached = requests.Response()
  154. cached.status_code = code
  155. cached.url = url
  156. cached._content = body
  157. return cached
  158. def _safe_request(**kwargs) -> requests.Response:
  159. try:
  160. sig = inspect.signature(requests.request)
  161. has_varkw = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values())
  162. if "allow_redirects" in sig.parameters or has_varkw:
  163. kwargs.setdefault("allow_redirects", False)
  164. else:
  165. kwargs.pop("allow_redirects", None)
  166. except Exception:
  167. kwargs.pop("allow_redirects", None)
  168. return requests.request(**kwargs)
  169. try:
  170. resp = _safe_request(method=method.upper(), url=url, params=merged or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False)
  171. if token and method.upper() != "GET" and resp.status_code in {401, 403, 500}:
  172. merged2 = dict(merged)
  173. merged2.setdefault("token", token)
  174. return _safe_request(method=method.upper(), url=url, params=merged2 or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False)
  175. if is_cacheable_get and cache_key and int(resp.status_code) == 200:
  176. try:
  177. body = resp.content or b""
  178. if body:
  179. payload = f"{int(resp.status_code)}\n".encode("ascii") + body
  180. _cache_set(cache_key, payload, ttl)
  181. except Exception:
  182. pass
  183. return resp
  184. except requests.RequestException:
  185. resp = requests.Response()
  186. resp.status_code = 599
  187. resp.url = url
  188. resp._content = b""
  189. return resp
  190. def gogs_api_get(path: str, params: Optional[dict[str, Any]] = None) -> requests.Response:
  191. return gogs_api_request("GET", path, params=params)
def gogs_create_repo(owner: str, name: str, description: str, private: bool) -> requests.Response:
    """Create a repository, trying several Gogs endpoints when an owner is given.

    With a non-empty ``owner`` the organisation endpoint is tried first. If it
    answers 403, 404 or 500, the admin "create for user" endpoint and then the
    token user's own endpoint are tried. The first response with a status below
    400 is returned; if none succeeds, the organisation response is returned.
    Without an owner the repository is created for the token's user.

    NOTE(review): block nesting was reconstructed from a listing that had lost
    its indentation — confirm the fallbacks belong inside the 403/404/500 branch.
    """
    payload = {"name": name, "description": description, "private": bool(private)}
    if owner:
        org_resp = gogs_api_request("POST", f"/api/v1/org/{quote(owner)}/repos", json=payload, timeout=30)
        if org_resp.status_code < 400:
            return org_resp
        if org_resp.status_code in {403, 404, 500}:
            admin_resp = gogs_api_request("POST", f"/api/v1/admin/users/{quote(owner)}/repos", json=payload, timeout=30)
            if admin_resp.status_code < 400:
                return admin_resp
            user_resp = gogs_api_request("POST", "/api/v1/user/repos", json=payload, timeout=30)
            if user_resp.status_code < 400:
                return user_resp
        # Every attempt failed: report the first (organisation) failure.
        return org_resp
    return gogs_api_request("POST", "/api/v1/user/repos", json=payload, timeout=30)
  207. def gogs_get_content(owner: str, repo: str, path: str, ref: str) -> requests.Response:
  208. path = path.lstrip("/")
  209. if path:
  210. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{quote(path)}"
  211. else:
  212. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents"
  213. return gogs_api_get(api_path, params={"ref": ref})
  214. def gogs_upsert_file(owner: str, repo: str, path: str, *, content_text: str, message: str, branch: str) -> requests.Response:
  215. path = path.lstrip("/")
  216. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{quote(path)}"
  217. content_b64 = base64.b64encode(content_text.encode("utf-8")).decode("ascii")
  218. create_payload = {"content": content_b64, "message": message, "branch": branch}
  219. create_resp = gogs_api_request("POST", api_path, json=create_payload, timeout=30)
  220. if create_resp.status_code < 400:
  221. return create_resp
  222. existing = gogs_api_get(api_path, params={"ref": branch})
  223. if existing.status_code >= 400:
  224. return create_resp
  225. data = existing.json() or {}
  226. sha = (data.get("sha") or "").strip()
  227. if not sha:
  228. return create_resp
  229. update_payload = {"content": content_b64, "message": message, "sha": sha, "branch": branch}
  230. return gogs_api_request("PUT", api_path, json=update_payload, timeout=30)
  231. def gogs_contents(owner: str, repo: str, path: str, ref: str) -> requests.Response:
  232. return gogs_get_content(owner, repo, path, ref)
  233. def gogs_repo_info(owner: str, repo: str) -> requests.Response:
  234. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}"
  235. return gogs_api_get(api_path)
  236. def gogs_delete_repo(owner: str, repo: str) -> requests.Response:
  237. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}"
  238. return gogs_api_request("DELETE", api_path, timeout=30)
  239. def gogs_user_repos(username: str) -> requests.Response:
  240. api_path = f"/api/v1/users/{quote(username)}/repos"
  241. return gogs_api_get(api_path)
  242. def gogs_my_repos() -> requests.Response:
  243. api_path = "/api/v1/user/repos"
  244. return gogs_api_get(api_path)
  245. def gogs_branches(owner: str, repo: str) -> requests.Response:
  246. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches"
  247. return gogs_api_get(api_path)
  248. def gogs_tags(owner: str, repo: str) -> requests.Response:
  249. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/tags"
  250. return gogs_api_get(api_path)
  251. def gogs_commits(owner: str, repo: str, *, ref: str, path: str, limit: int) -> requests.Response:
  252. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/commits"
  253. params: dict[str, Any] = {}
  254. if ref:
  255. params["sha"] = ref
  256. if path:
  257. params["path"] = path
  258. params["page"] = 1
  259. params["limit"] = max(1, min(int(limit or 0), 50))
  260. return gogs_api_get(api_path, params=params)
  261. def gogs_archive_get(owner: str, repo: str, ref: str) -> requests.Response:
  262. base_url, token = _gogs_base_url_and_token()
  263. parts = urlsplit(base_url)
  264. if parts.scheme not in {"http", "https"} or not parts.netloc or parts.username or parts.password:
  265. resp = requests.Response()
  266. resp.status_code = 599
  267. resp.url = base_url
  268. resp._content = b""
  269. return resp
  270. url = f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip"
  271. params: dict[str, Any] = {}
  272. if token:
  273. params["token"] = token
  274. try:
  275. return requests.get(url, params=params or None, stream=True, timeout=60, allow_redirects=False)
  276. except requests.RequestException:
  277. resp = requests.Response()
  278. resp.status_code = 599
  279. resp.url = url
  280. resp._content = b""
  281. return resp
  282. def gogs_archive_url(owner: str, repo: str, ref: str) -> str:
  283. base_url, _token = _gogs_base_url_and_token()
  284. return f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip"
  285. class GogsGitError(Exception):
  286. def __init__(self, code: str, message: str) -> None:
  287. super().__init__(message)
  288. self.code = code
  289. self.message = message
  290. def _git_remote_url(owner: str, repo: str) -> str:
  291. base_url, _token = _gogs_base_url_and_token()
  292. return f"{base_url}/{quote(owner)}/{quote(repo)}.git"
  293. def _git_remote_url_with_token(owner: str, repo: str) -> str:
  294. base_url, token = _gogs_base_url_and_token()
  295. if not token:
  296. raise GogsGitError("gogs_token_required", "Gogs token is required for git operations")
  297. username = (owner or "").strip() or "root"
  298. parts = urlsplit(base_url)
  299. if not parts.scheme or not parts.netloc:
  300. raise GogsGitError("invalid_gogs_base_url", "Invalid Gogs base URL")
  301. auth = f"{quote(username, safe='')}:{quote(token, safe='')}"
  302. netloc = f"{auth}@{parts.netloc}"
  303. authed_base = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment)).rstrip("/")
  304. return f"{authed_base}/{quote(owner)}/{quote(repo)}.git"
  305. def _run_git(args: list[str], cwd: Path) -> tuple[int, str, str]:
  306. env = dict(os.environ)
  307. env["GIT_TERMINAL_PROMPT"] = "0"
  308. try:
  309. proc = subprocess.run(args, cwd=str(cwd), env=env, capture_output=True)
  310. except FileNotFoundError:
  311. raise GogsGitError("git_not_found", "git is required on server PATH")
  312. def _decode(b: bytes | None) -> str:
  313. raw = b or b""
  314. if not raw:
  315. return ""
  316. for enc in ("utf-8", "gb18030"):
  317. try:
  318. return raw.decode(enc)
  319. except UnicodeDecodeError:
  320. pass
  321. return raw.decode("utf-8", errors="replace")
  322. return int(proc.returncode), _decode(proc.stdout), _decode(proc.stderr)
  323. def _scrub_git_error(err: str, token: str) -> str:
  324. text = err or ""
  325. if token:
  326. text = text.replace(token, "***")
  327. text = re.sub(r":([^:@\s]+)@", r":***@", text)
  328. return text
  329. def _git_branch_exists(remote_url: str, branch: str) -> bool:
  330. with tempfile.TemporaryDirectory() as d:
  331. code, out, _err = _run_git(["git", "ls-remote", "--heads", remote_url, branch], Path(d))
  332. return code == 0 and bool((out or "").strip())
  333. def gogs_git_list_refs(owner: str, repo: str, *, limit: int = 200) -> dict[str, Any]:
  334. limit = max(1, min(int(limit or 0), 500))
  335. _base, token = _gogs_base_url_and_token()
  336. remote = _git_remote_url_with_token(owner, repo)
  337. with tempfile.TemporaryDirectory() as d:
  338. code, out, err = _run_git(["git", "ls-remote", "--heads", remote], Path(d))
  339. if code != 0:
  340. raise GogsGitError("git_ls_remote_failed", (_scrub_git_error(err or "git ls-remote failed", token))[:500])
  341. branches: list[str] = []
  342. for line in (out or "").splitlines():
  343. parts = (line or "").split("\t", 1)
  344. if len(parts) != 2:
  345. continue
  346. ref = parts[1].strip()
  347. if not ref.startswith("refs/heads/"):
  348. continue
  349. name = ref[len("refs/heads/") :].strip()
  350. if name:
  351. branches.append(name)
  352. branches = sorted(set(branches))[:limit]
  353. code, out, err = _run_git(["git", "ls-remote", "--tags", remote], Path(d))
  354. if code != 0:
  355. raise GogsGitError("git_ls_remote_failed", (_scrub_git_error(err or "git ls-remote failed", token))[:500])
  356. tags: list[str] = []
  357. for line in (out or "").splitlines():
  358. parts = (line or "").split("\t", 1)
  359. if len(parts) != 2:
  360. continue
  361. ref = parts[1].strip()
  362. if not ref.startswith("refs/tags/"):
  363. continue
  364. name = ref[len("refs/tags/") :].strip()
  365. if not name or name.endswith("^{}"):
  366. continue
  367. tags.append(name)
  368. tags = sorted(set(tags))[:limit]
  369. return {"branches": [{"name": n} for n in branches], "tags": [{"name": n} for n in tags]}
  370. def _looks_like_commit(s: str) -> bool:
  371. t = (s or "").strip()
  372. if len(t) < 7 or len(t) > 40:
  373. return False
  374. return bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", t))
  375. def _git_ls_remote_resolve(remote_url: str, ref: str) -> str | None:
  376. ref = (ref or "").strip()
  377. if not ref:
  378. return None
  379. with tempfile.TemporaryDirectory() as d:
  380. code, out, _err = _run_git(["git", "ls-remote", remote_url, ref], Path(d))
  381. if code != 0:
  382. return None
  383. best = None
  384. for line in (out or "").splitlines():
  385. parts = (line or "").split("\t", 1)
  386. if len(parts) != 2:
  387. continue
  388. sha = (parts[0] or "").strip()
  389. name = (parts[1] or "").strip()
  390. if not sha:
  391. continue
  392. if name.endswith("^{}"):
  393. return sha
  394. if best is None:
  395. best = sha
  396. return best
  397. def gogs_resolve_ref_commit(owner: str, repo: str, ref: str) -> dict[str, Any]:
  398. ref = (ref or "").strip() or "HEAD"
  399. if _looks_like_commit(ref):
  400. return {"ok": True, "ref": ref, "commit": ref.lower(), "kind": "commit"}
  401. api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches/{quote(ref)}"
  402. resp = gogs_api_get(api_path)
  403. if resp.status_code < 400:
  404. try:
  405. data = resp.json() or {}
  406. except Exception:
  407. data = {}
  408. commit = None
  409. if isinstance(data, dict):
  410. c = data.get("commit")
  411. if isinstance(c, dict):
  412. commit = (c.get("id") or c.get("sha") or c.get("commit_id") or "").strip() or None
  413. if commit is None:
  414. commit = (data.get("commit") or data.get("sha") or data.get("id") or "").strip() or None
  415. if commit and _looks_like_commit(commit):
  416. return {"ok": True, "ref": ref, "commit": commit.lower(), "kind": "branch"}
  417. _base, token = _gogs_base_url_and_token()
  418. remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)
  419. sha = _git_ls_remote_resolve(remote, f"refs/heads/{ref}")
  420. if sha and _looks_like_commit(sha):
  421. return {"ok": True, "ref": ref, "commit": sha.lower(), "kind": "branch"}
  422. sha2 = _git_ls_remote_resolve(remote, f"refs/tags/{ref}^{{}}") or _git_ls_remote_resolve(remote, f"refs/tags/{ref}")
  423. if sha2 and _looks_like_commit(sha2):
  424. return {"ok": True, "ref": ref, "commit": sha2.lower(), "kind": "tag"}
  425. return {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}
  426. def gogs_git_archive_zip_commit(owner: str, repo: str, commit: str) -> str:
  427. commit = (commit or "").strip()
  428. if not _looks_like_commit(commit):
  429. raise GogsGitError("invalid_commit", "Invalid commit")
  430. _base, token = _gogs_base_url_and_token()
  431. remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)
  432. tmpdir = tempfile.mkdtemp()
  433. zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
  434. zip_path = zip_file.name
  435. zip_file.close()
  436. try:
  437. repo_dir = Path(tmpdir) / "repo"
  438. repo_dir.mkdir(parents=True, exist_ok=True)
  439. code, _out, err = _run_git(["git", "init"], repo_dir)
  440. if code != 0:
  441. raise GogsGitError("git_init_failed", (_scrub_git_error(err or "git init failed", token))[:500])
  442. code, _out, err = _run_git(["git", "remote", "add", "origin", remote], repo_dir)
  443. if code != 0:
  444. raise GogsGitError("git_remote_failed", (_scrub_git_error(err or "git remote failed", token))[:500])
  445. code, _out, err = _run_git(["git", "fetch", "--depth", "1", "origin", commit], repo_dir)
  446. if code != 0:
  447. raise GogsGitError("git_fetch_failed", (_scrub_git_error(err or "git fetch failed", token))[:500])
  448. code, _out, err = _run_git(["git", "checkout", "--detach", "FETCH_HEAD"], repo_dir)
  449. if code != 0:
  450. raise GogsGitError("git_checkout_failed", (_scrub_git_error(err or "git checkout failed", token))[:500])
  451. code, _out, err = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir)
  452. if code != 0:
  453. msg = (err or "git archive failed").lower()
  454. if "not a valid object name" in msg or "unknown revision" in msg or "bad object" in msg:
  455. raise GogsGitError("empty_repo", "Repository has no commits")
  456. raise GogsGitError("git_archive_failed", (_scrub_git_error(err or "git archive failed", token))[:500])
  457. return zip_path
  458. except Exception:
  459. try:
  460. os.unlink(zip_path)
  461. except Exception:
  462. pass
  463. raise
  464. finally:
  465. try:
  466. shutil.rmtree(tmpdir, ignore_errors=True)
  467. except Exception:
  468. pass
  469. def gogs_git_archive_zip(owner: str, repo: str, ref: str) -> str:
  470. ref = (ref or "").strip() or "HEAD"
  471. base_url, token = _gogs_base_url_and_token()
  472. _ = base_url
  473. remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)
  474. tmpdir = tempfile.mkdtemp()
  475. zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
  476. zip_path = zip_file.name
  477. zip_file.close()
  478. try:
  479. repo_dir = Path(tmpdir) / "repo"
  480. clone_args = ["git", "clone", "--depth", "1"]
  481. if ref != "HEAD":
  482. clone_args.extend(["--branch", ref])
  483. clone_args.extend([remote, str(repo_dir)])
  484. code, _out, err = _run_git(clone_args, Path(tmpdir))
  485. if code != 0:
  486. code2, _out2, err2 = _run_git(["git", "clone", "--depth", "1", remote, str(repo_dir)], Path(tmpdir))
  487. if code2 != 0:
  488. raise GogsGitError("git_clone_failed", (_scrub_git_error(err2 or "git clone failed", token))[:500])
  489. if ref != "HEAD":
  490. code3, _out3, err3 = _run_git(["git", "checkout", ref], repo_dir)
  491. if code3 != 0:
  492. raise GogsGitError("branch_not_found", (_scrub_git_error(err3 or "ref not found", token))[:500])
  493. code4, _out4, err4 = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir)
  494. if code4 != 0:
  495. msg = (err4 or "git archive failed").lower()
  496. if "not a valid object name" in msg or "unknown revision" in msg or "bad object" in msg:
  497. raise GogsGitError("empty_repo", "Repository has no commits")
  498. raise GogsGitError("git_archive_failed", (_scrub_git_error(err4 or "git archive failed", token))[:500])
  499. return zip_path
  500. except Exception:
  501. try:
  502. os.unlink(zip_path)
  503. except Exception:
  504. pass
  505. raise
  506. finally:
  507. try:
  508. shutil.rmtree(tmpdir, ignore_errors=True)
  509. except Exception:
  510. pass
  511. def gogs_git_write_file(owner: str, repo: str, branch: str, path: str, content_text: str, message: str, *, must_create: bool) -> dict[str, str]:
  512. branch = (branch or "").strip()
  513. if not branch:
  514. raise GogsGitError("ref_required", "Branch is required")
  515. _base, token = _gogs_base_url_and_token()
  516. remote = _git_remote_url_with_token(owner, repo)
  517. exists = _git_branch_exists(remote, branch)
  518. with tempfile.TemporaryDirectory() as d:
  519. repo_dir = Path(d) / "repo"
  520. if exists:
  521. code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d))
  522. if code != 0:
  523. raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
  524. else:
  525. code, _out, err = _run_git(["git", "clone", remote, str(repo_dir)], Path(d))
  526. if code != 0:
  527. raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
  528. code, _out, err = _run_git(["git", "checkout", "--orphan", branch], repo_dir)
  529. if code != 0:
  530. raise GogsGitError("git_checkout_failed", (_scrub_git_error(err or "git checkout failed", token))[:500])
  531. rel = Path(path)
  532. full = repo_dir / rel
  533. if must_create and full.exists():
  534. raise GogsGitError("file_exists", "File already exists")
  535. if (not must_create) and (not full.exists()):
  536. raise GogsGitError("file_not_found", "File not found")
  537. full.parent.mkdir(parents=True, exist_ok=True)
  538. full.write_text(content_text, encoding="utf-8")
  539. rel_posix = rel.as_posix()
  540. code, _out, err = _run_git(["git", "add", rel_posix], repo_dir)
  541. if code != 0:
  542. raise GogsGitError("git_add_failed", (_scrub_git_error(err or "git add failed", token))[:500])
  543. msg = (message or "").strip() or f"Update {rel_posix}"
  544. code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir)
  545. if code != 0:
  546. raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500])
  547. code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir)
  548. if code != 0:
  549. raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500])
  550. commit = (out or "").strip()
  551. code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir)
  552. if code != 0:
  553. raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500])
  554. return {"branch": branch, "commit": commit}
  555. def gogs_git_delete_path(owner: str, repo: str, branch: str, path: str, message: str) -> dict[str, str]:
  556. branch = (branch or "").strip()
  557. if not branch:
  558. raise GogsGitError("ref_required", "Branch is required")
  559. _base, token = _gogs_base_url_and_token()
  560. remote = _git_remote_url_with_token(owner, repo)
  561. exists = _git_branch_exists(remote, branch)
  562. if not exists:
  563. raise GogsGitError("branch_not_found", "Branch not found")
  564. with tempfile.TemporaryDirectory() as d:
  565. repo_dir = Path(d) / "repo"
  566. code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d))
  567. if code != 0:
  568. raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
  569. rel = Path(path)
  570. rel_posix = rel.as_posix()
  571. full = repo_dir / rel
  572. if not full.exists():
  573. raise GogsGitError("path_not_found", "Path not found")
  574. code, _out, err = _run_git(["git", "rm", "-r", "--", rel_posix], repo_dir)
  575. if code != 0:
  576. raise GogsGitError("git_rm_failed", (_scrub_git_error(err or "git rm failed", token))[:500])
  577. msg = (message or "").strip() or f"Delete {rel_posix}"
  578. code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir)
  579. if code != 0:
  580. raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500])
  581. code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir)
  582. if code != 0:
  583. raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500])
  584. commit = (out or "").strip()
  585. code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir)
  586. if code != 0:
  587. raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500])
  588. return {"branch": branch, "commit": commit}
  589. def gogs_git_log(owner: str, repo: str, ref: str, path: str, limit: int) -> dict[str, Any]:
  590. ref = (ref or "").strip()
  591. if not ref:
  592. raise GogsGitError("ref_required", "Ref is required")
  593. limit = max(1, min(int(limit or 0), 50))
  594. _base, token = _gogs_base_url_and_token()
  595. remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)
  596. depth = str(max(limit, 50))
  597. with tempfile.TemporaryDirectory() as d:
  598. repo_dir = Path(d) / "repo"
  599. code, _out, err = _run_git(["git", "clone", "--depth", depth, "--branch", ref, remote, str(repo_dir)], Path(d))
  600. if code != 0:
  601. raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
  602. args = ["git", "log", "-n", str(limit), "--date=iso-strict", "--pretty=format:%H%x1f%an%x1f%ad%x1f%s"]
  603. if path:
  604. args.extend(["--", Path(path).as_posix()])
  605. code, out, err = _run_git(args, repo_dir)
  606. if code != 0:
  607. raise GogsGitError("git_log_failed", (_scrub_git_error(err or "git log failed", token))[:500])
  608. items = []
  609. for line in (out or "").splitlines():
  610. parts = line.split("\x1f")
  611. if len(parts) < 4:
  612. continue
  613. sha, author_name, author_date, subject = parts[0], parts[1], parts[2], "\x1f".join(parts[3:])
  614. items.append({"sha": sha, "authorName": author_name, "authorDate": author_date, "subject": subject})
  615. return {"ref": ref, "path": path or "", "items": items}