| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714 |
- import base64
- import hashlib
- import inspect
- import os
- import re
- import shutil
- import subprocess
- import tempfile
- import threading
- import time
- from pathlib import Path
- from typing import Any, Optional
- from urllib.parse import quote, urlsplit, urlunsplit
- import requests
- from .context import get_config
- from .settings import get_setting_value
# Guards every read and write of the in-process response cache below.
_cache_lock = threading.Lock()
# Cache key -> (absolute expiry as a time.time() value, raw payload bytes).
_mem_cache: dict[str, tuple[float, bytes]] = {}
# Guards lazy creation of the shared Redis client.
_redis_lock = threading.Lock()
# Shared Redis client: None until created; _get_redis_client() stores False
# here when no Redis URL was configured at its last call.
_redis_client = None
def _get_redis_url() -> str:
    """Return the Redis URL to use, or an empty string when none is set.

    The ``REDIS_URL`` setting wins when it holds a non-blank value;
    otherwise the ``REDIS_URL`` environment variable is used. The result is
    always stripped of surrounding whitespace.
    """
    configured = get_setting_value("REDIS_URL")
    if configured is not None:
        trimmed = configured.strip()
        if trimmed:
            return trimmed
    from_env = os.environ.get("REDIS_URL") or ""
    return from_env.strip()
def _get_redis_client():
    """Return the shared Redis client, creating it on first use.

    Returns ``None`` when no Redis URL is configured, when the ``redis``
    package cannot be imported, or when building the client raises. All
    work happens while holding ``_redis_lock``.
    """
    global _redis_client
    with _redis_lock:
        # A previous call may have stored False ("no URL"); reset it so the
        # URL is looked up again on this call.
        if _redis_client is False:
            _redis_client = None
        client = _redis_client
        if client is not None:
            return client

        redis_url = _get_redis_url()
        if not redis_url:
            _redis_client = False
            return None

        try:
            import redis  # type: ignore

            client = redis.Redis.from_url(
                redis_url,
                socket_connect_timeout=1,
                socket_timeout=2,
                decode_responses=False,
            )
        except Exception:
            # Best effort: Redis is optional, callers fall back to memory.
            return None
        _redis_client = client
        return client
- def _cache_ttl_seconds(path: str) -> int:
- p = (path or "").lower()
- if "/branches" in p or "/tags" in p:
- return 30
- if "/commits" in p:
- return 10
- if "/contents" in p:
- return 10
- if p.startswith("/api/v1/repos/"):
- return 60
- return 15
- def _make_cache_key(*, method: str, base_url: str, path: str, params: dict[str, Any] | None, token: str) -> str:
- items = []
- for k, v in sorted((params or {}).items(), key=lambda kv: kv[0]):
- items.append(f"{k}={v}")
- qs = "&".join(items)
- token_tag = hashlib.sha256((token or "").encode("utf-8")).hexdigest()[:12] if token else "-"
- return f"gogs:{method.upper()}:{base_url}:{path}?{qs}:tk={token_tag}"
def _cache_get(key: str) -> bytes | None:
    """Look up *key* in the memory cache, then in Redis.

    An expired memory entry is dropped before falling through to Redis.
    Returns the cached payload, or ``None`` on a miss, when Redis is
    unavailable, or when the Redis call fails.
    """
    now = time.time()
    with _cache_lock:
        entry = _mem_cache.get(key)
        if entry is not None:
            expires_at, payload = entry
            if expires_at > now:
                return payload
            _mem_cache.pop(key, None)

    client = _get_redis_client()
    if client is None:
        return None
    try:
        stored = client.get(key)
    except Exception:
        # Cache reads are best effort; treat any Redis failure as a miss.
        return None
    if isinstance(stored, (bytes, bytearray)):
        return stored
    return None
def _cache_set(key: str, payload: bytes, ttl_seconds: int) -> None:
    """Store *payload* under *key* in the memory cache and, if possible, Redis.

    The lifetime is at least one second. Once the memory cache holds more
    than 2048 entries, up to 256 expired entries are removed, then entries
    are removed in insertion order until the size is back to 2048.
    Redis failures are ignored.
    """
    ttl = max(1, int(ttl_seconds or 0))
    now = time.time()
    with _cache_lock:
        _mem_cache[key] = (now + ttl, payload)
        if len(_mem_cache) > 2048:
            expired = [k for k, (deadline, _payload) in _mem_cache.items() if deadline <= now]
            for stale_key in expired[:256]:
                _mem_cache.pop(stale_key, None)
            while len(_mem_cache) > 2048:
                oldest_key = next(iter(_mem_cache))
                _mem_cache.pop(oldest_key, None)

    client = _get_redis_client()
    if client is None:
        return
    try:
        client.setex(key, ttl, payload)
    except Exception:
        # Cache writes are best effort.
        return
