import base64 import hashlib import inspect import os import re import shutil import subprocess import tempfile import threading import time from pathlib import Path from typing import Any, Optional from urllib.parse import quote, urlsplit, urlunsplit import requests from .context import get_config from .settings import get_setting_value _cache_lock = threading.Lock() _mem_cache: dict[str, tuple[float, bytes]] = {} _redis_lock = threading.Lock() _redis_client = None def _get_redis_url() -> str: v = get_setting_value("REDIS_URL") if v is not None and v.strip(): return v.strip() return (os.environ.get("REDIS_URL") or "").strip() def _get_redis_client(): global _redis_client with _redis_lock: if _redis_client is False: _redis_client = None if _redis_client is not None: return _redis_client url = _get_redis_url() if not url: _redis_client = False return None try: import redis # type: ignore except Exception: return None try: _redis_client = redis.Redis.from_url( url, socket_connect_timeout=1, socket_timeout=2, decode_responses=False, ) except Exception: return None return _redis_client def _cache_ttl_seconds(path: str) -> int: p = (path or "").lower() if "/branches" in p or "/tags" in p: return 30 if "/commits" in p: return 10 if "/contents" in p: return 10 if p.startswith("/api/v1/repos/"): return 60 return 15 def _make_cache_key(*, method: str, base_url: str, path: str, params: dict[str, Any] | None, token: str) -> str: items = [] for k, v in sorted((params or {}).items(), key=lambda kv: kv[0]): items.append(f"{k}={v}") qs = "&".join(items) token_tag = hashlib.sha256((token or "").encode("utf-8")).hexdigest()[:12] if token else "-" return f"gogs:{method.upper()}:{base_url}:{path}?{qs}:tk={token_tag}" def _cache_get(key: str) -> bytes | None: now = time.time() with _cache_lock: hit = _mem_cache.get(key) if hit is not None: exp, payload = hit if exp > now: return payload _mem_cache.pop(key, None) r = _get_redis_client() if r is None: return None try: raw = r.get(key) return raw if isinstance(raw, (bytes, bytearray)) else None except Exception: return None def _cache_set(key: str, payload: bytes, ttl_seconds: int) -> None: now = time.time() exp = now + max(1, int(ttl_seconds or 0)) with _cache_lock: _mem_cache[key] = (exp, payload) if len(_mem_cache) > 2048: for k in [k for k, (e, _p) in _mem_cache.items() if e <= now][:256]: _mem_cache.pop(k, None) while len(_mem_cache) > 2048: _mem_cache.pop(next(iter(_mem_cache)), None) r = _get_redis_client() if r is None: return try: r.setex(key, max(1, int(ttl_seconds or 0)), payload) except Exception: return def _gogs_base_url_and_token() -> tuple[str, str]: config = get_config() base_url = (get_setting_value("GOGS_BASE_URL") or config.gogs_base_url).rstrip("/") token = get_setting_value("GOGS_TOKEN") if token is not None: token = token.strip() or None if token is None: token = config.gogs_token return base_url, (token or "").strip() def gogs_api_request( method: str, path: str, *, params: Optional[dict[str, Any]] = None, json: Optional[dict[str, Any]] = None, timeout: int = 6, ) -> requests.Response: base_url, token = _gogs_base_url_and_token() parts = urlsplit(base_url) if parts.scheme not in {"http", "https"} or not parts.netloc or parts.username or parts.password: resp = requests.Response() resp.status_code = 599 resp.url = base_url resp._content = b"" return resp url = f"{base_url}{path}" merged = dict(params or {}) headers: dict[str, str] = {} if token: headers["Authorization"] = f"token {token}" if method.upper() == "GET": merged.setdefault("token", token) is_cacheable_get = method.upper() == "GET" cache_key = None ttl = 0 if is_cacheable_get: cache_key = _make_cache_key(method="GET", base_url=base_url, path=path, params=merged, token=token) ttl = _cache_ttl_seconds(path) raw = _cache_get(cache_key) if raw: nl = raw.find(b"\n") code = 200 body = raw if nl > 0: try: code = int(raw[:nl].decode("ascii", errors="ignore") or "200") body = raw[nl + 1 :] except Exception: code = 200 body = raw cached = requests.Response() cached.status_code = code cached.url = url cached._content = body return cached def _safe_request(**kwargs) -> requests.Response: try: sig = inspect.signature(requests.request) has_varkw = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()) if "allow_redirects" in sig.parameters or has_varkw: kwargs.setdefault("allow_redirects", False) else: kwargs.pop("allow_redirects", None) except Exception: kwargs.pop("allow_redirects", None) return requests.request(**kwargs) try: resp = _safe_request(method=method.upper(), url=url, params=merged or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False) if token and method.upper() != "GET" and resp.status_code in {401, 403, 500}: merged2 = dict(merged) merged2.setdefault("token", token) return _safe_request(method=method.upper(), url=url, params=merged2 or None, json=json, headers=headers or None, timeout=timeout, allow_redirects=False) if is_cacheable_get and cache_key and int(resp.status_code) == 200: try: body = resp.content or b"" if body: payload = f"{int(resp.status_code)}\n".encode("ascii") + body _cache_set(cache_key, payload, ttl) except Exception: pass return resp except requests.RequestException: resp = requests.Response() resp.status_code = 599 resp.url = url resp._content = b"" return resp def gogs_api_get(path: str, params: Optional[dict[str, Any]] = None) -> requests.Response: return gogs_api_request("GET", path, params=params) def gogs_create_repo(owner: str, name: str, description: str, private: bool) -> requests.Response: payload = {"name": name, "description": description, "private": bool(private)} if owner: org_resp = gogs_api_request("POST", f"/api/v1/org/{quote(owner)}/repos", json=payload, timeout=30) if org_resp.status_code < 400: return org_resp if org_resp.status_code in {403, 404, 500}: admin_resp = gogs_api_request("POST", f"/api/v1/admin/users/{quote(owner)}/repos", json=payload, timeout=30) if admin_resp.status_code < 400: return admin_resp user_resp = gogs_api_request("POST", "/api/v1/user/repos", json=payload, timeout=30) if user_resp.status_code < 400: return user_resp return org_resp return gogs_api_request("POST", "/api/v1/user/repos", json=payload, timeout=30) def gogs_get_content(owner: str, repo: str, path: str, ref: str) -> requests.Response: path = path.lstrip("/") if path: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{quote(path)}" else: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents" return gogs_api_get(api_path, params={"ref": ref}) def gogs_upsert_file(owner: str, repo: str, path: str, *, content_text: str, message: str, branch: str) -> requests.Response: path = path.lstrip("/") api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/contents/{quote(path)}" content_b64 = base64.b64encode(content_text.encode("utf-8")).decode("ascii") create_payload = {"content": content_b64, "message": message, "branch": branch} create_resp = gogs_api_request("POST", api_path, json=create_payload, timeout=30) if create_resp.status_code < 400: return create_resp existing = gogs_api_get(api_path, params={"ref": branch}) if existing.status_code >= 400: return create_resp data = existing.json() or {} sha = (data.get("sha") or "").strip() if not sha: return create_resp update_payload = {"content": content_b64, "message": message, "sha": sha, "branch": branch} return gogs_api_request("PUT", api_path, json=update_payload, timeout=30) def gogs_contents(owner: str, repo: str, path: str, ref: str) -> requests.Response: return gogs_get_content(owner, repo, path, ref) def gogs_repo_info(owner: str, repo: str) -> requests.Response: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}" return gogs_api_get(api_path) def gogs_delete_repo(owner: str, repo: str) -> requests.Response: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}" return gogs_api_request("DELETE", api_path, timeout=30) def gogs_user_repos(username: str) -> requests.Response: api_path = f"/api/v1/users/{quote(username)}/repos" return gogs_api_get(api_path) def gogs_my_repos() -> requests.Response: api_path = "/api/v1/user/repos" return gogs_api_get(api_path) def gogs_branches(owner: str, repo: str) -> requests.Response: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches" return gogs_api_get(api_path) def gogs_tags(owner: str, repo: str) -> requests.Response: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/tags" return gogs_api_get(api_path) def gogs_commits(owner: str, repo: str, *, ref: str, path: str, limit: int) -> requests.Response: api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/commits" params: dict[str, Any] = {} if ref: params["sha"] = ref if path: params["path"] = path params["page"] = 1 params["limit"] = max(1, min(int(limit or 0), 50)) return gogs_api_get(api_path, params=params) def gogs_archive_get(owner: str, repo: str, ref: str) -> requests.Response: base_url, token = _gogs_base_url_and_token() parts = urlsplit(base_url) if parts.scheme not in {"http", "https"} or not parts.netloc or parts.username or parts.password: resp = requests.Response() resp.status_code = 599 resp.url = base_url resp._content = b"" return resp url = f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip" params: dict[str, Any] = {} if token: params["token"] = token try: return requests.get(url, params=params or None, stream=True, timeout=60, allow_redirects=False) except requests.RequestException: resp = requests.Response() resp.status_code = 599 resp.url = url resp._content = b"" return resp def gogs_archive_url(owner: str, repo: str, ref: str) -> str: base_url, _token = _gogs_base_url_and_token() return f"{base_url}/api/v1/repos/{quote(owner)}/{quote(repo)}/archive/{quote(ref)}.zip" class GogsGitError(Exception): def __init__(self, code: str, message: str) -> None: super().__init__(message) self.code = code self.message = message def _git_remote_url(owner: str, repo: str) -> str: base_url, _token = _gogs_base_url_and_token() return f"{base_url}/{quote(owner)}/{quote(repo)}.git" def _git_remote_url_with_token(owner: str, repo: str) -> str: base_url, token = _gogs_base_url_and_token() if not token: raise GogsGitError("gogs_token_required", "Gogs token is required for git operations") username = (owner or "").strip() or "root" parts = urlsplit(base_url) if not parts.scheme or not parts.netloc: raise GogsGitError("invalid_gogs_base_url", "Invalid Gogs base URL") auth = f"{quote(username, safe='')}:{quote(token, safe='')}" netloc = f"{auth}@{parts.netloc}" authed_base = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment)).rstrip("/") return f"{authed_base}/{quote(owner)}/{quote(repo)}.git" def _run_git(args: list[str], cwd: Path) -> tuple[int, str, str]: env = dict(os.environ) env["GIT_TERMINAL_PROMPT"] = "0" try: proc = subprocess.run(args, cwd=str(cwd), env=env, capture_output=True) except FileNotFoundError: raise GogsGitError("git_not_found", "git is required on server PATH") def _decode(b: bytes | None) -> str: raw = b or b"" if not raw: return "" for enc in ("utf-8", "gb18030"): try: return raw.decode(enc) except UnicodeDecodeError: pass return raw.decode("utf-8", errors="replace") return int(proc.returncode), _decode(proc.stdout), _decode(proc.stderr) def _scrub_git_error(err: str, token: str) -> str: text = err or "" if token: text = text.replace(token, "***") text = re.sub(r":([^:@\s]+)@", r":***@", text) return text def _git_branch_exists(remote_url: str, branch: str) -> bool: with tempfile.TemporaryDirectory() as d: code, out, _err = _run_git(["git", "ls-remote", "--heads", remote_url, branch], Path(d)) return code == 0 and bool((out or "").strip()) def gogs_git_list_refs(owner: str, repo: str, *, limit: int = 200) -> dict[str, Any]: limit = max(1, min(int(limit or 0), 500)) _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) with tempfile.TemporaryDirectory() as d: code, out, err = _run_git(["git", "ls-remote", "--heads", remote], Path(d)) if code != 0: raise GogsGitError("git_ls_remote_failed", (_scrub_git_error(err or "git ls-remote failed", token))[:500]) branches: list[str] = [] for line in (out or "").splitlines(): parts = (line or "").split("\t", 1) if len(parts) != 2: continue ref = parts[1].strip() if not ref.startswith("refs/heads/"): continue name = ref[len("refs/heads/") :].strip() if name: branches.append(name) branches = sorted(set(branches))[:limit] code, out, err = _run_git(["git", "ls-remote", "--tags", remote], Path(d)) if code != 0: raise GogsGitError("git_ls_remote_failed", (_scrub_git_error(err or "git ls-remote failed", token))[:500]) tags: list[str] = [] for line in (out or "").splitlines(): parts = (line or "").split("\t", 1) if len(parts) != 2: continue ref = parts[1].strip() if not ref.startswith("refs/tags/"): continue name = ref[len("refs/tags/") :].strip() if not name or name.endswith("^{}"): continue tags.append(name) tags = sorted(set(tags))[:limit] return {"branches": [{"name": n} for n in branches], "tags": [{"name": n} for n in tags]} def _looks_like_commit(s: str) -> bool: t = (s or "").strip() if len(t) < 7 or len(t) > 40: return False return bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", t)) def _git_ls_remote_resolve(remote_url: str, ref: str) -> str | None: ref = (ref or "").strip() if not ref: return None with tempfile.TemporaryDirectory() as d: code, out, _err = _run_git(["git", "ls-remote", remote_url, ref], Path(d)) if code != 0: return None best = None for line in (out or "").splitlines(): parts = (line or "").split("\t", 1) if len(parts) != 2: continue sha = (parts[0] or "").strip() name = (parts[1] or "").strip() if not sha: continue if name.endswith("^{}"): return sha if best is None: best = sha return best def gogs_resolve_ref_commit(owner: str, repo: str, ref: str) -> dict[str, Any]: ref = (ref or "").strip() or "HEAD" if _looks_like_commit(ref): return {"ok": True, "ref": ref, "commit": ref.lower(), "kind": "commit"} api_path = f"/api/v1/repos/{quote(owner)}/{quote(repo)}/branches/{quote(ref)}" resp = gogs_api_get(api_path) if resp.status_code < 400: try: data = resp.json() or {} except Exception: data = {} commit = None if isinstance(data, dict): c = data.get("commit") if isinstance(c, dict): commit = (c.get("id") or c.get("sha") or c.get("commit_id") or "").strip() or None if commit is None: commit = (data.get("commit") or data.get("sha") or data.get("id") or "").strip() or None if commit and _looks_like_commit(commit): return {"ok": True, "ref": ref, "commit": commit.lower(), "kind": "branch"} _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo) sha = _git_ls_remote_resolve(remote, f"refs/heads/{ref}") if sha and _looks_like_commit(sha): return {"ok": True, "ref": ref, "commit": sha.lower(), "kind": "branch"} sha2 = _git_ls_remote_resolve(remote, f"refs/tags/{ref}^{{}}") or _git_ls_remote_resolve(remote, f"refs/tags/{ref}") if sha2 and _looks_like_commit(sha2): return {"ok": True, "ref": ref, "commit": sha2.lower(), "kind": "tag"} return {"ok": False, "ref": ref, "commit": None, "kind": "unknown"} def gogs_git_archive_zip_commit(owner: str, repo: str, commit: str) -> str: commit = (commit or "").strip() if not _looks_like_commit(commit): raise GogsGitError("invalid_commit", "Invalid commit") _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo) tmpdir = tempfile.mkdtemp() zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") zip_path = zip_file.name zip_file.close() try: repo_dir = Path(tmpdir) / "repo" repo_dir.mkdir(parents=True, exist_ok=True) code, _out, err = _run_git(["git", "init"], repo_dir) if code != 0: raise GogsGitError("git_init_failed", (_scrub_git_error(err or "git init failed", token))[:500]) code, _out, err = _run_git(["git", "remote", "add", "origin", remote], repo_dir) if code != 0: raise GogsGitError("git_remote_failed", (_scrub_git_error(err or "git remote failed", token))[:500]) code, _out, err = _run_git(["git", "fetch", "--depth", "1", "origin", commit], repo_dir) if code != 0: raise GogsGitError("git_fetch_failed", (_scrub_git_error(err or "git fetch failed", token))[:500]) code, _out, err = _run_git(["git", "checkout", "--detach", "FETCH_HEAD"], repo_dir) if code != 0: raise GogsGitError("git_checkout_failed", (_scrub_git_error(err or "git checkout failed", token))[:500]) code, _out, err = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir) if code != 0: msg = (err or "git archive failed").lower() if "not a valid object name" in msg or "unknown revision" in msg or "bad object" in msg: raise GogsGitError("empty_repo", "Repository has no commits") raise GogsGitError("git_archive_failed", (_scrub_git_error(err or "git archive failed", token))[:500]) return zip_path except Exception: try: os.unlink(zip_path) except Exception: pass raise finally: try: shutil.rmtree(tmpdir, ignore_errors=True) except Exception: pass def gogs_git_archive_zip(owner: str, repo: str, ref: str) -> str: ref = (ref or "").strip() or "HEAD" base_url, token = _gogs_base_url_and_token() _ = base_url remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo) tmpdir = tempfile.mkdtemp() zip_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") zip_path = zip_file.name zip_file.close() try: repo_dir = Path(tmpdir) / "repo" clone_args = ["git", "clone", "--depth", "1"] if ref != "HEAD": clone_args.extend(["--branch", ref]) clone_args.extend([remote, str(repo_dir)]) code, _out, err = _run_git(clone_args, Path(tmpdir)) if code != 0: code2, _out2, err2 = _run_git(["git", "clone", "--depth", "1", remote, str(repo_dir)], Path(tmpdir)) if code2 != 0: raise GogsGitError("git_clone_failed", (_scrub_git_error(err2 or "git clone failed", token))[:500]) if ref != "HEAD": code3, _out3, err3 = _run_git(["git", "checkout", ref], repo_dir) if code3 != 0: raise GogsGitError("branch_not_found", (_scrub_git_error(err3 or "ref not found", token))[:500]) code4, _out4, err4 = _run_git(["git", "archive", "--format=zip", "-o", zip_path, "HEAD"], repo_dir) if code4 != 0: msg = (err4 or "git archive failed").lower() if "not a valid object name" in msg or "unknown revision" in msg or "bad object" in msg: raise GogsGitError("empty_repo", "Repository has no commits") raise GogsGitError("git_archive_failed", (_scrub_git_error(err4 or "git archive failed", token))[:500]) return zip_path except Exception: try: os.unlink(zip_path) except Exception: pass raise finally: try: shutil.rmtree(tmpdir, ignore_errors=True) except Exception: pass def gogs_git_write_file(owner: str, repo: str, branch: str, path: str, content_text: str, message: str, *, must_create: bool) -> dict[str, str]: branch = (branch or "").strip() if not branch: raise GogsGitError("ref_required", "Branch is required") _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) exists = _git_branch_exists(remote, branch) with tempfile.TemporaryDirectory() as d: repo_dir = Path(d) / "repo" if exists: code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d)) if code != 0: raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500]) else: code, _out, err = _run_git(["git", "clone", remote, str(repo_dir)], Path(d)) if code != 0: raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500]) code, _out, err = _run_git(["git", "checkout", "--orphan", branch], repo_dir) if code != 0: raise GogsGitError("git_checkout_failed", (_scrub_git_error(err or "git checkout failed", token))[:500]) rel = Path(path) full = repo_dir / rel if must_create and full.exists(): raise GogsGitError("file_exists", "File already exists") if (not must_create) and (not full.exists()): raise GogsGitError("file_not_found", "File not found") full.parent.mkdir(parents=True, exist_ok=True) full.write_text(content_text, encoding="utf-8") rel_posix = rel.as_posix() code, _out, err = _run_git(["git", "add", rel_posix], repo_dir) if code != 0: raise GogsGitError("git_add_failed", (_scrub_git_error(err or "git add failed", token))[:500]) msg = (message or "").strip() or f"Update {rel_posix}" code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir) if code != 0: raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500]) code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir) if code != 0: raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500]) commit = (out or "").strip() code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir) if code != 0: raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500]) return {"branch": branch, "commit": commit} def gogs_git_delete_path(owner: str, repo: str, branch: str, path: str, message: str) -> dict[str, str]: branch = (branch or "").strip() if not branch: raise GogsGitError("ref_required", "Branch is required") _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) exists = _git_branch_exists(remote, branch) if not exists: raise GogsGitError("branch_not_found", "Branch not found") with tempfile.TemporaryDirectory() as d: repo_dir = Path(d) / "repo" code, _out, err = _run_git(["git", "clone", "--depth", "1", "--branch", branch, remote, str(repo_dir)], Path(d)) if code != 0: raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500]) rel = Path(path) rel_posix = rel.as_posix() full = repo_dir / rel if not full.exists(): raise GogsGitError("path_not_found", "Path not found") code, _out, err = _run_git(["git", "rm", "-r", "--", rel_posix], repo_dir) if code != 0: raise GogsGitError("git_rm_failed", (_scrub_git_error(err or "git rm failed", token))[:500]) msg = (message or "").strip() or f"Delete {rel_posix}" code, _out, err = _run_git(["git", "-c", "user.name=SourceShare", "-c", "user.email=source@local", "commit", "-m", msg], repo_dir) if code != 0: raise GogsGitError("git_commit_failed", (_scrub_git_error(err or "git commit failed", token))[:500]) code, out, err = _run_git(["git", "rev-parse", "HEAD"], repo_dir) if code != 0: raise GogsGitError("git_rev_parse_failed", (_scrub_git_error(err or "git rev-parse failed", token))[:500]) commit = (out or "").strip() code, _out, err = _run_git(["git", "push", remote, f"HEAD:{branch}"], repo_dir) if code != 0: raise GogsGitError("git_push_failed", (_scrub_git_error(err or "git push failed", token))[:500]) return {"branch": branch, "commit": commit} def gogs_git_log(owner: str, repo: str, ref: str, path: str, limit: int) -> dict[str, Any]: ref = (ref or "").strip() if not ref: raise GogsGitError("ref_required", "Ref is required") limit = max(1, min(int(limit or 0), 50)) _base, token = _gogs_base_url_and_token() remote = _git_remote_url_with_token(owner, repo) if token else _git_remote_url(owner, repo) depth = str(max(limit, 50)) with tempfile.TemporaryDirectory() as d: repo_dir = Path(d) / "repo" code, _out, err = _run_git(["git", "clone", "--depth", depth, "--branch", ref, remote, str(repo_dir)], Path(d)) if code != 0: raise GogsGitError("git_clone_failed", (_scrub_git_error(err or "git clone failed", token))[:500]) args = ["git", "log", "-n", str(limit), "--date=iso-strict", "--pretty=format:%H%x1f%an%x1f%ad%x1f%s"] if path: args.extend(["--", Path(path).as_posix()]) code, out, err = _run_git(args, repo_dir) if code != 0: raise GogsGitError("git_log_failed", (_scrub_git_error(err or "git log failed", token))[:500]) items = [] for line in (out or "").splitlines(): parts = line.split("\x1f") if len(parts) < 4: continue sha, author_name, author_date, subject = parts[0], parts[1], parts[2], "\x1f".join(parts[3:]) items.append({"sha": sha, "authorName": author_name, "authorDate": author_date, "subject": subject}) return {"ref": ref, "path": path or "", "items": items}