| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import logging
- import os
- import platform
- import time
- import threading
- import json
- import socket
- from typing import Optional
- from filelock import SoftFileLock, Timeout
- from modelscope.hub.utils.utils import model_id_to_group_owner_name
- _HAS_FCNTL = False
- try:
- import fcntl
- import errno
- _HAS_FCNTL = True
- except ModuleNotFoundError:
- pass
- from gpustack.envs import DISABLE_OS_FILELOCK
- from gpustack.schemas import ModelFile
- from gpustack.schemas.models import SourceEnum
- logger = logging.getLogger(__name__)
class HeartbeatSoftFileLock:
    """Cross-process lock on a file path, usable as a context manager.

    An OS-level ``fcntl`` lock is preferred. When it cannot be used
    (``DISABLE_OS_FILELOCK`` is set, the ``fcntl`` module is missing, or the
    file system answers ENOSYS/ENOTSUP) the lock falls back to a
    ``SoftFileLock`` whose lock file is kept fresh by a heartbeat thread, so
    that a crashed holder can be detected through a stale mtime.
    """

    def __init__(
        self,
        lock_path: str,
        ttl_seconds: int = 60,
        heartbeat_seconds: int = 5,
        owner_worker_id: Optional[int] = None,
    ):
        """
        Initialize a heartbeat-backed, lease-based soft file lock.

        Parameters:
        - lock_path: Path to the .lock file used for coordination.
        - ttl_seconds: Soft lock only. Lease Time-To-Live (in seconds) of the
          lock file. If the current holder crashes and heartbeats stop, a
          subsequent acquirer treats a lock file whose mtime is older than
          this TTL as stale and may delete it to recover.
        - heartbeat_seconds: Interval (in seconds) at which the lock holder
          refreshes the lock file's modification time to signal liveness.
        - owner_worker_id: Optional worker id stored in the JSON metadata
          written to the lock file while the soft lock is held.
        """
        self._lock_path = lock_path
        self._ttl_seconds = ttl_seconds
        self._heartbeat_seconds = heartbeat_seconds
        # File descriptor holding the fcntl lock, or None when not held.
        self._os_lock: Optional[int] = None
        self._lock = SoftFileLock(lock_path)
        self._hb_stop = threading.Event()
        self._hb_thread: Optional[threading.Thread] = None
        self._owner_worker_id = owner_worker_id
        self._using_soft_lock = False

    def __enter__(self):
        """Block until the lock is held and return ``self``."""
        # Re-evaluate the mode on every entry so a reused instance cannot end
        # up holding the OS lock while still flagged as "soft" from an
        # earlier fallback.
        self._using_soft_lock = bool(DISABLE_OS_FILELOCK)
        if not self._using_soft_lock:
            # May switch _using_soft_lock on when fcntl is unavailable.
            self._acquire_os_lock()
        if not self._using_soft_lock:
            return self

        while True:
            try:
                # Attempt fast acquisition, fall through on contention
                self._lock.acquire(timeout=1)
                break
            except Timeout:
                self._cleanup_stale_lock()
                time.sleep(1)

        self._write_lock_info()
        # Start heartbeat to keep lock mtime fresh
        self._start_heartbeat()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release whichever lock ``__enter__`` took; never suppresses errors."""
        if not self._using_soft_lock:
            self._release_os_lock()
            return

        self._stop_heartbeat()
        try:
            self._lock.release()
        except Exception:
            # SoftFileLock could not release cleanly: remove the lock file
            # ourselves so other waiters are not blocked until the TTL.
            try:
                os.remove(self._lock_path)
            except FileNotFoundError:
                pass
            except Exception as e:
                logger.warning(f"Failed to release soft lock: {e}")

    def _cleanup_stale_lock(self):
        """Delete the lock file if its mtime is older than the TTL."""
        try:
            mtime = os.path.getmtime(self._lock_path)
            if time.time() - mtime > self._ttl_seconds:
                os.remove(self._lock_path)
        except FileNotFoundError:
            # Already released or cleaned up by someone else.
            pass
        except Exception as e:
            # Swallow cleanup errors to avoid interfering with lock acquisition loop
            logger.warning(f"Failed to cleanup stale lock: {e}")

    def _write_lock_info(self):
        """Best-effort: record who holds the soft lock as JSON in the lock file."""
        try:
            info = {
                "worker_id": self._owner_worker_id,
                "pid": os.getpid(),
                "hostname": socket.gethostname(),
                "created_at": time.time(),
            }
            with open(self._lock_path, "w") as f:
                json.dump(info, f)
        except Exception as e:
            logger.warning(f"Failed to write lock info: {e}")

    def _start_heartbeat(self):
        """Start a daemon thread that keeps the lock file's mtime fresh."""
        # Use a fresh event per acquisition. The event of a previous `with`
        # block is already set, and reusing it would make the new heartbeat
        # exit immediately, letting the lease expire while the lock is held.
        self._hb_stop = threading.Event()
        self._hb_thread = threading.Thread(
            target=self._heartbeat, args=(self._hb_stop,), daemon=True
        )
        self._hb_thread.start()

    def _stop_heartbeat(self):
        """Signal the heartbeat thread to stop and wait briefly for it."""
        self._hb_stop.set()
        thread = self._hb_thread
        if thread is not None:
            thread.join(timeout=1)
            self._hb_thread = None

    def _heartbeat(self, stop_event: Optional[threading.Event] = None):
        """Touch the lock file every ``heartbeat_seconds`` until stopped."""
        stop = self._hb_stop if stop_event is None else stop_event
        while not stop.is_set():
            try:
                os.utime(self._lock_path, None)
            except Exception as e:
                logger.warning(f"Failed to update lock mtime: {e}")
            stop.wait(self._heartbeat_seconds)

    def _open_lock_file(self):
        """Open the lock file, creating it if needed; return ``(fd, created)``."""
        flags = os.O_CREAT | os.O_RDWR
        try:
            return os.open(self._lock_path, flags | os.O_EXCL, 0o644), True
        except FileExistsError:
            return os.open(self._lock_path, flags, 0o644), False

    def _fd_matches_path(self, fd: int) -> bool:
        """Return True if ``fd`` still refers to the file at the lock path."""
        try:
            return os.path.samestat(os.fstat(fd), os.stat(self._lock_path))
        except FileNotFoundError:
            return False

    def _acquire_os_lock(self):
        """Take an exclusive fcntl lock, or switch to soft-lock mode.

        Sets ``_os_lock`` to the locked descriptor on success. Sets
        ``_using_soft_lock`` when fcntl locking is not available. Any other
        ``OSError`` from ``lockf`` is re-raised.
        """
        if not _HAS_FCNTL:
            # Windows doesn't support fcntl — fall back to soft lock
            self._using_soft_lock = True
            return

        dirpath = os.path.dirname(self._lock_path)
        if dirpath:
            os.makedirs(dirpath, exist_ok=True)

        while True:
            fd, created = self._open_lock_file()
            try:
                fcntl.lockf(fd, fcntl.LOCK_EX)
            except OSError as e:
                os.close(fd)
                if e.errno in (errno.ENOSYS, errno.ENOTSUP):
                    # File system doesn't support fcntl locking.
                    if created:
                        # Drop the empty placeholder we just created;
                        # otherwise SoftFileLock sees an existing lock file
                        # and waits for it to become stale.
                        try:
                            os.remove(self._lock_path)
                        except OSError:
                            pass
                    self._using_soft_lock = True
                    return
                raise
            except BaseException:
                # e.g. KeyboardInterrupt while blocked: don't leak the fd.
                os.close(fd)
                raise

            try:
                still_current = self._fd_matches_path(fd)
            except BaseException:
                os.close(fd)
                raise
            if still_current:
                self._os_lock = fd
                return
            # The previous holder unlinked the file while we were waiting, so
            # we locked an orphaned inode. Retry against the current path.
            os.close(fd)

    def _release_os_lock(self):
        """Remove the lock file and drop the fcntl lock, if one is held."""
        fd = self._os_lock
        if fd is None:
            return
        self._os_lock = None
        try:
            # Unlink *before* unlocking: a waiter that wakes up afterwards
            # notices the path no longer matches its descriptor and retries,
            # so two processes can never hold "the" lock on different inodes.
            try:
                os.remove(self._lock_path)
            except FileNotFoundError:
                pass
            fcntl.lockf(fd, fcntl.LOCK_UN)
        except Exception as e:
            logger.warning(f"Failed to release OS lock: {e}")
        finally:
            os.close(fd)