def _gogs_base_url_and_token() -> tuple[str, str]:
    """Return ``(base_url, token)`` for the Gogs server.

    Each value comes from the runtime setting when it is set (and, for the
    token, non-blank), otherwise from the application config. The base URL
    has trailing slashes removed; the token is stripped and is ``""`` when
    none is available.
    """
    config = get_config()
    configured_url = get_setting_value("GOGS_BASE_URL") or config.gogs_base_url
    base_url = configured_url.rstrip("/")

    token = get_setting_value("GOGS_TOKEN")
    if token is not None and not token.strip():
        token = None
    elif token is not None:
        token = token.strip()
    if token is None:
        token = config.gogs_token
    return base_url, (token or "").strip()
def gogs_api_request(
    method: str,
    path: str,
    *,
    params: Optional[dict[str, Any]] = None,
    json: Optional[dict[str, Any]] = None,
    timeout: int = 6,
) -> requests.Response:
    """Send one request to the Gogs API and return the response.

    Never raises for transport problems: an invalid base URL or a
    ``requests.RequestException`` yields a synthetic response with status
    599 and an empty body.

    GET requests are served from the cache when possible and successful
    (200, non-empty) bodies are cached as ``b"<status>\\n<body>"``. The
    token is sent in the ``Authorization`` header; GETs also carry it as a
    ``token`` query parameter. A non-GET answered with 401/403/500 is
    retried once with the token added to the query string.
    """

    def _synthetic(status: int, target_url: str, content: bytes = b"") -> requests.Response:
        fake = requests.Response()
        fake.status_code = status
        fake.url = target_url
        fake._content = content
        return fake

    def _safe_request(**kwargs) -> requests.Response:
        # Only pass allow_redirects when requests.request (which may have
        # been replaced) can accept it.
        try:
            sig = inspect.signature(requests.request)
            has_varkw = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values())
            if "allow_redirects" in sig.parameters or has_varkw:
                kwargs.setdefault("allow_redirects", False)
            else:
                kwargs.pop("allow_redirects", None)
        except Exception:
            kwargs.pop("allow_redirects", None)
        return requests.request(**kwargs)

    base_url, token = _gogs_base_url_and_token()
    parts = urlsplit(base_url)
    if parts.scheme not in {"http", "https"} or not parts.netloc or parts.username or parts.password:
        return _synthetic(599, base_url)

    verb = method.upper()
    is_get = verb == "GET"
    url = f"{base_url}{path}"
    query = dict(params or {})
    headers: dict[str, str] = {}
    if token:
        headers["Authorization"] = f"token {token}"
        if is_get:
            query.setdefault("token", token)

    cache_key = None
    ttl = 0
    if is_get:
        cache_key = _make_cache_key(method="GET", base_url=base_url, path=path, params=query, token=token)
        ttl = _cache_ttl_seconds(path)
        raw = _cache_get(cache_key)
        if raw:
            # Cached payloads are "<status>\n<body>"; anything else is
            # treated as a bare 200 body.
            code, body = 200, raw
            newline_at = raw.find(b"\n")
            if newline_at > 0:
                try:
                    code = int(raw[:newline_at].decode("ascii", errors="ignore") or "200")
                    body = raw[newline_at + 1 :]
                except Exception:
                    code, body = 200, raw
            return _synthetic(code, url, body)

    try:
        resp = _safe_request(method=verb, url=url, params=query or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False)
        if token and not is_get and resp.status_code in {401, 403, 500}:
            retry_query = dict(query)
            retry_query.setdefault("token", token)
            return _safe_request(method=verb, url=url, params=retry_query or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False)
        if is_get and cache_key and int(resp.status_code) == 200:
            try:
                body = resp.content or b""
                if body:
                    _cache_set(cache_key, f"{int(resp.status_code)}\n".encode("ascii") + body, ttl)
            except Exception:
                # Caching is best effort; still return the live response.
                pass
        return resp
    except requests.RequestException:
        return _synthetic(599, url)
def gogs_api_get(path: str, params: Optional[dict[str, Any]] = None) -> requests.Response:
    """Issue a GET to the Gogs API via ``gogs_api_request``."""
    response = gogs_api_request("GET", path, params=params)
    return response
