locks.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import logging
  2. import os
  3. import platform
  4. import time
  5. import threading
  6. import json
  7. import socket
  8. from typing import Optional
  9. from filelock import SoftFileLock, Timeout
  10. from modelscope.hub.utils.utils import model_id_to_group_owner_name
  11. _HAS_FCNTL = False
  12. try:
  13. import fcntl
  14. import errno
  15. _HAS_FCNTL = True
  16. except ModuleNotFoundError:
  17. pass
  18. from gpustack.envs import DISABLE_OS_FILELOCK
  19. from gpustack.schemas import ModelFile
  20. from gpustack.schemas.models import SourceEnum
  21. logger = logging.getLogger(__name__)
  22. class HeartbeatSoftFileLock:
  23. def __init__(
  24. self,
  25. lock_path: str,
  26. ttl_seconds: int = 60,
  27. heartbeat_seconds: int = 5,
  28. owner_worker_id: Optional[int] = None,
  29. ):
  30. """
  31. Initialize a heartbeat-backed, lease-based soft file lock.
  32. Parameters:
  33. - lock_path: Path to the .lock file used for coordination.
  34. - ttl_seconds: Only for soft lock, Lease Time-To-Live (in seconds)
  35. for the lock file. If the
  36. current holder crashes and heartbeats stop, a subsequent acquirer will
  37. treat a lock file whose mtime is older than this TTL as stale and may
  38. delete it to recover.
  39. - heartbeat_seconds: Interval (in seconds) at which the lock holder
  40. refreshes the lock file's modification time to signal liveness.
  41. """
  42. self._lock_path = lock_path
  43. self._ttl_seconds = ttl_seconds
  44. self._heartbeat_seconds = heartbeat_seconds
  45. self._os_lock: Optional[int] = None
  46. self._lock = SoftFileLock(lock_path)
  47. self._hb_stop = threading.Event()
  48. self._hb_thread: Optional[threading.Thread] = None
  49. self._owner_worker_id = owner_worker_id
  50. self._using_soft_lock = False
  51. def _cleanup_stale_lock(self):
  52. try:
  53. if os.path.exists(self._lock_path):
  54. mtime = os.path.getmtime(self._lock_path)
  55. if time.time() - mtime > self._ttl_seconds:
  56. os.remove(self._lock_path)
  57. except Exception as e:
  58. # Swallow cleanup errors to avoid interfering with lock acquisition loop
  59. logger.warning(f"Failed to cleanup stale lock: {e}")
  60. def __enter__(self):
  61. if DISABLE_OS_FILELOCK:
  62. self._using_soft_lock = True
  63. else:
  64. self._acquire_os_lock()
  65. if not self._using_soft_lock:
  66. return self
  67. while True:
  68. try:
  69. # Attempt fast acquisition, fall through on contention
  70. self._lock.acquire(timeout=1)
  71. break
  72. except Timeout:
  73. self._cleanup_stale_lock()
  74. time.sleep(1)
  75. except Exception as e:
  76. raise e
  77. try:
  78. info = {
  79. "worker_id": self._owner_worker_id,
  80. "pid": os.getpid(),
  81. "hostname": socket.gethostname(),
  82. "created_at": time.time(),
  83. }
  84. with open(self._lock_path, "w") as f:
  85. json.dump(info, f)
  86. except Exception as e:
  87. logger.warning(f"Failed to write lock info: {e}")
  88. # Start heartbeat to keep lock mtime fresh
  89. self._hb_thread = threading.Thread(target=self._heartbeat, daemon=True)
  90. self._hb_thread.start()
  91. return self
  92. def _heartbeat(self):
  93. while not self._hb_stop.is_set():
  94. try:
  95. os.utime(self._lock_path, None)
  96. except Exception as e:
  97. logger.warning(f"Failed to update lock mtime: {e}")
  98. self._hb_stop.wait(self._heartbeat_seconds)
  99. def __exit__(self, exc_type, exc_val, exc_tb):
  100. # Stop heartbeat
  101. if self._using_soft_lock:
  102. self._hb_stop.set()
  103. if self._hb_thread:
  104. self._hb_thread.join(timeout=1)
  105. try:
  106. self._lock.release()
  107. except Exception:
  108. try:
  109. if os.path.exists(self._lock_path):
  110. os.remove(self._lock_path)
  111. except Exception as e:
  112. logger.warning(f"Failed to release soft lock: {e}")
  113. if not self._using_soft_lock:
  114. self._release_os_lock()
  115. def _acquire_os_lock(self):
  116. if not _HAS_FCNTL:
  117. # Windows doesn't support fcntl — fall back to soft lock
  118. self._using_soft_lock = True
  119. return
  120. dirpath = os.path.dirname(self._lock_path)
  121. if dirpath:
  122. os.makedirs(dirpath, exist_ok=True)
  123. fd = os.open(self._lock_path, os.O_CREAT | os.O_RDWR, 0o644)
  124. try:
  125. fcntl.lockf(fd, fcntl.LOCK_EX)
  126. except OSError as e:
  127. os.close(fd)
  128. if e.errno in (errno.ENOSYS, errno.ENOTSUP):
  129. # File system don't support fcntl
  130. self._using_soft_lock = True
  131. else:
  132. raise e
  133. else:
  134. self._os_lock = fd
  135. def _release_os_lock(self):
  136. fd = self._os_lock
  137. try:
  138. fcntl.lockf(fd, fcntl.LOCK_UN)
  139. if os.path.exists(self._lock_path):
  140. os.remove(self._lock_path)
  141. except Exception as e:
  142. logger.warning(f"Failed to release OS lock: {e}")
  143. finally:
  144. os.close(fd)
  145. self._os_lock = None
  146. def get_lock_path(cache_dir: str, model_file: ModelFile):
  147. model_id = None
  148. if model_file.source == SourceEnum.HUGGING_FACE:
  149. model_id = model_file.huggingface_repo_id
  150. elif model_file.source == SourceEnum.MODEL_SCOPE:
  151. model_id = model_file.model_scope_model_id
  152. group_or_owner, name = model_id_to_group_owner_name(model_id)
  153. return os.path.join(
  154. os.path.join(cache_dir, model_file.source),
  155. group_or_owner,
  156. f"{name}.lock",
  157. )
  158. def read_lock_info(lock_path: str) -> Optional[dict]:
  159. try:
  160. if not os.path.exists(lock_path):
  161. return None
  162. with open(lock_path, "r") as f:
  163. return json.load(f)
  164. except Exception:
  165. return None