def get_lock_path(cache_dir: str, model_file: ModelFile):
    """Return the lock-file path used for ``model_file`` under ``cache_dir``.

    The path has the form
    ``<cache_dir>/<source>/<group_or_owner>/<name>.lock``, where the last two
    parts come from splitting the model id with
    ``model_id_to_group_owner_name``.
    """
    source = model_file.source
    if source == SourceEnum.HUGGING_FACE:
        repo_id = model_file.huggingface_repo_id
    elif source == SourceEnum.MODEL_SCOPE:
        repo_id = model_file.model_scope_model_id
    else:
        # NOTE(review): any other source passes None to the helper below —
        # confirm callers only use this for the two sources handled above.
        repo_id = None

    owner, short_name = model_id_to_group_owner_name(repo_id)
    lock_filename = f"{short_name}.lock"
    return os.path.join(cache_dir, source, owner, lock_filename)
def read_lock_info(lock_path: str) -> Optional[dict]:
    """Return the JSON content of the lock file at ``lock_path``.

    Returns ``None`` when the file does not exist or when reading or parsing
    it fails for any reason; this function never raises.
    """
    payload = None
    try:
        if os.path.exists(lock_path):
            with open(lock_path, "r") as handle:
                payload = json.load(handle)
    except Exception:
        # Best-effort read: an unreadable or malformed lock file is reported
        # the same way as a missing one.
        return None
    return payload
|