def gogs_create_repo(owner: str, name: str, description: str, private: bool) -> requests.Response:
    """Create a repository, trying several endpoints when *owner* is given.

    Without an owner the repository is created through ``/user/repos``.
    With an owner the organisation endpoint is tried first; on 403/404/500
    the admin endpoint and then ``/user/repos`` are tried. The first
    response below 400 is returned, otherwise the organisation response.
    """
    body = {"name": name, "description": description, "private": bool(private)}

    def _post(api_path: str) -> requests.Response:
        return gogs_api_request("POST", api_path, json=body, timeout=30)

    if not owner:
        return _post("/api/v1/user/repos")

    org_resp = _post(f"/api/v1/org/{quote(owner)}/repos")
    if org_resp.status_code < 400:
        return org_resp
    if org_resp.status_code in {403, 404, 500}:
        fallbacks = (f"/api/v1/admin/users/{quote(owner)}/repos", "/api/v1/user/repos")
        for fallback_path in fallbacks:
            attempt = _post(fallback_path)
            if attempt.status_code < 400:
                return attempt
    return org_resp
def gogs_get_content(owner: str, repo: str, path: str, ref: str) -> requests.Response:
    """Fetch the contents entry for *path* at *ref*.

    Leading slashes are removed from *path*; an empty path requests the
    repository's top-level ``contents`` endpoint.
    """
    trimmed = path.lstrip("/")
    api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents"
    if trimmed:
        api_path = f"{api_path}/{quote(trimmed)}"
    return gogs_api_get(api_path, params={"ref": ref})
def gogs_upsert_file(owner: str, repo: str, path: str, *, content_text: str, message: str, branch: str) -> requests.Response:
    """Create a file through the contents API, or update it if creation fails.

    The file is first POSTed. When that fails, the existing entry is
    fetched to obtain its ``sha`` and the file is PUT with that sha.

    Returns the response of the create call when it succeeded or when no
    usable sha could be obtained (lookup failed, body is not a JSON object,
    or ``sha`` is missing/not a string); otherwise the update response.
    """
    path = path.lstrip("/")
    api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{quote(path)}"
    content_b64 = base64.b64encode(content_text.encode("utf-8")).decode("ascii")

    create_payload = {"content": content_b64, "message": message, "branch": branch}
    create_resp = gogs_api_request("POST", api_path, json=create_payload, timeout=30)
    if create_resp.status_code < 400:
        return create_resp

    existing = gogs_api_get(api_path, params={"ref": branch})
    if existing.status_code >= 400:
        return create_resp

    # The lookup may return an empty/non-JSON body or a list (directory
    # listing); none of those carry a sha, so report the create failure.
    try:
        data = existing.json()
    except ValueError:
        data = None
    raw_sha = data.get("sha") if isinstance(data, dict) else None
    sha = raw_sha.strip() if isinstance(raw_sha, str) else ""
    if not sha:
        return create_resp

    update_payload = {"content": content_b64, "message": message, "sha": sha, "branch": branch}
    return gogs_api_request("PUT", api_path, json=update_payload, timeout=30)
def gogs_contents(owner: str, repo: str, path: str, ref: str) -> requests.Response:
    """Alias of ``gogs_get_content`` with the same arguments."""
    return gogs_get_content(owner=owner, repo=repo, path=path, ref=ref)
def gogs_repo_info(owner: str, repo: str) -> requests.Response:
    """GET the repository resource for ``owner/repo``."""
    return gogs_api_get(f"/api/v1/repos/{quote(owner)}/{quote(repo)}")
def gogs_delete_repo(owner: str, repo: str) -> requests.Response:
    """Send DELETE for the ``owner/repo`` repository (30 s timeout)."""
    return gogs_api_request("DELETE", f"/api/v1/repos/{quote(owner)}/{quote(repo)}", timeout=30)
def gogs_user_repos(username: str) -> requests.Response:
    """GET the repository list of the user *username*."""
    return gogs_api_get(f"/api/v1/users/{quote(username)}/repos")
def gogs_my_repos() -> requests.Response:
    """GET ``/api/v1/user/repos`` using the configured token."""
    return gogs_api_get("/api/v1/user/repos")
