| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- import json
- import logging
- import os
- from pathlib import Path
- import shutil
- import stat
- import time
- from typing import Optional, Dict
- import zipfile
- import requests
- from gpustack.utils.compat_importlib import pkg_resources
- from gpustack.utils import platform
- from gpustack.worker.backend_dependency_manager import BackendDependencyManager
- logger = logging.getLogger(__name__)
- BUILTIN_GGUF_PARSER_VERSION = "v0.24.0"
- class ToolsManager:
- """
- ToolsManager is responsible for managing prebuilt binary tools including the following:
- - `fastfetch`
- - `gguf-parser`
- """
- def __init__(
- self,
- tools_download_base_url: str = None,
- data_dir: Optional[str] = None,
- bin_dir: Optional[str] = None,
- pipx_path: Optional[str] = None,
- system: Optional[str] = None,
- arch: Optional[str] = None,
- ):
- with pkg_resources.path("gpustack.third_party", "bin") as third_party_bin_path:
- self.third_party_bin_path: Path = third_party_bin_path
- self.versions_file = third_party_bin_path.joinpath("versions.json")
- self._current_tools_version = {}
- if os.path.exists(self.versions_file):
- try:
- with open(self.versions_file, 'r', encoding='utf-8') as file:
- self._current_tools_version = json.load(file)
- except Exception as e:
- logger.warning(f"Failed to load versions.json: {e}")
- self._os = system if system else platform.system()
- self._arch = arch if arch else platform.arch()
- self._download_base_url = tools_download_base_url
- self._data_dir = data_dir
- self._bin_dir = bin_dir
- self._pipx_path = pipx_path
- # Initialize backend dependency manager
- self._dependency_manager = None
- def init_dependency_manager(
- self, backend: str, version: str, model_env: Dict[str, str]
- ):
- """
- Init dependency_manager for custom backend version and dependencies.
- No need for other scenarios.
- """
- self._dependency_manager = BackendDependencyManager(
- backend=backend, version=version, model_env=model_env
- )
- def _check_and_set_download_base_url(self):
- urls = [
- "https://github.com",
- "https://gpustack-1303613262.cos.ap-guangzhou.myqcloud.com",
- "https://gh-proxy.com/https://github.com",
- ]
- test_path = f"/gpustack/gguf-parser-go/releases/download/{BUILTIN_GGUF_PARSER_VERSION}/gguf-parser-linux-amd64"
- test_size = 512 * 1024 # 512KB
- download_tests = []
- for url in urls:
- test_url = f"{url}{test_path}"
- try:
- start_time = time.time()
- headers = {"Range": f"bytes=0-{test_size - 1}"}
- response = requests.get(
- test_url, headers=headers, timeout=5, stream=True
- )
- response.raise_for_status()
- if "Content-Range" not in response.headers:
- continue
- if len(response.content) == 0:
- continue
- elapsed_time = time.time() - start_time
- download_tests.append((url, elapsed_time))
- logger.debug(f"Tested {url}, elapsed time {elapsed_time:.2f} seconds")
- except Exception as e:
- logger.debug(f"Failed to connect to {url}: {e}")
- if not download_tests:
- raise Exception(
- f"It is required to download dependency tools from the internet, but failed to connect to any of {urls}"
- )
- best_url, _ = min(download_tests, key=lambda x: x[1])
- self._download_base_url = best_url
- logger.debug(
- f"Using {best_url} as the base URL for downloading dependency tools"
- )
- def prepare_tools(self):
- """
- Prepare prebuilt binary tools.
- """
- logger.debug("Preparing dependency tools")
- logger.debug(f"OS: {self._os}, Arch: {self._arch}")
- self.download_gguf_parser()
- self.download_fastfetch()
- def remove_cached_tools(self):
- """
- Remove all cached tools.
- """
- if os.path.exists(self.third_party_bin_path):
- shutil.rmtree(self.third_party_bin_path)
- def save_archive(self, archive_path: str):
- """
- Save all downloaded tools as a tar archive.
- """
- # Ensure the directory exists
- target_dir = os.path.dirname(archive_path)
- if target_dir and not os.path.exists(target_dir):
- os.makedirs(target_dir)
- # Remove extension from archive_path for make_archive. e.g., .tar.gz
- base_name = os.path.splitext(os.path.splitext(archive_path)[0])[0]
- logger.info(f"Saving dependency tools to {archive_path}")
- shutil.make_archive(base_name, "gztar", self.third_party_bin_path)
- def load_archive(self, archive_path: str):
- """
- Load downloaded tools from a tar archive.
- """
- if not os.path.isfile(archive_path):
- raise FileNotFoundError(f"Archive file not found: {archive_path}")
- if not os.path.exists(self.third_party_bin_path):
- os.makedirs(self.third_party_bin_path)
- logger.info(f"Loading dependency tools from {archive_path}")
- shutil.unpack_archive(archive_path, self.third_party_bin_path)
- def download_gguf_parser(self):
- version = BUILTIN_GGUF_PARSER_VERSION
- gguf_parser_dir = self.third_party_bin_path.joinpath("gguf-parser")
- os.makedirs(gguf_parser_dir, exist_ok=True)
- file_name = "gguf-parser"
- suffix = ""
- if self._os == "windows":
- suffix = ".exe"
- file_name += suffix
- target_file = gguf_parser_dir.joinpath(file_name)
- if (
- os.path.isfile(target_file)
- and self._current_tools_version.get(file_name) == version
- ):
- logger.debug(f"{file_name} already exists, skipping download")
- return
- platform_name = self._get_gguf_parser_platform_name()
- url_path = f"gpustack/gguf-parser-go/releases/download/{version}/gguf-parser-{platform_name}{suffix}"
- logger.info(f"Downloading gguf-parser-{platform_name} '{version}'")
- self._download_file(url_path, target_file)
- if self._os != "windows":
- st = os.stat(target_file)
- os.chmod(target_file, st.st_mode | stat.S_IEXEC)
- # Update versions.json
- self._update_versions_file(file_name, version)
- def _get_gguf_parser_platform_name(self) -> str:
- platform_name = ""
- if self._os == "darwin":
- platform_name = "darwin-universal"
- elif self._os == "linux" and self._arch == "amd64":
- platform_name = "linux-amd64"
- elif self._os == "linux" and self._arch == "arm64":
- platform_name = "linux-arm64"
- elif self._os == "windows" and self._arch == "amd64":
- platform_name = "windows-amd64"
- elif self._os == "windows" and self._arch == "arm64":
- platform_name = "windows-arm64"
- else:
- raise Exception(f"Unsupported platform: {self._os} {self._arch}")
- return platform_name
- def download_fastfetch(self):
- version = "2.25.0.1"
- fastfetch_dir = self.third_party_bin_path.joinpath("fastfetch")
- fastfetch_tmp_dir = fastfetch_dir.joinpath("tmp")
- platform_name = self._get_fastfetch_platform_name()
- file_name = "fastfetch"
- if self._os == "windows":
- file_name += ".exe"
- target_file = os.path.join(fastfetch_dir, file_name)
- if (
- os.path.isfile(target_file)
- and self._current_tools_version.get(file_name) == version
- ):
- logger.debug(f"{file_name} already exists, skipping download")
- return
- logger.info(f"Downloading fastfetch-{platform_name} '{version}'")
- tmp_file = os.path.join(fastfetch_tmp_dir, f"fastfetch-{platform_name}.zip")
- if os.path.exists(fastfetch_tmp_dir):
- shutil.rmtree(fastfetch_tmp_dir)
- os.makedirs(fastfetch_tmp_dir, exist_ok=True)
- url_path = f"gpustack/fastfetch/releases/download/{version}/fastfetch-{platform_name}.zip"
- self._download_file(url_path, tmp_file)
- self._extract_file(tmp_file, fastfetch_tmp_dir)
- extracted_fastfetch = fastfetch_tmp_dir.joinpath(
- f"fastfetch-{platform_name}",
- "usr",
- "bin",
- "fastfetch",
- )
- if self._os == "windows":
- extracted_fastfetch = fastfetch_tmp_dir.joinpath(
- "fastfetch.exe",
- )
- if os.path.exists(extracted_fastfetch):
- shutil.copy(extracted_fastfetch, target_file)
- else:
- raise Exception("failed to find fastfetch binary in extracted archive")
- if self._os != "windows":
- st = os.stat(target_file)
- os.chmod(target_file, st.st_mode | stat.S_IEXEC)
- # Clean up.
- if os.path.exists(fastfetch_tmp_dir):
- shutil.rmtree(fastfetch_tmp_dir)
- # Update versions.json
- self._update_versions_file(file_name, version)
- def _update_versions_file(self, tool_name: str, version: str):
- updated_versions = self._current_tools_version.copy()
- updated_versions[tool_name] = version
- try:
- with open(self.versions_file, 'w', encoding='utf-8') as file:
- json.dump(updated_versions, file, indent=4)
- self._current_tools_version[tool_name] = version
- except Exception as e:
- logger.error(f"Failed to update versions.json: {e}")
- def _get_fastfetch_platform_name(self) -> str:
- platform_name = ""
- if self._os == "darwin":
- platform_name = "macos-universal"
- elif self._os == "linux" and self._arch == "amd64":
- platform_name = "linux-amd64"
- elif self._os == "linux" and self._arch == "arm64":
- platform_name = "linux-aarch64"
- elif self._os == "windows":
- platform_name = "windows-amd64"
- else:
- raise Exception(f"unsupported platform: {self._os} {self._arch}")
- return platform_name
- def _download_file(
- self,
- url_path: str,
- target_path: str,
- base_url: str = None,
- headers: Optional[Dict[str, str]] = None,
- ):
- """Download a file from the URL to the target path."""
- if not base_url and not self._download_base_url:
- self._check_and_set_download_base_url()
- final_base_url = base_url or self._download_base_url
- url = f"{final_base_url}/{url_path}"
- max_retries = 5
- retries = 0
- while retries < max_retries:
- try:
- with requests.get(
- url,
- stream=True,
- timeout=30,
- headers=headers,
- ) as response:
- response.raise_for_status()
- with open(target_path, 'wb') as f:
- for chunk in response.iter_content(chunk_size=8192):
- f.write(chunk)
- break
- except Exception as e:
- retries += 1
- if retries >= max_retries:
- raise Exception(f"Error downloading from {url}: {e}")
- else:
- logger.debug(
- f"Attempt {retries} failed: {e}. Retrying in 2 seconds..."
- )
- time.sleep(2)
- @staticmethod
- def _extract_file(file_path, target_dir):
- """Extract a file to the target directory."""
- try:
- with zipfile.ZipFile(file_path, 'r') as zip_ref:
- zip_ref.extractall(target_dir)
- except zipfile.BadZipFile as e:
- raise Exception(f"error extracting {file_path}: {e}")
- except Exception as e:
- raise Exception(f"error extracting {file_path}: {e}")
|