def gogs_branches(owner: str, repo: str) -> requests.Response:
    """GET the branch list of ``owner/repo``."""
    return gogs_api_get(f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches")
def gogs_tags(owner: str, repo: str) -> requests.Response:
    """GET the tag list of ``owner/repo``."""
    return gogs_api_get(f"/api/v1/repos/{quote(owner)}/{quote(repo)}/tags")
def gogs_commits(owner: str, repo: str, *, ref: str, path: str, limit: int) -> requests.Response:
    """GET the first page of commits for ``owner/repo``.

    *ref* is sent as ``sha`` and *path* as ``path`` when non-empty.
    *limit* is clamped to the range 1..50.
    """
    page_size = max(1, min(int(limit or 0), 50))
    query: dict[str, Any] = {}
    if ref:
        query["sha"] = ref
    if path:
        query["path"] = path
    query.update(page=1, limit=page_size)
    return gogs_api_get(f"/api/v1/repos/{quote(owner)}/{quote(repo)}/commits", params=query)
def gogs_archive_get(owner: str, repo: str, ref: str) -> requests.Response:
    """Start a streamed download of the zip archive of *ref*.

    The token, when present, is sent as the ``token`` query parameter.
    Redirects are not followed. An invalid base URL or a
    ``requests.RequestException`` yields a synthetic 599 response with an
    empty body instead of raising.
    """

    def _failure(target_url: str) -> requests.Response:
        failed = requests.Response()
        failed.status_code = 599
        failed.url = target_url
        failed._content = b""
        return failed

    base_url, token = _gogs_base_url_and_token()
    parsed = urlsplit(base_url)
    if parsed.scheme not in {"http", "https"} or not parsed.netloc or parsed.username or parsed.password:
        return _failure(base_url)

    url = f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip"
    query: dict[str, Any] = {"token": token} if token else {}
    try:
        return requests.get(url, params=query or None, stream=True, timeout=60, allow_redirects=False)
    except requests.RequestException:
        return _failure(url)
def gogs_archive_url(owner: str, repo: str, ref: str) -> str:
    """Return the zip-archive API URL for *ref* (no token included)."""
    base_url = _gogs_base_url_and_token()[0]
    return f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip"
class GogsGitError(Exception):
    """Error raised by the git-based Gogs helpers.

    ``code`` is a short machine-readable identifier and ``message`` the
    human-readable text, which is also the exception's string value.
    """

    def __init__(self, code: str, message: str) -> None:
        self.code = code
        self.message = message
        super().__init__(message)
def _git_remote_url(owner: str, repo: str) -> str:
    """Return the credential-free git remote URL of ``owner/repo``."""
    base_url = _gogs_base_url_and_token()[0]
    return f"{base_url}/{quote(owner)}/{quote(repo)}.git"
def _git_remote_url_with_token(owner: str, repo: str) -> str:
    """Return the git remote URL of ``owner/repo`` with credentials embedded.

    The user part is *owner* (``root`` when blank) and the password part is
    the configured token, both fully percent-encoded.

    Raises:
        GogsGitError: ``gogs_token_required`` when no token is configured,
            ``invalid_gogs_base_url`` when the base URL lacks a scheme or
            host.
    """
    base_url, token = _gogs_base_url_and_token()
    if not token:
        raise GogsGitError("gogs_token_required", "Gogs token is required for git operations")

    username = (owner or "").strip() or "root"
    parts = urlsplit(base_url)
    if not (parts.scheme and parts.netloc):
        raise GogsGitError("invalid_gogs_base_url", "Invalid Gogs base URL")

    credentials = ":".join(quote(piece, safe="") for piece in (username, token))
    with_auth = parts._replace(netloc=f"{credentials}@{parts.netloc}")
    authed_base = urlunsplit(with_auth).rstrip("/")
    return f"{authed_base}/{quote(owner)}/{quote(repo)}.git"
- def _run_git(args: list[str], cwd: Path) -> tuple[int, str, str]:
- env = dict(os.environ)
- env["GIT_TERMINAL_PROMPT"] = "0"
- try:
- proc = subprocess.run(args, cwd=str(cwd), env=env, capture_output=True)
- except FileNotFoundError:
- raise GogsGitError("git_not_found", "git is required on server PATH")
- def _decode(b: bytes | None) -> str:
- raw = b or b""
- if not raw:
- return ""
- for enc in ("utf-8", "gb18030"):
- try:
- return raw.decode(enc)
- except UnicodeDecodeError:
- pass
- return raw.decode("utf-8", errors="replace")
- return int(proc.returncode), _decode(proc.stdout), _decode(proc.stderr)
- def _scrub_git_error(err: str, token: str) -> str:
- text = err or ""
- if token:
- text = text.replace(token, "***")
- text = re.sub(r":([^:@\s]+)@", r":***@", text)
- return text
def _git_branch_exists(remote_url: str, branch: str) -> bool:
    """Return True when ``git ls-remote --heads`` lists *branch* on the remote.

    A failing command or empty output counts as "does not exist".
    """
    with tempfile.TemporaryDirectory() as workdir:
        status, listing, _stderr = _run_git(["git", "ls-remote", "--heads", remote_url, branch], Path(workdir))
    if status != 0:
        return False
    return bool((listing or "").strip())
def _parse_ls_remote_names(output: str, prefix: str, *, skip_peeled: bool = False) -> list[str]:
    """Extract sorted, de-duplicated short ref names from ``git ls-remote`` output.

    Only lines of the form ``<sha>\\t<ref>`` whose ref starts with *prefix*
    are used; *prefix* is removed from the name. With *skip_peeled*, names
    ending in ``^{}`` are dropped.
    """
    names: set[str] = set()
    for line in (output or "").splitlines():
        _sha, tab, ref = line.partition("\t")
        if not tab:
            continue
        ref = ref.strip()
        if not ref.startswith(prefix):
            continue
        name = ref[len(prefix) :].strip()
        if not name or (skip_peeled and name.endswith("^{}")):
            continue
        names.add(name)
    return sorted(names)


def gogs_git_list_refs(owner: str, repo: str, *, limit: int = 200) -> dict[str, Any]:
    """List branch and tag names of ``owner/repo`` using ``git ls-remote``.

    *limit* is clamped to 1..500 and applied to branches and tags
    separately, after sorting. Returns
    ``{"branches": [{"name": ...}], "tags": [{"name": ...}]}``.

    Raises:
        GogsGitError: ``git_ls_remote_failed`` when either listing fails
            (message scrubbed of credentials and cut to 500 characters), or
            whatever ``_git_remote_url_with_token`` raises.
    """
    limit = max(1, min(int(limit or 0), 500))
    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo)

    # (result key, ls-remote flag, ref prefix, drop peeled "^{}" entries)
    listings = (
        ("branches", "--heads", "refs/heads/", False),
        ("tags", "--tags", "refs/tags/", True),
    )
    result: dict[str, Any] = {}
    with tempfile.TemporaryDirectory() as workdir:
        for key, flag, prefix, skip_peeled in listings:
            code, out, err = _run_git(["git", "ls-remote", flag, remote], Path(workdir))
            if code != 0:
                raise GogsGitError("git_ls_remote_failed", (_scrub_git_error(err or "git ls-remote failed", token))[:500])
            names = _parse_ls_remote_names(out, prefix, skip_peeled=skip_peeled)[:limit]
            result[key] = [{"name": n} for n in names]
    return {"branches": result["branches"], "tags": result["tags"]}
- def _looks_like_commit(s: str) -> bool:
- t = (s or "").strip()
- if len(t) < 7 or len(t) > 40:
- return False
- return bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", t))
def _git_ls_remote_resolve(remote_url: str, ref: str) -> str | None:
    """Resolve *ref* on the remote to a sha using ``git ls-remote``.

    Returns the sha of the first line whose name ends in ``^{}`` if there
    is one before the output ends, otherwise the sha of the first usable
    line. Returns ``None`` for a blank ref, a failing command, or no
    usable output.
    """
    wanted = (ref or "").strip()
    if not wanted:
        return None

    with tempfile.TemporaryDirectory() as workdir:
        status, listing, _stderr = _run_git(["git", "ls-remote", remote_url, wanted], Path(workdir))
    if status != 0:
        return None

    first_sha = None
    for row in (listing or "").splitlines():
        fields = (row or "").split("\t", 1)
        if len(fields) != 2:
            continue
        sha = (fields[0] or "").strip()
        if not sha:
            continue
        # A peeled entry wins immediately.
        if (fields[1] or "").strip().endswith("^{}"):
            return sha
        if first_sha is None:
            first_sha = sha
    return first_sha
def gogs_resolve_ref_commit(owner: str, repo: str, ref: str) -> dict[str, Any]:
    """Resolve *ref* (commit, branch or tag) to a commit sha.

    Resolution order: a hex-looking ref is returned as-is (kind
    ``commit``); then the branches API; then ``git ls-remote`` for
    ``refs/heads/<ref>`` (kind ``branch``) and ``refs/tags/<ref>`` (kind
    ``tag``). A blank ref means ``HEAD``.

    Returns ``{"ok", "ref", "commit", "kind"}``; on failure ``ok`` is
    False, ``commit`` is None and ``kind`` is ``unknown``.

    Raises:
        GogsGitError: propagated from the git helpers.
    """

    def _first_text(mapping: dict, keys: tuple[str, ...]) -> str | None:
        # Only string values are usable; API payloads may hold nested
        # objects or nulls under these keys.
        for key in keys:
            value = mapping.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        return None

    ref = (ref or "").strip() or "HEAD"
    if _looks_like_commit(ref):
        return {"ok": True, "ref": ref, "commit": ref.lower(), "kind": "commit"}

    api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches/{quote(ref)}"
    resp = gogs_api_get(api_path)
    if resp.status_code < 400:
        try:
            data = resp.json() or {}
        except Exception:
            data = {}
        commit = None
        if isinstance(data, dict):
            nested = data.get("commit")
            if isinstance(nested, dict):
                commit = _first_text(nested, ("id", "sha", "commit_id"))
            if commit is None:
                commit = _first_text(data, ("commit", "sha", "id"))
        if commit and _looks_like_commit(commit):
            return {"ok": True, "ref": ref, "commit": commit.lower(), "kind": "branch"}

    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)

    sha = _git_ls_remote_resolve(remote, f"refs/heads/{ref}")
    if sha and _looks_like_commit(sha):
        return {"ok": True, "ref": ref, "commit": sha.lower(), "kind": "branch"}

    # Prefer the peeled tag so annotated tags resolve to the commit.
    sha2 = _git_ls_remote_resolve(remote, f"refs/tags/{ref}^{{}}") or _git_ls_remote_resolve(remote, f"refs/tags/{ref}")
    if sha2 and _looks_like_commit(sha2):
        return {"ok": True, "ref": ref, "commit": sha2.lower(), "kind": "tag"}
    return {"ok": False, "ref": ref, "commit": None, "kind": "unknown"}
def gogs_git_archive_zip_commit(owner: str, repo: str, commit: str) -> str:
    """Build a zip archive of one commit and return the zip file's path.

    The commit is fetched with depth 1 into a scratch repository, checked
    out detached and archived with ``git archive``. The scratch directory
    is always removed; the zip file is removed when anything fails. The
    caller owns the returned file.

    Raises:
        GogsGitError: ``invalid_commit`` for a non-hex commit, ``empty_repo``
            when the archive step reports a missing object, or a
            ``git_*_failed`` code for the step that failed.
    """
    commit = (commit or "").strip()
    if not _looks_like_commit(commit):
        raise GogsGitError("invalid_commit", "Invalid commit")
    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)

    tmpdir = tempfile.mkdtemp()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as zip_file:
        zip_path = zip_file.name

    # (argv, error code, message used when git printed nothing)
    setup_steps = (
        (["git", "init"], "git_init_failed", "git init failed"),
        (["git", "remote", "add", "origin", remote], "git_remote_failed", "git remote failed"),
        (["git", "fetch", "--depth", "1", "origin", commit], "git_fetch_failed", "git fetch failed"),
        (["git", "checkout", "--detach", "FETCH_HEAD"], "git_checkout_failed", "git checkout failed"),
    )
    try:
        repo_dir = Path(tmpdir) / "repo"
        repo_dir.mkdir(parents=True, exist_ok=True)
        for argv, error_code, fallback in setup_steps:
            status, _out, stderr = _run_git(argv, repo_dir)
            if status != 0:
                raise GogsGitError(error_code, (_scrub_git_error(stderr or fallback, token))[:500])

        status, _out, stderr = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir)
        if status != 0:
            lowered = (stderr or "git archive failed").lower()
            if any(hint in lowered for hint in ("not a valid object name", "unknown revision", "bad object")):
                raise GogsGitError("empty_repo", "Repository has no commits")
            raise GogsGitError("git_archive_failed", (_scrub_git_error(stderr or "git archive failed", token))[:500])
        return zip_path
    except Exception:
        # Do not leave a partial zip behind on failure.
        try:
            os.unlink(zip_path)
        except Exception:
            pass
        raise
    finally:
        try:
            shutil.rmtree(tmpdir, ignore_errors=True)
        except Exception:
            pass
def gogs_git_archive_zip(owner: str, repo: str, ref: str) -> str:
    """Build a zip archive of *ref* and return the zip file's path.

    A depth-1 clone of the ref is attempted first; if that fails, the
    default branch is cloned and *ref* is checked out. A blank ref means
    ``HEAD``. The scratch directory is always removed; the zip file is
    removed when anything fails. The caller owns the returned file.

    Raises:
        GogsGitError: ``branch_not_found`` when the ref cannot be checked
            out or looks like a command-line option, ``git_clone_failed``,
            ``empty_repo`` or ``git_archive_failed``.
    """
    ref = (ref or "").strip() or "HEAD"
    # The ref is passed to git as a positional argument; a leading dash
    # would be parsed as an option instead of a ref name.
    if ref.startswith("-"):
        raise GogsGitError("branch_not_found", "ref not found")
    _base_url, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)

    tmpdir = tempfile.mkdtemp()
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    zip_path = zip_file.name
    zip_file.close()
    try:
        repo_dir = Path(tmpdir) / "repo"
        clone_args = ["git", "clone", "--depth", "1"]
        if ref != "HEAD":
            clone_args.extend(["--branch", ref])
        clone_args.extend([remote, str(repo_dir)])
        code, _out, err = _run_git(clone_args, Path(tmpdir))
        if code != 0:
            # Fall back to cloning the default branch and checking out ref.
            code2, _out2, err2 = _run_git(["git", "clone", "--depth", "1", remote, str(repo_dir)], Path(tmpdir))
            if code2 != 0:
                raise GogsGitError("git_clone_failed", (_scrub_git_error(err2 or "git clone failed", token))[:500])
            if ref != "HEAD":
                code3, _out3, err3 = _run_git(["git", "checkout", ref], repo_dir)
                if code3 != 0:
                    raise GogsGitError("branch_not_found", (_scrub_git_error(err3 or "ref not found", token))[:500])
        code4, _out4, err4 = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir)
        if code4 != 0:
            msg = (err4 or "git archive failed").lower()
            if "not a valid object name" in msg or "unknown revision" in msg or "bad object" in msg:
                raise GogsGitError("empty_repo", "Repository has no commits")
            raise GogsGitError("git_archive_failed", (_scrub_git_error(err4 or "git archive failed", token))[:500])
        return zip_path
    except Exception:
        # Do not leave a partial zip behind on failure.
        try:
            os.unlink(zip_path)
        except Exception:
            pass
        raise
    finally:
        try:
            shutil.rmtree(tmpdir, ignore_errors=True)
        except Exception:
            pass
def gogs_git_write_file(owner: str, repo: str, branch: str, path: str, content_text: str, message: str, *, must_create: bool) -> dict[str, str]:
    """Write one file on *branch* by cloning, committing and pushing.

    An existing branch is cloned with depth 1; otherwise the repository is
    cloned and *branch* is created with ``git checkout --orphan``. With
    *must_create* the file must not exist yet; without it the file must
    already exist. Returns ``{"branch": ..., "commit": <new sha>}``.

    *path* must be a relative path inside the work tree: empty or absolute
    paths, ``..`` components, anything under ``.git`` and paths that
    resolve outside the clone (for example through a symlink) are
    rejected before anything is written.

    Raises:
        GogsGitError: ``ref_required``, ``invalid_path``, ``file_exists``,
            ``file_not_found`` or a ``git_*_failed`` code for the step
            that failed; also whatever ``_git_remote_url_with_token``
            raises.
    """
    branch = (branch or "").strip()
    if not branch:
        raise GogsGitError("ref_required", "Branch is required")

    rel = Path(path)
    unsafe_part = any(part == ".." or part.lower() == ".git" for part in rel.parts)
    if not rel.parts or rel.is_absolute() or unsafe_part:
        raise GogsGitError("invalid_path", "Invalid path")

    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo)
    exists = _git_branch_exists(remote, branch)
    with tempfile.TemporaryDirectory() as d:
        repo_dir = Path(d) / "repo"
        if exists:
            code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d))
            if code != 0:
                raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
        else:
            code, _out, err = _run_git(["git", "clone", remote, str(repo_dir)], Path(d))
            if code != 0:
                raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])
            code, _out, err = _run_git(["git", "checkout", "--orphan", branch], repo_dir)
            if code != 0:
                raise GogsGitError("git_checkout_failed", (_scrub_git_error(err or "git checkout failed", token))[:500])

        full = repo_dir / rel
        # Symlinks committed to the repository could still point outside
        # the clone, so check the resolved location as well.
        if not full.resolve().is_relative_to(repo_dir.resolve()):
            raise GogsGitError("invalid_path", "Invalid path")
        if must_create and full.exists():
            raise GogsGitError("file_exists", "File already exists")
        if (not must_create) and (not full.exists()):
            raise GogsGitError("file_not_found", "File not found")
        full.parent.mkdir(parents=True, exist_ok=True)
        full.write_text(content_text, encoding="utf-8")

        rel_posix = rel.as_posix()
        # "--" keeps a path that starts with "-" from being read as an option.
        code, _out, err = _run_git(["git", "add", "--", rel_posix], repo_dir)
        if code != 0:
            raise GogsGitError("git_add_failed", (_scrub_git_error(err or "git add failed", token))[:500])
        msg = (message or "").strip() or f"Update {rel_posix}"
        code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir)
        if code != 0:
            raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500])
        code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir)
        if code != 0:
            raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500])
        commit = (out or "").strip()
        code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir)
        if code != 0:
            raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500])
        return {"branch": branch, "commit": commit}
def gogs_git_delete_path(owner: str, repo: str, branch: str, path: str, message: str) -> dict[str, str]:
    """Delete a file or directory on *branch* by cloning, committing and pushing.

    Returns ``{"branch": ..., "commit": <new sha>}``.

    *path* must be a relative path inside the work tree: empty or absolute
    paths, ``..`` components, anything under ``.git`` and paths whose
    parent resolves outside the clone are rejected. In particular an empty
    path is refused rather than removing the whole tree.

    Raises:
        GogsGitError: ``ref_required``, ``invalid_path``,
            ``branch_not_found``, ``path_not_found`` or a ``git_*_failed``
            code for the step that failed; also whatever
            ``_git_remote_url_with_token`` raises.
    """
    branch = (branch or "").strip()
    if not branch:
        raise GogsGitError("ref_required", "Branch is required")

    rel = Path(path)
    unsafe_part = any(part == ".." or part.lower() == ".git" for part in rel.parts)
    if not rel.parts or rel.is_absolute() or unsafe_part:
        raise GogsGitError("invalid_path", "Invalid path")

    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo)
    exists = _git_branch_exists(remote, branch)
    if not exists:
        raise GogsGitError("branch_not_found", "Branch not found")
    with tempfile.TemporaryDirectory() as d:
        repo_dir = Path(d) / "repo"
        code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d))
        if code != 0:
            raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500])

        rel_posix = rel.as_posix()
        full = repo_dir / rel
        # Check the parent only, so a symlink entry itself can still be
        # deleted while lookups through symlinked directories are refused.
        if not full.parent.resolve().is_relative_to(repo_dir.resolve()):
            raise GogsGitError("invalid_path", "Invalid path")
        if not full.exists():
            raise GogsGitError("path_not_found", "Path not found")
        code, _out, err = _run_git(["git", "rm", "-r", "--", rel_posix], repo_dir)
        if code != 0:
            raise GogsGitError("git_rm_failed", (_scrub_git_error(err or "git rm failed", token))[:500])
        msg = (message or "").strip() or f"Delete {rel_posix}"
        code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir)
        if code != 0:
            raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500])
        code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir)
        if code != 0:
            raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500])
        commit = (out or "").strip()
        code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir)
        if code != 0:
            raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500])
        return {"branch": branch, "commit": commit}
def gogs_git_log(owner: str, repo: str, ref: str, path: str, limit: int) -> dict[str, Any]:
    """Return recent commits of *ref*, optionally restricted to *path*.

    The ref is cloned shallowly and ``git log`` is run on the clone.
    *limit* is clamped to 1..50. Returns ``{"ref", "path", "items"}``
    where each item has ``sha``, ``authorName``, ``authorDate`` (strict
    ISO format) and ``subject``.

    Raises:
        GogsGitError: ``ref_required``, ``git_clone_failed`` or
            ``git_log_failed``; also whatever the remote-URL helper raises.
    """
    ref = (ref or "").strip()
    if not ref:
        raise GogsGitError("ref_required", "Ref is required")
    limit = max(1, min(int(limit or 0), 50))
    _base, token = _gogs_base_url_and_token()
    remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo)
    clone_depth = str(max(limit, 50))

    with tempfile.TemporaryDirectory() as workdir:
        repo_dir = Path(workdir) / "repo"
        status, _out, stderr = _run_git(["git", "clone", "--depth", clone_depth, "--branch", ref, remote, str(repo_dir)], Path(workdir))
        if status != 0:
            raise GogsGitError("git_clone_failed", (_scrub_git_error(stderr or "git clone failed", token))[:500])

        # Fields are separated by the unit separator (0x1f) so subjects
        # can contain any printable text.
        log_cmd = ["git", "log", "-n", str(limit), "--date=iso-strict", "--pretty=format:%H%x1f%an%x1f%ad%x1f%s"]
        if path:
            log_cmd += ["--", Path(path).as_posix()]
        status, stdout, stderr = _run_git(log_cmd, repo_dir)
        if status != 0:
            raise GogsGitError("git_log_failed", (_scrub_git_error(stderr or "git log failed", token))[:500])

    items = []
    for row in (stdout or "").splitlines():
        fields = row.split("\x1f")
        if len(fields) < 4:
            continue
        sha, author_name, author_date, *subject_parts = fields
        items.append({"sha": sha, "authorName": author_name, "authorDate": author_date, "subject": "\x1f".join(subject_parts)})
    return {"ref": ref, "path": path or "", "items": items}
|